001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.concurrent.CancellationException;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Future;
029 import javax.transaction.xa.XAException;
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageId;
034 import org.apache.activemq.command.TransactionId;
035 import org.apache.activemq.command.XATransactionId;
036 import org.apache.activemq.openwire.OpenWireFormat;
037 import org.apache.activemq.protobuf.Buffer;
038 import org.apache.activemq.store.AbstractMessageStore;
039 import org.apache.activemq.store.MessageStore;
040 import org.apache.activemq.store.ProxyMessageStore;
041 import org.apache.activemq.store.ProxyTopicMessageStore;
042 import org.apache.activemq.store.TopicMessageStore;
043 import org.apache.activemq.store.TransactionRecoveryListener;
044 import org.apache.activemq.store.TransactionStore;
045 import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046 import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047 import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052 import org.apache.activemq.wireformat.WireFormat;
053 import org.slf4j.Logger;
054 import org.slf4j.LoggerFactory;
055
056 /**
057 * Provides a TransactionStore implementation that can create transaction aware
058 * MessageStore objects from non transaction aware MessageStore objects.
059 *
060 *
061 */
062 public class KahaDBTransactionStore implements TransactionStore {
063 static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065 private final WireFormat wireFormat = new OpenWireFormat();
066 private final KahaDBStore theStore;
067
068 public KahaDBTransactionStore(KahaDBStore theStore) {
069 this.theStore = theStore;
070 }
071
072 public class Tx {
073 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074
075 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076
077 public void add(AddMessageCommand msg) {
078 messages.add(msg);
079 }
080
081 public void add(RemoveMessageCommand ack) {
082 acks.add(ack);
083 }
084
085 public Message[] getMessages() {
086 Message rc[] = new Message[messages.size()];
087 int count = 0;
088 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089 AddMessageCommand cmd = iter.next();
090 rc[count++] = cmd.getMessage();
091 }
092 return rc;
093 }
094
095 public MessageAck[] getAcks() {
096 MessageAck rc[] = new MessageAck[acks.size()];
097 int count = 0;
098 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099 RemoveMessageCommand cmd = iter.next();
100 rc[count++] = cmd.getMessageAck();
101 }
102 return rc;
103 }
104
105 /**
106 * @return true if something to commit
107 * @throws IOException
108 */
109 public List<Future<Object>> commit() throws IOException {
110 List<Future<Object>> results = new ArrayList<Future<Object>>();
111 // Do all the message adds.
112 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113 AddMessageCommand cmd = iter.next();
114 results.add(cmd.run());
115
116 }
117 // And removes..
118 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119 RemoveMessageCommand cmd = iter.next();
120 cmd.run();
121 results.add(cmd.run());
122 }
123
124 return results;
125 }
126 }
127
128 public abstract class AddMessageCommand {
129 private final ConnectionContext ctx;
130 AddMessageCommand(ConnectionContext ctx) {
131 this.ctx = ctx;
132 }
133 abstract Message getMessage();
134 Future<Object> run() throws IOException {
135 return run(this.ctx);
136 }
137 abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138 }
139
140 public abstract class RemoveMessageCommand {
141
142 private final ConnectionContext ctx;
143 RemoveMessageCommand(ConnectionContext ctx) {
144 this.ctx = ctx;
145 }
146 abstract MessageAck getMessageAck();
147 Future<Object> run() throws IOException {
148 return run(this.ctx);
149 }
150 abstract Future<Object> run(ConnectionContext context) throws IOException;
151 }
152
153 public MessageStore proxy(MessageStore messageStore) {
154 return new ProxyMessageStore(messageStore) {
155 @Override
156 public void addMessage(ConnectionContext context, final Message send) throws IOException {
157 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158 }
159
160 @Override
161 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
162 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
163 }
164
165 @Override
166 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
167 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
168 }
169
170 @Override
171 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
172 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
173 }
174 };
175 }
176
177 public TopicMessageStore proxy(TopicMessageStore messageStore) {
178 return new ProxyTopicMessageStore(messageStore) {
179 @Override
180 public void addMessage(ConnectionContext context, final Message send) throws IOException {
181 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
182 }
183
184 @Override
185 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
186 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
187 }
188
189 @Override
190 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
191 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
192 }
193
194 @Override
195 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
196 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
197 }
198
199 @Override
200 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
201 MessageId messageId, MessageAck ack) throws IOException {
202 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
203 subscriptionName, messageId, ack);
204 }
205
206 };
207 }
208
209 /**
210 * @throws IOException
211 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
212 */
213 public void prepare(TransactionId txid) throws IOException {
214 inflightTransactions.remove(txid);
215 KahaTransactionInfo info = getTransactionInfo(txid);
216 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
217 }
218
219 public Tx getTx(Object txid) {
220 Tx tx = inflightTransactions.get(txid);
221 if (tx == null) {
222 tx = new Tx();
223 inflightTransactions.put(txid, tx);
224 }
225 return tx;
226 }
227
228 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
229 throws IOException {
230 if (txid != null) {
231 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
232 if (preCommit != null) {
233 preCommit.run();
234 }
235 Tx tx = inflightTransactions.remove(txid);
236 if (tx != null) {
237 List<Future<Object>> results = tx.commit();
238 boolean doneSomething = false;
239 for (Future<Object> result : results) {
240 try {
241 result.get();
242 } catch (InterruptedException e) {
243 theStore.brokerService.handleIOException(new IOException(e.getMessage()));
244 } catch (ExecutionException e) {
245 theStore.brokerService.handleIOException(new IOException(e.getMessage()));
246 }catch(CancellationException e) {
247 }
248 if (!result.isCancelled()) {
249 doneSomething = true;
250 }
251 }
252 if (postCommit != null) {
253 postCommit.run();
254 }
255 if (doneSomething) {
256 KahaTransactionInfo info = getTransactionInfo(txid);
257 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
258 }
259 }else {
260 //The Tx will be null for failed over clients - lets run their post commits
261 if (postCommit != null) {
262 postCommit.run();
263 }
264 }
265
266 } else {
267 KahaTransactionInfo info = getTransactionInfo(txid);
268 // ensure message order w.r.t to cursor and store for setBatch()
269 synchronized (this) {
270 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
271 }
272 }
273 }else {
274 LOG.error("Null transaction passed on commit");
275 }
276 }
277
278 /**
279 * @throws IOException
280 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
281 */
282 public void rollback(TransactionId txid) throws IOException {
283 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
284 KahaTransactionInfo info = getTransactionInfo(txid);
285 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
286 } else {
287 inflightTransactions.remove(txid);
288 }
289 }
290
291 public void start() throws Exception {
292 }
293
294 public void stop() throws Exception {
295 }
296
297 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
298 // All the inflight transactions get rolled back..
299 // inflightTransactions.clear();
300 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
301 XATransactionId xid = (XATransactionId) entry.getKey();
302 ArrayList<Message> messageList = new ArrayList<Message>();
303 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
304
305 for (Operation op : entry.getValue()) {
306 if (op.getClass() == AddOpperation.class) {
307 AddOpperation addOp = (AddOpperation) op;
308 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
309 .newInput()));
310 messageList.add(msg);
311 } else {
312 RemoveOpperation rmOp = (RemoveOpperation) op;
313 Buffer ackb = rmOp.getCommand().getAck();
314 MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
315 ackList.add(ack);
316 }
317 }
318
319 Message[] addedMessages = new Message[messageList.size()];
320 MessageAck[] acks = new MessageAck[ackList.size()];
321 messageList.toArray(addedMessages);
322 ackList.toArray(acks);
323 listener.recover(xid, addedMessages, acks);
324 }
325 }
326
327 /**
328 * @param message
329 * @throws IOException
330 */
331 void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
332 throws IOException {
333
334 if (message.getTransactionId() != null) {
335 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
336 destination.addMessage(context, message);
337 } else {
338 Tx tx = getTx(message.getTransactionId());
339 tx.add(new AddMessageCommand(context) {
340 @Override
341 public Message getMessage() {
342 return message;
343 }
344 @Override
345 public Future<Object> run(ConnectionContext ctx) throws IOException {
346 destination.addMessage(ctx, message);
347 return AbstractMessageStore.FUTURE;
348 }
349
350 });
351 }
352 } else {
353 destination.addMessage(context, message);
354 }
355 }
356
357 Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
358 throws IOException {
359
360 if (message.getTransactionId() != null) {
361 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
362 destination.addMessage(context, message);
363 return AbstractMessageStore.FUTURE;
364 } else {
365 Tx tx = getTx(message.getTransactionId());
366 tx.add(new AddMessageCommand(context) {
367 @Override
368 public Message getMessage() {
369 return message;
370 }
371 @Override
372 public Future<Object> run(ConnectionContext ctx) throws IOException {
373 return destination.asyncAddQueueMessage(ctx, message);
374 }
375
376 });
377 return AbstractMessageStore.FUTURE;
378 }
379 } else {
380 return destination.asyncAddQueueMessage(context, message);
381 }
382 }
383
384 Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
385 throws IOException {
386
387 if (message.getTransactionId() != null) {
388 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
389 destination.addMessage(context, message);
390 return AbstractMessageStore.FUTURE;
391 } else {
392 Tx tx = getTx(message.getTransactionId());
393 tx.add(new AddMessageCommand(context) {
394 @Override
395 public Message getMessage() {
396 return message;
397 }
398 @Override
399 public Future run(ConnectionContext ctx) throws IOException {
400 return destination.asyncAddTopicMessage(ctx, message);
401 }
402
403 });
404 return AbstractMessageStore.FUTURE;
405 }
406 } else {
407 return destination.asyncAddTopicMessage(context, message);
408 }
409 }
410
411 /**
412 * @param ack
413 * @throws IOException
414 */
415 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
416 throws IOException {
417
418 if (ack.isInTransaction()) {
419 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
420 destination.removeMessage(context, ack);
421 } else {
422 Tx tx = getTx(ack.getTransactionId());
423 tx.add(new RemoveMessageCommand(context) {
424 @Override
425 public MessageAck getMessageAck() {
426 return ack;
427 }
428
429 @Override
430 public Future<Object> run(ConnectionContext ctx) throws IOException {
431 destination.removeMessage(ctx, ack);
432 return AbstractMessageStore.FUTURE;
433 }
434 });
435 }
436 } else {
437 destination.removeMessage(context, ack);
438 }
439 }
440
441 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
442 throws IOException {
443
444 if (ack.isInTransaction()) {
445 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
446 destination.removeAsyncMessage(context, ack);
447 } else {
448 Tx tx = getTx(ack.getTransactionId());
449 tx.add(new RemoveMessageCommand(context) {
450 @Override
451 public MessageAck getMessageAck() {
452 return ack;
453 }
454
455 @Override
456 public Future<Object> run(ConnectionContext ctx) throws IOException {
457 destination.removeMessage(ctx, ack);
458 return AbstractMessageStore.FUTURE;
459 }
460 });
461 }
462 } else {
463 destination.removeAsyncMessage(context, ack);
464 }
465 }
466
467 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
468 final MessageId messageId, final MessageAck ack) throws IOException {
469
470 if (ack.isInTransaction()) {
471 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
472 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
473 } else {
474 Tx tx = getTx(ack.getTransactionId());
475 tx.add(new RemoveMessageCommand(context) {
476 public MessageAck getMessageAck() {
477 return ack;
478 }
479
480 public Future<Object> run(ConnectionContext ctx) throws IOException {
481 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
482 return AbstractMessageStore.FUTURE;
483 }
484 });
485 }
486 } else {
487 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
488 }
489 }
490
491
492 private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
493 return theStore.createTransactionInfo(txid);
494 }
495
496 }