001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.io.InterruptedIOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.Map.Entry;
031 import java.util.concurrent.ExecutorService;
032 import java.util.concurrent.Future;
033 import java.util.concurrent.FutureTask;
034 import java.util.concurrent.LinkedBlockingQueue;
035 import java.util.concurrent.Semaphore;
036 import java.util.concurrent.ThreadFactory;
037 import java.util.concurrent.ThreadPoolExecutor;
038 import java.util.concurrent.TimeUnit;
039 import java.util.concurrent.atomic.AtomicBoolean;
040 import java.util.concurrent.atomic.AtomicInteger;
041 import org.apache.activemq.broker.ConnectionContext;
042 import org.apache.activemq.command.ActiveMQDestination;
043 import org.apache.activemq.command.ActiveMQQueue;
044 import org.apache.activemq.command.ActiveMQTempQueue;
045 import org.apache.activemq.command.ActiveMQTempTopic;
046 import org.apache.activemq.command.ActiveMQTopic;
047 import org.apache.activemq.command.LocalTransactionId;
048 import org.apache.activemq.command.Message;
049 import org.apache.activemq.command.MessageAck;
050 import org.apache.activemq.command.MessageId;
051 import org.apache.activemq.command.ProducerId;
052 import org.apache.activemq.command.SubscriptionInfo;
053 import org.apache.activemq.command.TransactionId;
054 import org.apache.activemq.command.XATransactionId;
055 import org.apache.activemq.filter.BooleanExpression;
056 import org.apache.activemq.filter.MessageEvaluationContext;
057 import org.apache.activemq.openwire.OpenWireFormat;
058 import org.apache.activemq.protobuf.Buffer;
059 import org.apache.activemq.selector.SelectorParser;
060 import org.apache.activemq.store.AbstractMessageStore;
061 import org.apache.activemq.store.MessageRecoveryListener;
062 import org.apache.activemq.store.MessageStore;
063 import org.apache.activemq.store.PersistenceAdapter;
064 import org.apache.activemq.store.TopicMessageStore;
065 import org.apache.activemq.store.TransactionStore;
066 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
067 import org.apache.activemq.store.kahadb.data.KahaDestination;
068 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
069 import org.apache.activemq.store.kahadb.data.KahaLocation;
070 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
071 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
072 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
073 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
074 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
075 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
076 import org.apache.activemq.usage.MemoryUsage;
077 import org.apache.activemq.usage.SystemUsage;
078 import org.apache.activemq.util.IOExceptionSupport;
079 import org.apache.activemq.util.ServiceStopper;
080 import org.apache.activemq.wireformat.WireFormat;
081 import org.slf4j.Logger;
082 import org.slf4j.LoggerFactory;
083 import org.apache.kahadb.journal.Location;
084 import org.apache.kahadb.page.Transaction;
085
086 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
087 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
088 private static final int MAX_ASYNC_JOBS = 10000;
089
090 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
091 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
092 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
093 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
094 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
095 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
096
097 protected ExecutorService queueExecutor;
098 protected ExecutorService topicExecutor;
099 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
100 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
101 final WireFormat wireFormat = new OpenWireFormat();
102 private SystemUsage usageManager;
103 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
104 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
105 Semaphore globalQueueSemaphore;
106 Semaphore globalTopicSemaphore;
107 private boolean concurrentStoreAndDispatchQueues = true;
// when true, message order may be compromised when the cache is exhausted if the store is
// out of order w.r.t. the cache
110 private boolean concurrentStoreAndDispatchTopics = false;
111 private boolean concurrentStoreAndDispatchTransactions = false;
112 private int maxAsyncJobs = MAX_ASYNC_JOBS;
113 private final KahaDBTransactionStore transactionStore;
114
/**
 * Creates the store and its {@link KahaDBTransactionStore}, which is given
 * a reference to this store.
 */
