001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.Map;
022 import java.util.Map.Entry;
023 import java.util.concurrent.ConcurrentHashMap;
024 import javax.transaction.xa.XAException;
025 import org.apache.activemq.broker.BrokerService;
026 import org.apache.activemq.broker.BrokerServiceAware;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.command.TransactionId;
032 import org.apache.activemq.command.XATransactionId;
033 import org.apache.activemq.kaha.RuntimeStoreException;
034 import org.apache.activemq.store.MessageStore;
035 import org.apache.activemq.store.ProxyMessageStore;
036 import org.apache.activemq.store.ProxyTopicMessageStore;
037 import org.apache.activemq.store.TopicMessageStore;
038 import org.apache.activemq.store.TransactionRecoveryListener;
039 import org.apache.activemq.store.TransactionStore;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 /**
044 * Provides a TransactionStore implementation that can create transaction aware
045 * MessageStore objects from non transaction aware MessageStore objects.
046 *
047 *
048 */
049 public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
050 private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051
052 private final Map transactions = new ConcurrentHashMap();
053 private final Map prepared;
054 private final KahaPersistenceAdapter adaptor;
055
056 private BrokerService brokerService;
057
058 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059 this.adaptor = adaptor;
060 this.prepared = preparedMap;
061 }
062
063 public MessageStore proxy(MessageStore messageStore) {
064 return new ProxyMessageStore(messageStore) {
065 @Override
066 public void addMessage(ConnectionContext context, final Message send) throws IOException {
067 KahaTransactionStore.this.addMessage(getDelegate(), send);
068 }
069
070 @Override
071 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
072 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
073 }
074 };
075 }
076
077 public TopicMessageStore proxy(TopicMessageStore messageStore) {
078 return new ProxyTopicMessageStore(messageStore) {
079 @Override
080 public void addMessage(ConnectionContext context, final Message send) throws IOException {
081 KahaTransactionStore.this.addMessage(getDelegate(), send);
082 }
083
084 @Override
085 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
086 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
087 }
088
089 @Override
090 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
091 MessageId messageId, MessageAck ack) throws IOException {
092 KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
093 }
094 };
095 }
096
097 /**
098 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
099 */
100 public void prepare(TransactionId txid) {
101 KahaTransaction tx = getTx(txid);
102 if (tx != null) {
103 tx.prepare();
104 prepared.put(txid, tx);
105 }
106 }
107
108 public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
109 if(before != null) {
110 before.run();
111 }
112 KahaTransaction tx = getTx(txid);
113 if (tx != null) {
114 tx.commit(this);
115 removeTx(txid);
116 }
117 if (after != null) {
118 after.run();
119 }
120 }
121
122 /**
123 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
124 */
125 public void rollback(TransactionId txid) {
126 KahaTransaction tx = getTx(txid);
127 if (tx != null) {
128 tx.rollback();
129 removeTx(txid);
130 }
131 }
132
133 public void start() throws Exception {
134 }
135
136 public void stop() throws Exception {
137 }
138
139 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
140 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
141 Map.Entry entry = (Entry)i.next();
142 XATransactionId xid = (XATransactionId)entry.getKey();
143 KahaTransaction kt = (KahaTransaction)entry.getValue();
144 listener.recover(xid, kt.getMessages(), kt.getAcks());
145 }
146 }
147
148 /**
149 * @param message
150 * @throws IOException
151 */
152 void addMessage(final MessageStore destination, final Message message) throws IOException {
153 try {
154 if (message.isInTransaction()) {
155 KahaTransaction tx = getOrCreateTx(message.getTransactionId());
156 tx.add((KahaMessageStore)destination, message);
157 } else {
158 destination.addMessage(null, message);
159 }
160 } catch (RuntimeStoreException rse) {
161 if (rse.getCause() instanceof IOException) {
162 brokerService.handleIOException((IOException)rse.getCause());
163 }
164 throw rse;
165 }
166 }
167
168 /**
169 * @param ack
170 * @throws IOException
171 */
172 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
173 try {
174 if (ack.isInTransaction()) {
175 KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
176 tx.add((KahaMessageStore)destination, ack);
177 } else {
178 destination.removeMessage(null, ack);
179 }
180 } catch (RuntimeStoreException rse) {
181 if (rse.getCause() instanceof IOException) {
182 brokerService.handleIOException((IOException)rse.getCause());
183 }
184 throw rse;
185 }
186 }
187
188 final void acknowledge(final TopicMessageStore destination, String clientId,
189 String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
190 try {
191 if (ack.isInTransaction()) {
192 KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
193 tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
194 } else {
195 destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
196 }
197 } catch (RuntimeStoreException rse) {
198 if (rse.getCause() instanceof IOException) {
199 brokerService.handleIOException((IOException)rse.getCause());
200 }
201 throw rse;
202 }
203 }
204
205 protected synchronized KahaTransaction getTx(TransactionId key) {
206 KahaTransaction result = (KahaTransaction)transactions.get(key);
207 if (result == null) {
208 result = (KahaTransaction)prepared.get(key);
209 }
210 return result;
211 }
212
213 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
214 KahaTransaction result = (KahaTransaction)transactions.get(key);
215 if (result == null) {
216 result = new KahaTransaction();
217 transactions.put(key, result);
218 }
219 return result;
220 }
221
222 protected synchronized void removeTx(TransactionId key) {
223 transactions.remove(key);
224 prepared.remove(key);
225 }
226
227 public void delete() {
228 transactions.clear();
229 prepared.clear();
230 }
231
232 protected MessageStore getStoreById(Object id) {
233 return adaptor.retrieveMessageStore(id);
234 }
235
236 public void setBrokerService(BrokerService brokerService) {
237 this.brokerService = brokerService;
238 }
239 }