public KahaDBStore() {
    transactionStore = new KahaDBTransactionStore(this);
}
118
119 public void setBrokerName(String brokerName) {
120 }
121
122 public void setUsageManager(SystemUsage usageManager) {
123 this.usageManager = usageManager;
124 }
125
126 public SystemUsage getUsageManager() {
127 return this.usageManager;
128 }
129
130 /**
131 * @return the concurrentStoreAndDispatch
132 */
133 public boolean isConcurrentStoreAndDispatchQueues() {
134 return this.concurrentStoreAndDispatchQueues;
135 }
136
137 /**
138 * @param concurrentStoreAndDispatch
139 * the concurrentStoreAndDispatch to set
140 */
141 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143 }
144
145 /**
146 * @return the concurrentStoreAndDispatch
147 */
148 public boolean isConcurrentStoreAndDispatchTopics() {
149 return this.concurrentStoreAndDispatchTopics;
150 }
151
152 /**
153 * @param concurrentStoreAndDispatch
154 * the concurrentStoreAndDispatch to set
155 */
156 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158 }
159
160 public boolean isConcurrentStoreAndDispatchTransactions() {
161 return this.concurrentStoreAndDispatchTransactions;
162 }
163
164 /**
165 * @return the maxAsyncJobs
166 */
167 public int getMaxAsyncJobs() {
168 return this.maxAsyncJobs;
169 }
170 /**
171 * @param maxAsyncJobs
172 * the maxAsyncJobs to set
173 */
174 public void setMaxAsyncJobs(int maxAsyncJobs) {
175 this.maxAsyncJobs = maxAsyncJobs;
176 }
177
178 @Override
179 public void doStart() throws Exception {
180 super.doStart();
181 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185 this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186 asyncQueueJobQueue, new ThreadFactory() {
187 public Thread newThread(Runnable runnable) {
188 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189 thread.setDaemon(true);
190 return thread;
191 }
192 });
193 this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194 asyncTopicJobQueue, new ThreadFactory() {
195 public Thread newThread(Runnable runnable) {
196 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197 thread.setDaemon(true);
198 return thread;
199 }
200 });
201 }
202
203 @Override
204 public void doStop(ServiceStopper stopper) throws Exception {
205 // drain down async jobs
206 LOG.info("Stopping async queue tasks");
207 if (this.globalQueueSemaphore != null) {
208 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209 }
210 synchronized (this.asyncQueueMaps) {
211 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212 synchronized (m) {
213 for (StoreTask task : m.values()) {
214 task.cancel();
215 }
216 }
217 }
218 this.asyncQueueMaps.clear();
219 }
220 LOG.info("Stopping async topic tasks");
221 if (this.globalTopicSemaphore != null) {
222 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223 }
224 synchronized (this.asyncTopicMaps) {
225 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226 synchronized (m) {
227 for (StoreTask task : m.values()) {
228 task.cancel();
229 }
230 }
231 }
232 this.asyncTopicMaps.clear();
233 }
234 if (this.globalQueueSemaphore != null) {
235 this.globalQueueSemaphore.drainPermits();
236 }
237 if (this.globalTopicSemaphore != null) {
238 this.globalTopicSemaphore.drainPermits();
239 }
240 if (this.queueExecutor != null) {
241 this.queueExecutor.shutdownNow();
242 }
243 if (this.topicExecutor != null) {
244 this.topicExecutor.shutdownNow();
245 }
246 LOG.info("Stopped KahaDB");
247 super.doStop(stopper);
248 }
249
250 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
251 StoreQueueTask task = null;
252 synchronized (store.asyncTaskMap) {
253 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
254 }
255 return task;
256 }
257
258 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
259 synchronized (store.asyncTaskMap) {
260 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
261 }
262 this.queueExecutor.execute(task);
263 }
264
265 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
266 StoreTopicTask task = null;
267 synchronized (store.asyncTaskMap) {
268 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
269 }
270 return task;
271 }
272
273 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
274 synchronized (store.asyncTaskMap) {
275 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
276 }
277 this.topicExecutor.execute(task);
278 }
279
280 public TransactionStore createTransactionStore() throws IOException {
281 return this.transactionStore;
282 }
283
284 public boolean getForceRecoverIndex() {
285 return this.forceRecoverIndex;
286 }
287
288 public void setForceRecoverIndex(boolean forceRecoverIndex) {
289 this.forceRecoverIndex = forceRecoverIndex;
290 }
291
292 public class KahaDBMessageStore extends AbstractMessageStore {
293 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
294 protected KahaDestination dest;
295 private final int maxAsyncJobs;
296 private final Semaphore localDestinationSemaphore;
297
298 double doneTasks, canceledTasks = 0;
299
/**
 * Creates a store for {@code destination}. The per-destination semaphore is
 * sized from the enclosing store's {@code getMaxAsyncJobs()} at construction
 * time.
 */
public KahaDBMessageStore(ActiveMQDestination destination) {
    super(destination);
    dest = convert(destination);
    maxAsyncJobs = getMaxAsyncJobs();
    localDestinationSemaphore = new Semaphore(maxAsyncJobs);
}
306
307 @Override
308 public ActiveMQDestination getDestination() {
309 return destination;
310 }
311
312 @Override
313 public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
314 throws IOException {
315 if (isConcurrentStoreAndDispatchQueues()) {
316 StoreQueueTask result = new StoreQueueTask(this, context, message);
317 result.aquireLocks();
318 addQueueTask(this, result);
319 return result.getFuture();
320 } else {
321 return super.asyncAddQueueMessage(context, message);
322 }
323 }
324
325 @Override
326 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
327 if (isConcurrentStoreAndDispatchQueues()) {
328 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
329 StoreQueueTask task = null;
330 synchronized (asyncTaskMap) {
331 task = (StoreQueueTask) asyncTaskMap.get(key);
332 }
333 if (task != null) {
334 if (!task.cancel()) {
335 try {
336
337 task.future.get();
338 } catch (InterruptedException e) {
339 throw new InterruptedIOException(e.toString());
340 } catch (Exception ignored) {
341 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
342 }
343 removeMessage(context, ack);
344 } else {
345 synchronized (asyncTaskMap) {
346 asyncTaskMap.remove(key);
347 }
348 }
349 } else {
350 removeMessage(context, ack);
351 }
352 } else {
353 removeMessage(context, ack);
354 }
355 }
356
357 public void addMessage(ConnectionContext context, Message message) throws IOException {
358 KahaAddMessageCommand command = new KahaAddMessageCommand();
359 command.setDestination(dest);
360 command.setMessageId(message.getMessageId().toString());
361 command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
362 command.setPriority(message.getPriority());
363 command.setPrioritySupported(isPrioritizedMessages());
364 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
365 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
366 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
367
368 }
369
370 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
371 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
372 command.setDestination(dest);
373 command.setMessageId(ack.getLastMessageId().toString());
374 command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
375
376 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
377 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
378 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
379 }
380
381 public void removeAllMessages(ConnectionContext context) throws IOException {
382 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
383 command.setDestination(dest);
384 store(command, true, null, null);
385 }
386
387 public Message getMessage(MessageId identity) throws IOException {
388 final String key = identity.toString();
389
390 // Hopefully one day the page file supports concurrent read
391 // operations... but for now we must
392 // externally synchronize...
393 Location location;
394 indexLock.readLock().lock();
395 try {
396 location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
397 public Location execute(Transaction tx) throws IOException {
398 StoredDestination sd = getStoredDestination(dest, tx);
399 Long sequence = sd.messageIdIndex.get(tx, key);
400 if (sequence == null) {
401 return null;
402 }
403 return sd.orderIndex.get(tx, sequence).location;
404 }
405 });
406 }finally {
407 indexLock.readLock().unlock();
408 }
409 if (location == null) {
410 return null;
411 }
412
413 return loadMessage(location);
414 }
415
416 public int getMessageCount() throws IOException {
417 try {
418 lockAsyncJobQueue();
419 indexLock.readLock().lock();
420 try {
421 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
422 public Integer execute(Transaction tx) throws IOException {
423 // Iterate through all index entries to get a count
424 // of
425 // messages in the destination.
426 StoredDestination sd = getStoredDestination(dest, tx);
427 int rc = 0;
428 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
429 .hasNext();) {
430 iterator.next();
431 rc++;
432 }
433 return rc;
434 }
435 });
436 }finally {
437 indexLock.readLock().unlock();
438 }
439 } finally {
440 unlockAsyncJobQueue();
441 }
442 }
443
444 @Override
445 public boolean isEmpty() throws IOException {
446 indexLock.readLock().lock();
447 try {
448 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
449 public Boolean execute(Transaction tx) throws IOException {
450 // Iterate through all index entries to get a count of
451 // messages in the destination.
452 StoredDestination sd = getStoredDestination(dest, tx);
453 return sd.locationIndex.isEmpty(tx);
454 }
455 });
456 }finally {
457 indexLock.readLock().unlock();
458 }
459 }
460
461 public void recover(final MessageRecoveryListener listener) throws Exception {
462 indexLock.readLock().lock();
463 try {
464 pageFile.tx().execute(new Transaction.Closure<Exception>() {
465 public void execute(Transaction tx) throws Exception {
466 StoredDestination sd = getStoredDestination(dest, tx);
467 sd.orderIndex.resetCursorPosition();
468 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
469 .hasNext();) {
470 Entry<Long, MessageKeys> entry = iterator.next();
471 Message msg = loadMessage(entry.getValue().location);
472 listener.recoverMessage(msg);
473 }
474 }
475 });
476 }finally {
477 indexLock.readLock().unlock();
478 }
479 }
480
481
482 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
483 indexLock.readLock().lock();
484 try {
485 pageFile.tx().execute(new Transaction.Closure<Exception>() {
486 public void execute(Transaction tx) throws Exception {
487 StoredDestination sd = getStoredDestination(dest, tx);
488 Entry<Long, MessageKeys> entry = null;
489 int counter = 0;
490 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
491 listener.hasSpace() && iterator.hasNext(); ) {
492 entry = iterator.next();
493 Message msg = loadMessage(entry.getValue().location);
494 listener.recoverMessage(msg);
495 counter++;
496 if (counter >= maxReturned) {
497 break;
498 }
499 }
500 sd.orderIndex.stoppedIterating();
501 }
502 });
503 }finally {
504 indexLock.readLock().unlock();
505 }
506 }
507
508 public void resetBatching() {
509 try {
510 pageFile.tx().execute(new Transaction.Closure<Exception>() {
511 public void execute(Transaction tx) throws Exception {
512 StoredDestination sd = getExistingStoredDestination(dest, tx);
513 if (sd != null) {
514 sd.orderIndex.resetCursorPosition();}
515 }
516 });
517 } catch (Exception e) {
518 LOG.error("Failed to reset batching",e);
519 }
520 }
521
522 @Override
523 public void setBatch(MessageId identity) throws IOException {
524 try {
525 final String key = identity.toString();
526 lockAsyncJobQueue();
527
528 // Hopefully one day the page file supports concurrent read
529 // operations... but for now we must
530 // externally synchronize...
531
532 indexLock.writeLock().lock();
533 try {
534 pageFile.tx().execute(new Transaction.Closure<IOException>() {
535 public void execute(Transaction tx) throws IOException {
536 StoredDestination sd = getStoredDestination(dest, tx);
537 Long location = sd.messageIdIndex.get(tx, key);
538 if (location != null) {
539 sd.orderIndex.setBatch(tx, location);
540 }
541 }
542 });
543 }finally {
544 indexLock.writeLock().unlock();
545 }
546
547 } finally {
548 unlockAsyncJobQueue();
549 }
550
551 }
552
553 @Override
554 public void setMemoryUsage(MemoryUsage memoeyUSage) {
555 }
556 @Override
557 public void start() throws Exception {
558 super.start();
559 }
560 @Override
561 public void stop() throws Exception {
562 super.stop();
563 }
564
565 protected void lockAsyncJobQueue() {
566 try {
567 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
568 } catch (Exception e) {
569 LOG.error("Failed to lock async jobs for " + this.destination, e);
570 }
571 }
572
573 protected void unlockAsyncJobQueue() {
574 this.localDestinationSemaphore.release(this.maxAsyncJobs);
575 }
576
577 protected void acquireLocalAsyncLock() {
578 try {
579 this.localDestinationSemaphore.acquire();
580 } catch (InterruptedException e) {
581 LOG.error("Failed to aquire async lock for " + this.destination, e);
582 }
583 }
584
585 protected void releaseLocalAsyncLock() {
586 this.localDestinationSemaphore.release();
587 }
588
589 }
590
591 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
592 private final AtomicInteger subscriptionCount = new AtomicInteger();
/**
 * Creates a topic store for {@code destination}, seeds the subscription
 * count from the subscriptions currently in the index and registers this
 * store's task map in {@code asyncTopicMaps}.
 *
 * @throws IOException if reading the existing subscriptions fails
 */
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
    super(destination);
    this.subscriptionCount.set(getAllSubscriptions().length);
    // doStop() iterates and clears this list while holding its monitor;
    // take the same monitor so a concurrent registration cannot corrupt
    // that iteration.
    synchronized (asyncTopicMaps) {
        asyncTopicMaps.add(asyncTaskMap);
    }
}
598
599 @Override
600 public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
601 throws IOException {
602 if (isConcurrentStoreAndDispatchTopics()) {
603 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
604 result.aquireLocks();
605 addTopicTask(this, result);
606 return result.getFuture();
607 } else {
608 return super.asyncAddTopicMessage(context, message);
609 }
610 }
611
612 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
613 MessageId messageId, MessageAck ack)
614 throws IOException {
615 String subscriptionKey = subscriptionKey(clientId, subscriptionName);
616 if (isConcurrentStoreAndDispatchTopics()) {
617 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
618 StoreTopicTask task = null;
619 synchronized (asyncTaskMap) {
620 task = (StoreTopicTask) asyncTaskMap.get(key);
621 }
622 if (task != null) {
623 if (task.addSubscriptionKey(subscriptionKey)) {
624 removeTopicTask(this, messageId);
625 if (task.cancel()) {
626 synchronized (asyncTaskMap) {
627 asyncTaskMap.remove(key);
628 }
629 }
630 }
631 } else {
632 doAcknowledge(context, subscriptionKey, messageId, ack);
633 }
634 } else {
635 doAcknowledge(context, subscriptionKey, messageId, ack);
636 }
637 }
638
639 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
640 throws IOException {
641 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
642 command.setDestination(dest);
643 command.setSubscriptionKey(subscriptionKey);
644 command.setMessageId(messageId.toString());
645 command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
646 if (ack != null && ack.isUnmatchedAck()) {
647 command.setAck(UNMATCHED);
648 }
649 store(command, false, null, null);
650 }
651
652 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
653 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
654 .getSubscriptionName());
655 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
656 command.setDestination(dest);
657 command.setSubscriptionKey(subscriptionKey);
658 command.setRetroactive(retroactive);
659 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
660 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
661 store(command, isEnableJournalDiskSyncs() && true, null, null);
662 this.subscriptionCount.incrementAndGet();
663 }
664
665 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
666 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
667 command.setDestination(dest);
668 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
669 store(command, isEnableJournalDiskSyncs() && true, null, null);
670 this.subscriptionCount.decrementAndGet();
671 }
672
673 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
674
675 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
676 indexLock.readLock().lock();
677 try {
678 pageFile.tx().execute(new Transaction.Closure<IOException>() {
679 public void execute(Transaction tx) throws IOException {
680 StoredDestination sd = getStoredDestination(dest, tx);
681 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
682 .hasNext();) {
683 Entry<String, KahaSubscriptionCommand> entry = iterator.next();
684 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
685 .getValue().getSubscriptionInfo().newInput()));
686 subscriptions.add(info);
687
688 }
689 }
690 });
691 }finally {
692 indexLock.readLock().unlock();
693 }
694
695 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
696 subscriptions.toArray(rc);
697 return rc;
698 }
699
700 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
701 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
702 indexLock.readLock().lock();
703 try {
704 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
705 public SubscriptionInfo execute(Transaction tx) throws IOException {
706 StoredDestination sd = getStoredDestination(dest, tx);
707 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
708 if (command == null) {
709 return null;
710 }
711 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
712 .getSubscriptionInfo().newInput()));
713 }
714 });
715 }finally {
716 indexLock.readLock().unlock();
717 }
718 }
719
720 public int getMessageCount(String clientId, String subscriptionName) throws IOException {
721 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
722 indexLock.writeLock().lock();
723 try {
724 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
725 public Integer execute(Transaction tx) throws IOException {
726 StoredDestination sd = getStoredDestination(dest, tx);
727 LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
728 if (cursorPos == null) {
729 // The subscription might not exist.
730 return 0;
731 }
732
733 int counter = 0;
734 for (Iterator<Entry<Long, HashSet<String>>> iterator =
735 sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
736 Entry<Long, HashSet<String>> entry = iterator.next();
737 if (entry.getValue().contains(subscriptionKey)) {
738 counter++;
739 }
740 }
741 return counter;
742 }
743 });
744 }finally {
745 indexLock.writeLock().unlock();
746 }
747 }
748
749 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
750 throws Exception {
751 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
752 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
753 indexLock.writeLock().lock();
754 try {
755 pageFile.tx().execute(new Transaction.Closure<Exception>() {
756 public void execute(Transaction tx) throws Exception {
757 StoredDestination sd = getStoredDestination(dest, tx);
758 LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
759 sd.orderIndex.setBatch(tx, cursorPos);
760 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
761 .hasNext();) {
762 Entry<Long, MessageKeys> entry = iterator.next();
763 listener.recoverMessage(loadMessage(entry.getValue().location));
764 }
765 sd.orderIndex.resetCursorPosition();
766 }
767 });
768 }finally {
769 indexLock.writeLock().unlock();
770 }
771 }
772
773 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
774 final MessageRecoveryListener listener) throws Exception {
775 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
776 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
777 indexLock.writeLock().lock();
778 try {
779 pageFile.tx().execute(new Transaction.Closure<Exception>() {
780 public void execute(Transaction tx) throws Exception {
781 StoredDestination sd = getStoredDestination(dest, tx);
782 sd.orderIndex.resetCursorPosition();
783 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
784 if (moc == null) {
785 LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
786 if (pos == null) {
787 // sub deleted
788 return;
789 }
790 sd.orderIndex.setBatch(tx, pos);
791 moc = sd.orderIndex.cursor;
792 } else {
793 sd.orderIndex.cursor.sync(moc);
794 }
795
796 Entry<Long, MessageKeys> entry = null;
797 int counter = 0;
798 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
799 .hasNext();) {
800 entry = iterator.next();
801 if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
802 counter++;
803 }
804 if (counter >= maxReturned || listener.hasSpace() == false) {
805 break;
806 }
807 }
808 sd.orderIndex.stoppedIterating();
809 if (entry != null) {
810 MessageOrderCursor copy = sd.orderIndex.cursor.copy();
811 sd.subscriptionCursors.put(subscriptionKey, copy);
812 }
813 }
814 });
815 }finally {
816 indexLock.writeLock().unlock();
817 }
818 }
819
820 public void resetBatching(String clientId, String subscriptionName) {
821 try {
822 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
823 indexLock.writeLock().lock();
824 try {
825 pageFile.tx().execute(new Transaction.Closure<IOException>() {
826 public void execute(Transaction tx) throws IOException {
827 StoredDestination sd = getStoredDestination(dest, tx);
828 sd.subscriptionCursors.remove(subscriptionKey);
829 }
830 });
831 }finally {
832 indexLock.writeLock().unlock();
833 }
834 } catch (IOException e) {
835 throw new RuntimeException(e);
836 }
837 }
838 }
839
840 String subscriptionKey(String clientId, String subscriptionName) {
841 return clientId + ":" + subscriptionName;
842 }
843
844 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
845 return this.transactionStore.proxy(new KahaDBMessageStore(destination));
846 }
847
848 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
849 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
850 }
851
852 /**
853 * Cleanup method to remove any state associated with the given destination.
854 * This method does not stop the message store (it might not be cached).
855 *
856 * @param destination
857 * Destination to forget
858 */
859 public void removeQueueMessageStore(ActiveMQQueue destination) {
860 }
861
862 /**
863 * Cleanup method to remove any state associated with the given destination
864 * This method does not stop the message store (it might not be cached).
865 *
866 * @param destination
867 * Destination to forget
868 */
869 public void removeTopicMessageStore(ActiveMQTopic destination) {
870 }
871
872 public void deleteAllMessages() throws IOException {
873 deleteAllMessages = true;
874 }
875
876 public Set<ActiveMQDestination> getDestinations() {
877 try {
878 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
879 indexLock.readLock().lock();
880 try {
881 pageFile.tx().execute(new Transaction.Closure<IOException>() {
882 public void execute(Transaction tx) throws IOException {
883 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
884 .hasNext();) {
885 Entry<String, StoredDestination> entry = iterator.next();
886 if (!isEmptyTopic(entry, tx)) {
887 rc.add(convert(entry.getKey()));
888 }
889 }
890 }
891
892 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
893 throws IOException {
894 boolean isEmptyTopic = false;
895 ActiveMQDestination dest = convert(entry.getKey());
896 if (dest.isTopic()) {
897 StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
898 if (loadedStore.subscriptionAcks.isEmpty(tx)) {
899 isEmptyTopic = true;
900 }
901 }
902 return isEmptyTopic;
903 }
904 });
905 }finally {
906 indexLock.readLock().unlock();
907 }
908 return rc;
909 } catch (IOException e) {
910 throw new RuntimeException(e);
911 }
912 }
913
914 public long getLastMessageBrokerSequenceId() throws IOException {
915 return 0;
916 }
917
918 public long getLastProducerSequenceId(ProducerId id) {
919 indexLock.readLock().lock();
920 try {
921 return metadata.producerSequenceIdTracker.getLastSeqId(id);
922 } finally {
923 indexLock.readLock().unlock();
924 }
925 }
926
927 public long size() {
928 return storeSize.get();
929 }
930
931 public void beginTransaction(ConnectionContext context) throws IOException {
932 throw new IOException("Not yet implemented.");
933 }
934 public void commitTransaction(ConnectionContext context) throws IOException {
935 throw new IOException("Not yet implemented.");
936 }
937 public void rollbackTransaction(ConnectionContext context) throws IOException {
938 throw new IOException("Not yet implemented.");
939 }
940
941 public void checkpoint(boolean sync) throws IOException {
942 super.checkpointCleanup(false);
943 }
944
945 // /////////////////////////////////////////////////////////////////
946 // Internal helper methods.
947 // /////////////////////////////////////////////////////////////////
948
949 /**
950 * @param location
951 * @return
952 * @throws IOException
953 */
954 Message loadMessage(Location location) throws IOException {
955 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
956 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
957 return msg;
958 }
959
960 // /////////////////////////////////////////////////////////////////
961 // Internal conversion methods.
962 // /////////////////////////////////////////////////////////////////
963
964 KahaLocation convert(Location location) {
965 KahaLocation rc = new KahaLocation();
966 rc.setLogId(location.getDataFileId());
967 rc.setOffset(location.getOffset());
968 return rc;
969 }
970
971 KahaDestination convert(ActiveMQDestination dest) {
972 KahaDestination rc = new KahaDestination();
973 rc.setName(dest.getPhysicalName());
974 switch (dest.getDestinationType()) {
975 case ActiveMQDestination.QUEUE_TYPE:
976 rc.setType(DestinationType.QUEUE);
977 return rc;
978 case ActiveMQDestination.TOPIC_TYPE:
979 rc.setType(DestinationType.TOPIC);
980 return rc;
981 case ActiveMQDestination.TEMP_QUEUE_TYPE:
982 rc.setType(DestinationType.TEMP_QUEUE);
983 return rc;
984 case ActiveMQDestination.TEMP_TOPIC_TYPE:
985 rc.setType(DestinationType.TEMP_TOPIC);
986 return rc;
987 default:
988 return null;
989 }
990 }
991
992 ActiveMQDestination convert(String dest) {
993 int p = dest.indexOf(":");
994 if (p < 0) {
995 throw new IllegalArgumentException("Not in the valid destination format");
996 }
997 int type = Integer.parseInt(dest.substring(0, p));
998 String name = dest.substring(p + 1);
999
1000 switch (KahaDestination.DestinationType.valueOf(type)) {
1001 case QUEUE:
1002 return new ActiveMQQueue(name);
1003 case TOPIC:
1004 return new ActiveMQTopic(name);
1005 case TEMP_QUEUE:
1006 return new ActiveMQTempQueue(name);
1007 case TEMP_TOPIC:
1008 return new ActiveMQTempTopic(name);
1009 default:
1010 throw new IllegalArgumentException("Not in the valid destination format");
1011 }
1012 }
1013
1014 static class AsyncJobKey {
1015 MessageId id;
1016 ActiveMQDestination destination;
1017
1018 AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1019 this.id = id;
1020 this.destination = destination;
1021 }
1022
1023 @Override
1024 public boolean equals(Object obj) {
1025 if (obj == this) {
1026 return true;
1027 }
1028 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1029 && destination.equals(((AsyncJobKey) obj).destination);
1030 }
1031
1032 @Override
1033 public int hashCode() {
1034 return id.hashCode() + destination.hashCode();
1035 }
1036
1037 @Override
1038 public String toString() {
1039 return destination.getPhysicalName() + "-" + id;
1040 }
1041 }
1042
/**
 * A pending store operation that can be asked to cancel itself.
 */
interface StoreTask {
    /**
     * Requests cancellation of this task.
     *
     * @return the implementation's cancellation result
     */
    boolean cancel();
}
1046
1047 class StoreQueueTask implements Runnable, StoreTask {
1048 protected final Message message;
1049 protected final ConnectionContext context;
1050 protected final KahaDBMessageStore store;
1051 protected final InnerFutureTask future;
1052 protected final AtomicBoolean done = new AtomicBoolean();
1053 protected final AtomicBoolean locked = new AtomicBoolean();
1054
1055 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1056 this.store = store;
1057 this.context = context;
1058 this.message = message;
1059 this.future = new InnerFutureTask(this);
1060 }
1061
1062 public Future<Object> getFuture() {
1063 return this.future;
1064 }
1065
1066 public boolean cancel() {
1067 releaseLocks();
1068 if (this.done.compareAndSet(false, true)) {
1069 return this.future.cancel(false);
1070 }
1071 return false;
1072 }
1073
1074 void aquireLocks() {
1075 if (this.locked.compareAndSet(false, true)) {
1076 try {
1077 globalQueueSemaphore.acquire();
1078 store.acquireLocalAsyncLock();
1079 message.incrementReferenceCount();
1080 } catch (InterruptedException e) {
1081 LOG.warn("Failed to aquire lock", e);
1082 }
1083 }
1084
1085 }
1086
1087 void releaseLocks() {
1088 if (this.locked.compareAndSet(true, false)) {
1089 store.releaseLocalAsyncLock();
1090 globalQueueSemaphore.release();
1091 message.decrementReferenceCount();
1092 }
1093 }
1094
1095 public void run() {
1096 this.store.doneTasks++;
1097 try {
1098 if (this.done.compareAndSet(false, true)) {
1099 this.store.addMessage(context, message);
1100 removeQueueTask(this.store, this.message.getMessageId());
1101 this.future.complete();
1102 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1103 System.err.println(this.store.dest.getName() + " cancelled: "
1104 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1105 this.store.canceledTasks = this.store.doneTasks = 0;
1106 }
1107 } catch (Exception e) {
1108 this.future.setException(e);
1109 } finally {
1110 releaseLocks();
1111 }
1112 }
1113
1114 protected Message getMessage() {
1115 return this.message;
1116 }
1117
1118 private class InnerFutureTask extends FutureTask<Object> {
1119
1120 public InnerFutureTask(Runnable runnable) {
1121 super(runnable, null);
1122
1123 }
1124
1125 public void setException(final Exception e) {
1126 super.setException(e);
1127 }
1128
1129 public void complete() {
1130 super.set(null);
1131 }
1132 }
1133 }
1134
1135 class StoreTopicTask extends StoreQueueTask {
1136 private final int subscriptionCount;
1137 private final List<String> subscriptionKeys = new ArrayList<String>(1);
1138 private final KahaDBTopicMessageStore topicStore;
1139 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1140 int subscriptionCount) {
1141 super(store, context, message);
1142 this.topicStore = store;
1143 this.subscriptionCount = subscriptionCount;
1144
1145 }
1146
1147 @Override
1148 void aquireLocks() {
1149 if (this.locked.compareAndSet(false, true)) {
1150 try {
1151 globalTopicSemaphore.acquire();
1152 store.acquireLocalAsyncLock();
1153 message.incrementReferenceCount();
1154 } catch (InterruptedException e) {
1155 LOG.warn("Failed to aquire lock", e);
1156 }
1157 }
1158
1159 }
1160
1161 @Override
1162 void releaseLocks() {
1163 if (this.locked.compareAndSet(true, false)) {
1164 message.decrementReferenceCount();
1165 store.releaseLocalAsyncLock();
1166 globalTopicSemaphore.release();
1167 }
1168 }
1169
1170 /**
1171 * add a key
1172 *
1173 * @param key
1174 * @return true if all acknowledgements received
1175 */
1176 public boolean addSubscriptionKey(String key) {
1177 synchronized (this.subscriptionKeys) {
1178 this.subscriptionKeys.add(key);
1179 }
1180 return this.subscriptionKeys.size() >= this.subscriptionCount;
1181 }
1182
1183 @Override
1184 public void run() {
1185 this.store.doneTasks++;
1186 try {
1187 if (this.done.compareAndSet(false, true)) {
1188 this.topicStore.addMessage(context, message);
1189 // apply any acks we have
1190 synchronized (this.subscriptionKeys) {
1191 for (String key : this.subscriptionKeys) {
1192 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1193
1194 }
1195 }
1196 removeTopicTask(this.topicStore, this.message.getMessageId());
1197 this.future.complete();
1198 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1199 System.err.println(this.store.dest.getName() + " cancelled: "
1200 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1201 this.store.canceledTasks = this.store.doneTasks = 0;
1202 }
1203 } catch (Exception e) {
1204 this.future.setException(e);
1205 } finally {
1206 releaseLocks();
1207 }
1208 }
1209 }
1210 }