001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.AbstractList;
021 import java.util.ArrayList;
022 import java.util.Collection;
023 import java.util.Collections;
024 import java.util.Comparator;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.LinkedHashMap;
028 import java.util.LinkedList;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.CancellationException;
033 import java.util.concurrent.CopyOnWriteArraySet;
034 import java.util.concurrent.CountDownLatch;
035 import java.util.concurrent.DelayQueue;
036 import java.util.concurrent.Delayed;
037 import java.util.concurrent.ExecutorService;
038 import java.util.concurrent.Future;
039 import java.util.concurrent.TimeUnit;
040 import java.util.concurrent.atomic.AtomicLong;
041 import java.util.concurrent.locks.Lock;
042 import java.util.concurrent.locks.ReentrantLock;
043 import java.util.concurrent.locks.ReentrantReadWriteLock;
044 import javax.jms.InvalidSelectorException;
045 import javax.jms.JMSException;
046 import javax.jms.ResourceAllocationException;
047 import org.apache.activemq.broker.BrokerService;
048 import org.apache.activemq.broker.ConnectionContext;
049 import org.apache.activemq.broker.ProducerBrokerExchange;
050 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
051 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
052 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
053 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
054 import org.apache.activemq.broker.region.group.MessageGroupMap;
055 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
056 import org.apache.activemq.broker.region.policy.DispatchPolicy;
057 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
058 import org.apache.activemq.command.*;
059 import org.apache.activemq.filter.BooleanExpression;
060 import org.apache.activemq.filter.MessageEvaluationContext;
061 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
062 import org.apache.activemq.security.SecurityContext;
063 import org.apache.activemq.selector.SelectorParser;
064 import org.apache.activemq.store.MessageRecoveryListener;
065 import org.apache.activemq.store.MessageStore;
066 import org.apache.activemq.thread.Scheduler;
067 import org.apache.activemq.thread.Task;
068 import org.apache.activemq.thread.TaskRunner;
069 import org.apache.activemq.thread.TaskRunnerFactory;
070 import org.apache.activemq.transaction.Synchronization;
071 import org.apache.activemq.usage.Usage;
072 import org.apache.activemq.usage.UsageListener;
073 import org.apache.activemq.util.BrokerSupport;
074 import org.slf4j.Logger;
075 import org.slf4j.LoggerFactory;
076 import org.slf4j.MDC;
077
078 /**
079 * The Queue is a List of MessageEntry objects that are dispatched to matching
080 * subscriptions.
081 *
082 *
083 */
084 public class Queue extends BaseDestination implements Task, UsageListener {
085 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
086 protected final TaskRunnerFactory taskFactory;
087 protected TaskRunner taskRunner;
088 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
089 protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
090 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
091 protected PendingMessageCursor messages;
092 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
093 private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
094 // Messages that are paged in but have not yet been targeted at a
095 // subscription
096 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
097 private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
098 private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
099 private MessageGroupMap messageGroupOwners;
100 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
101 private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
102 final Lock sendLock = new ReentrantLock();
103 private ExecutorService executor;
104 protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
105 .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
106 private boolean useConsumerPriority = true;
107 private boolean strictOrderDispatch = false;
108 private final QueueDispatchSelector dispatchSelector;
109 private boolean optimizedDispatch = false;
110 private boolean firstConsumer = false;
111 private int timeBeforeDispatchStarts = 0;
112 private int consumersBeforeDispatchStarts = 0;
113 private CountDownLatch consumersBeforeStartsLatch;
114 private final AtomicLong pendingWakeups = new AtomicLong();
115 private boolean allConsumersExclusiveByDefault = false;
116
117 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
118 public void run() {
119 asyncWakeup();
120 }
121 };
122 private final Runnable expireMessagesTask = new Runnable() {
123 public void run() {
124 expireMessages();
125 }
126 };
127
128 private final Object iteratingMutex = new Object() {
129 };
130 private final Scheduler scheduler;
131
132 class TimeoutMessage implements Delayed {
133
134 Message message;
135 ConnectionContext context;
136 long trigger;
137
138 public TimeoutMessage(Message message, ConnectionContext context, long delay) {
139 this.message = message;
140 this.context = context;
141 this.trigger = System.currentTimeMillis() + delay;
142 }
143
144 public long getDelay(TimeUnit unit) {
145 long n = trigger - System.currentTimeMillis();
146 return unit.convert(n, TimeUnit.MILLISECONDS);
147 }
148
149 public int compareTo(Delayed delayed) {
150 long other = ((TimeoutMessage) delayed).trigger;
151 int returnValue;
152 if (this.trigger < other) {
153 returnValue = -1;
154 } else if (this.trigger > other) {
155 returnValue = 1;
156 } else {
157 returnValue = 0;
158 }
159 return returnValue;
160 }
161
162 }
163
164 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
165
166 class FlowControlTimeoutTask extends Thread {
167
168 @Override
169 public void run() {
170 TimeoutMessage timeout;
171 try {
172 while (true) {
173 timeout = flowControlTimeoutMessages.take();
174 if (timeout != null) {
175 synchronized (messagesWaitingForSpace) {
176 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
177 ExceptionResponse response = new ExceptionResponse(
178 new ResourceAllocationException(
179 "Usage Manager Memory Limit reached. Stopping producer ("
180 + timeout.message.getProducerId()
181 + ") to prevent flooding "
182 + getActiveMQDestination().getQualifiedName()
183 + "."
184 + " See http://activemq.apache.org/producer-flow-control.html for more info"));
185 response.setCorrelationId(timeout.message.getCommandId());
186 timeout.context.getConnection().dispatchAsync(response);
187 }
188 }
189 }
190 }
191 } catch (InterruptedException e) {
192 if (LOG.isDebugEnabled()) {
193 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
194 }
195 }
196 }
197 };
198
199 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
200
201 private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
202
203 public int compare(Subscription s1, Subscription s2) {
204 // We want the list sorted in descending order
205 return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
206 }
207 };
208
/**
 * Creates a queue for {@code destination}.
 *
 * @param brokerService owning broker service; also the source of the scheduler
 * @param destination   destination this queue represents
 * @param store         message store passed to the superclass
 * @param parentStats   parent statistics passed to the superclass
 * @param taskFactory   factory later used by initialize() to create the task runner
 * @throws Exception propagated from the superclass constructor or broker lookup
 */
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
        DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
    super(brokerService, store, destination, parentStats);
    this.dispatchSelector = new QueueDispatchSelector(destination);
    this.scheduler = brokerService.getBroker().getScheduler();
    this.taskFactory = taskFactory;
}
216
217 public List<Subscription> getConsumers() {
218 consumersLock.readLock().lock();
219 try {
220 return new ArrayList<Subscription>(consumers);
221 }finally {
222 consumersLock.readLock().unlock();
223 }
224 }
225
226 // make the queue easily visible in the debugger from its task runner
227 // threads
228 final class QueueThread extends Thread {
229 final Queue queue;
230
231 public QueueThread(Runnable runnable, String name, Queue queue) {
232 super(runnable, name);
233 this.queue = queue;
234 }
235 }
236
237 @Override
238 public void initialize() throws Exception {
239 if (this.messages == null) {
240 if (destination.isTemporary() || broker == null || store == null) {
241 this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
242 } else {
243 this.messages = new StoreQueueCursor(broker, this);
244 }
245 }
246 // If a VMPendingMessageCursor don't use the default Producer System
247 // Usage
248 // since it turns into a shared blocking queue which can lead to a
249 // network deadlock.
250 // If we are cursoring to disk..it's not and issue because it does not
251 // block due
252 // to large disk sizes.
253 if (messages instanceof VMPendingMessageCursor) {
254 this.systemUsage = brokerService.getSystemUsage();
255 memoryUsage.setParent(systemUsage.getMemoryUsage());
256 }
257
258 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
259
260 super.initialize();
261 if (store != null) {
262 // Restore the persistent messages.
263 messages.setSystemUsage(systemUsage);
264 messages.setEnableAudit(isEnableAudit());
265 messages.setMaxAuditDepth(getMaxAuditDepth());
266 messages.setMaxProducersToAudit(getMaxProducersToAudit());
267 messages.setUseCache(isUseCache());
268 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
269 if (messages.isRecoveryRequired()) {
270 store.recover(new MessageRecoveryListener() {
271 double totalMessageCount = store.getMessageCount();
272 int recoveredMessageCount = 0;
273
274 public boolean recoverMessage(Message message) {
275 // Message could have expired while it was being
276 // loaded..
277 if ((++recoveredMessageCount % 50000) == 0) {
278 LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
279 + recoveredMessageCount + " messages. " +
280 (int)(recoveredMessageCount*100/totalMessageCount) + "% complete");
281 }
282 if (message.isExpired()) {
283 if (broker.isExpired(message)) {
284 messageExpired(createConnectionContext(), createMessageReference(message));
285 // drop message will decrement so counter
286 // balance here
287 destinationStatistics.getMessages().increment();
288 }
289 return true;
290 }
291 if (hasSpace()) {
292 message.setRegionDestination(Queue.this);
293 messagesLock.writeLock().lock();
294 try{
295 try {
296 messages.addMessageLast(message);
297 } catch (Exception e) {
298 LOG.error("Failed to add message to cursor", e);
299 }
300 }finally {
301 messagesLock.writeLock().unlock();
302 }
303 destinationStatistics.getMessages().increment();
304 return true;
305 }
306 return false;
307 }
308
309 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
310 throw new RuntimeException("Should not be called.");
311 }
312
313 public boolean hasSpace() {
314 return true;
315 }
316
317 public boolean isDuplicate(MessageId id) {
318 return false;
319 }
320 });
321 } else {
322 int messageCount = store.getMessageCount();
323 destinationStatistics.getMessages().setCount(messageCount);
324 }
325 }
326 }
327
328 /*
329 * Holder for subscription that needs attention on next iterate browser
330 * needs access to existing messages in the queue that have already been
331 * dispatched
332 */
333 class BrowserDispatch {
334 QueueBrowserSubscription browser;
335
336 public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
337 browser = browserSubscription;
338 browser.incrementQueueRef();
339 }
340
341 void done() {
342 try {
343 browser.decrementQueueRef();
344 } catch (Exception e) {
345 LOG.warn("decrement ref on browser: " + browser, e);
346 }
347 }
348
349 public QueueBrowserSubscription getBrowser() {
350 return browser;
351 }
352 }
353
354 LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
355
356 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
357 if (LOG.isDebugEnabled()) {
358 LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
359 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
360 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
361 + getDestinationStatistics().getInflight().getCount());
362 }
363
364 super.addSubscription(context, sub);
365 // synchronize with dispatch method so that no new messages are sent
366 // while setting up a subscription. avoid out of order messages,
367 // duplicates, etc.
368 pagedInPendingDispatchLock.writeLock().lock();
369 try {
370
371 sub.add(context, this);
372
373 // needs to be synchronized - so no contention with dispatching
374 // consumersLock.
375 consumersLock.writeLock().lock();
376 try {
377
378 // set a flag if this is a first consumer
379 if (consumers.size() == 0) {
380 firstConsumer = true;
381 if (consumersBeforeDispatchStarts != 0) {
382 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
383 }
384 } else {
385 if (consumersBeforeStartsLatch != null) {
386 consumersBeforeStartsLatch.countDown();
387 }
388 }
389
390 addToConsumerList(sub);
391 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
392 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
393 if (exclusiveConsumer == null) {
394 exclusiveConsumer = sub;
395 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
396 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
397 exclusiveConsumer = sub;
398 }
399 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
400 }
401 }finally {
402 consumersLock.writeLock().unlock();
403 }
404
405 if (sub instanceof QueueBrowserSubscription) {
406 // tee up for dispatch in next iterate
407 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
408 pagedInMessagesLock.readLock().lock();
409 try{
410 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
411 browserDispatches.addLast(browserDispatch);
412 }finally {
413 pagedInMessagesLock.readLock().unlock();
414 }
415 }
416
417 if (!(this.optimizedDispatch || isSlave())) {
418 wakeup();
419 }
420 }finally {
421 pagedInPendingDispatchLock.writeLock().unlock();
422 }
423 if (this.optimizedDispatch || isSlave()) {
424 // Outside of dispatchLock() to maintain the lock hierarchy of
425 // iteratingMutex -> dispatchLock. - see
426 // https://issues.apache.org/activemq/browse/AMQ-1878
427 wakeup();
428 }
429 }
430
431 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
432 throws Exception {
433 super.removeSubscription(context, sub, lastDeiveredSequenceId);
434 // synchronize with dispatch method so that no new messages are sent
435 // while removing up a subscription.
436 pagedInPendingDispatchLock.writeLock().lock();
437 try {
438 if (LOG.isDebugEnabled()) {
439 LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
440 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
441 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
442 + getDestinationStatistics().getInflight().getCount());
443 }
444 consumersLock.writeLock().lock();
445 try {
446 removeFromConsumerList(sub);
447 if (sub.getConsumerInfo().isExclusive()) {
448 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
449 if (exclusiveConsumer == sub) {
450 exclusiveConsumer = null;
451 for (Subscription s : consumers) {
452 if (s.getConsumerInfo().isExclusive()
453 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
454 .getConsumerInfo().getPriority())) {
455 exclusiveConsumer = s;
456
457 }
458 }
459 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
460 }
461 } else if (isAllConsumersExclusiveByDefault()) {
462 Subscription exclusiveConsumer = null;
463 for (Subscription s : consumers) {
464 if (exclusiveConsumer == null
465 || s.getConsumerInfo().getPriority() > exclusiveConsumer
466 .getConsumerInfo().getPriority()) {
467 exclusiveConsumer = s;
468 }
469 }
470 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
471 }
472 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
473 getMessageGroupOwners().removeConsumer(consumerId);
474
475 // redeliver inflight messages
476
477 boolean markAsRedelivered = false;
478 MessageReference lastDeliveredRef = null;
479 List<MessageReference> unAckedMessages = sub.remove(context, this);
480
481 // locate last redelivered in unconsumed list (list in delivery rather than seq order)
482 if (lastDeiveredSequenceId != 0) {
483 for (MessageReference ref : unAckedMessages) {
484 if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
485 lastDeliveredRef = ref;
486 markAsRedelivered = true;
487 LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
488 break;
489 }
490 }
491 }
492 for (MessageReference ref : unAckedMessages) {
493 QueueMessageReference qmr = (QueueMessageReference) ref;
494 if (qmr.getLockOwner() == sub) {
495 qmr.unlock();
496
497 // have no delivery information
498 if (lastDeiveredSequenceId == 0) {
499 qmr.incrementRedeliveryCounter();
500 } else {
501 if (markAsRedelivered) {
502 qmr.incrementRedeliveryCounter();
503 }
504 if (ref == lastDeliveredRef) {
505 // all that follow were not redelivered
506 markAsRedelivered = false;
507 }
508 }
509 }
510 redeliveredWaitingDispatch.add(qmr);
511 }
512 if (!redeliveredWaitingDispatch.isEmpty()) {
513 doDispatch(new ArrayList<QueueMessageReference>());
514 }
515 }finally {
516 consumersLock.writeLock().unlock();
517 }
518 if (!(this.optimizedDispatch || isSlave())) {
519 wakeup();
520 }
521 }finally {
522 pagedInPendingDispatchLock.writeLock().unlock();
523 }
524 if (this.optimizedDispatch || isSlave()) {
525 // Outside of dispatchLock() to maintain the lock hierarchy of
526 // iteratingMutex -> dispatchLock. - see
527 // https://issues.apache.org/activemq/browse/AMQ-1878
528 wakeup();
529 }
530 }
531
532 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
533 final ConnectionContext context = producerExchange.getConnectionContext();
534 // There is delay between the client sending it and it arriving at the
535 // destination.. it may have expired.
536 message.setRegionDestination(this);
537 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
538 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
539 && !context.isInRecoveryMode();
540 if (message.isExpired()) {
541 // message not stored - or added to stats yet - so chuck here
542 broker.getRoot().messageExpired(context, message, null);
543 if (sendProducerAck) {
544 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
545 context.getConnection().dispatchAsync(ack);
546 }
547 return;
548 }
549 if (memoryUsage.isFull()) {
550 isFull(context, memoryUsage);
551 fastProducer(context, producerInfo);
552 if (isProducerFlowControl() && context.isProducerFlowControl()) {
553 if (warnOnProducerFlowControl) {
554 warnOnProducerFlowControl = false;
555 LOG
556 .info("Usage Manager Memory Limit ("
557 + memoryUsage.getLimit()
558 + ") reached on "
559 + getActiveMQDestination().getQualifiedName()
560 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
561 + " See http://activemq.apache.org/producer-flow-control.html for more info");
562 }
563
564 if (systemUsage.isSendFailIfNoSpace()) {
565 throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
566 + message.getProducerId() + ") to prevent flooding "
567 + getActiveMQDestination().getQualifiedName() + "."
568 + " See http://activemq.apache.org/producer-flow-control.html for more info");
569 }
570
571 // We can avoid blocking due to low usage if the producer is
572 // sending
573 // a sync message or if it is using a producer window
574 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
575 // copy the exchange state since the context will be
576 // modified while we are waiting
577 // for space.
578 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
579 synchronized (messagesWaitingForSpace) {
580 // Start flow control timeout task
581 // Prevent trying to start it multiple times
582 if (!flowControlTimeoutTask.isAlive()) {
583 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
584 flowControlTimeoutTask.start();
585 }
586 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
587 public void run() {
588
589 try {
590 // While waiting for space to free up... the
591 // message may have expired.
592 if (message.isExpired()) {
593 LOG.error("expired waiting for space..");
594 broker.messageExpired(context, message, null);
595 destinationStatistics.getExpired().increment();
596 } else {
597 doMessageSend(producerExchangeCopy, message);
598 }
599
600 if (sendProducerAck) {
601 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
602 .getSize());
603 context.getConnection().dispatchAsync(ack);
604 } else {
605 Response response = new Response();
606 response.setCorrelationId(message.getCommandId());
607 context.getConnection().dispatchAsync(response);
608 }
609
610 } catch (Exception e) {
611 if (!sendProducerAck && !context.isInRecoveryMode()) {
612 ExceptionResponse response = new ExceptionResponse(e);
613 response.setCorrelationId(message.getCommandId());
614 context.getConnection().dispatchAsync(response);
615 } else {
616 LOG.debug("unexpected exception on deferred send of :" + message, e);
617 }
618 }
619 }
620 });
621
622 if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
623 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
624 .getSendFailIfNoSpaceAfterTimeout()));
625 }
626
627 registerCallbackForNotFullNotification();
628 context.setDontSendReponse(true);
629 return;
630 }
631
632 } else {
633
634 if (memoryUsage.isFull()) {
635 waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
636 + message.getProducerId() + ") stopped to prevent flooding "
637 + getActiveMQDestination().getQualifiedName() + "."
638 + " See http://activemq.apache.org/producer-flow-control.html for more info");
639 }
640
641 // The usage manager could have delayed us by the time
642 // we unblock the message could have expired..
643 if (message.isExpired()) {
644 if (LOG.isDebugEnabled()) {
645 LOG.debug("Expired message: " + message);
646 }
647 broker.getRoot().messageExpired(context, message, null);
648 return;
649 }
650 }
651 }
652 }
653 doMessageSend(producerExchange, message);
654 if (sendProducerAck) {
655 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
656 context.getConnection().dispatchAsync(ack);
657 }
658 }
659
660 private void registerCallbackForNotFullNotification() {
661 // If the usage manager is not full, then the task will not
662 // get called..
663 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
664 // so call it directly here.
665 sendMessagesWaitingForSpaceTask.run();
666 }
667 }
668
669 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
670 Exception {
671 final ConnectionContext context = producerExchange.getConnectionContext();
672 Future<Object> result = null;
673
674 checkUsage(context, message);
675 sendLock.lockInterruptibly();
676 try {
677 if (store != null && message.isPersistent()) {
678 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
679 if (messages.isCacheEnabled()) {
680 result = store.asyncAddQueueMessage(context, message);
681 } else {
682 store.addMessage(context, message);
683 }
684 if (isReduceMemoryFootprint()) {
685 message.clearMarshalledState();
686 }
687 }
688 if (context.isInTransaction()) {
689 // If this is a transacted message.. increase the usage now so that
690 // a big TX does not blow up
691 // our memory. This increment is decremented once the tx finishes..
692 message.incrementReferenceCount();
693
694 context.getTransaction().addSynchronization(new Synchronization() {
695 @Override
696 public void afterCommit() throws Exception {
697 sendLock.lockInterruptibly();
698 try {
699 // It could take while before we receive the commit
700 // op, by that time the message could have expired..
701 if (broker.isExpired(message)) {
702 broker.messageExpired(context, message, null);
703 destinationStatistics.getExpired().increment();
704 return;
705 }
706 sendMessage(message);
707 } finally {
708 sendLock.unlock();
709 message.decrementReferenceCount();
710 }
711 messageSent(context, message);
712 }
713 @Override
714 public void afterRollback() throws Exception {
715 message.decrementReferenceCount();
716 }
717 });
718 } else {
719 // Add to the pending list, this takes care of incrementing the
720 // usage manager.
721 sendMessage(message);
722 }
723 } finally {
724 sendLock.unlock();
725 }
726 if (!context.isInTransaction()) {
727 messageSent(context, message);
728 }
729 if (result != null && !result.isCancelled()) {
730 try {
731 result.get();
732 } catch (CancellationException e) {
733 // ignore - the task has been cancelled if the message
734 // has already been deleted
735 }
736 }
737 }
738
739 private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
740 if (message.isPersistent()) {
741 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
742 final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
743 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
744 + message.getProducerId() + ") to prevent flooding "
745 + getActiveMQDestination().getQualifiedName() + "."
746 + " See http://activemq.apache.org/producer-flow-control.html for more info";
747
748 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
749 }
750 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
751 final String logMessage = "Usage Manager Temp Store is Full ("
752 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
753 +"). Stopping producer (" + message.getProducerId()
754 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
755 + " See http://activemq.apache.org/producer-flow-control.html for more info";
756
757 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
758 }
759 }
760
761 private void expireMessages() {
762 if (LOG.isDebugEnabled()) {
763 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
764 }
765
766 // just track the insertion count
767 List<Message> browsedMessages = new AbstractList<Message>() {
768 int size = 0;
769
770 @Override
771 public void add(int index, Message element) {
772 size++;
773 }
774
775 @Override
776 public int size() {
777 return size;
778 }
779
780 @Override
781 public Message get(int index) {
782 return null;
783 }
784 };
785 doBrowse(browsedMessages, this.getMaxExpirePageSize());
786 asyncWakeup();
787 if (LOG.isDebugEnabled()) {
788 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
789 }
790 }
791
792 public void gc() {
793 }
794
795 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
796 throws IOException {
797 messageConsumed(context, node);
798 if (store != null && node.isPersistent()) {
799 // the original ack may be a ranged ack, but we are trying to delete
800 // a specific
801 // message store here so we need to convert to a non ranged ack.
802 if (ack.getMessageCount() > 0) {
803 // Dup the ack
804 MessageAck a = new MessageAck();
805 ack.copy(a);
806 ack = a;
807 // Convert to non-ranged.
808 ack.setFirstMessageId(node.getMessageId());
809 ack.setLastMessageId(node.getMessageId());
810 ack.setMessageCount(1);
811 }
812
813 store.removeAsyncMessage(context, ack);
814 }
815 }
816
817 Message loadMessage(MessageId messageId) throws IOException {
818 Message msg = null;
819 if (store != null) { // can be null for a temp q
820 msg = store.getMessage(messageId);
821 if (msg != null) {
822 msg.setRegionDestination(this);
823 }
824 }
825 return msg;
826 }
827
828 @Override
829 public String toString() {
830 int size = 0;
831 messagesLock.readLock().lock();
832 try{
833 size = messages.size();
834 }finally {
835 messagesLock.readLock().unlock();
836 }
837 return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
838 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
839 + messageGroupOwners;
840 }
841
842 public void start() throws Exception {
843 if (memoryUsage != null) {
844 memoryUsage.start();
845 }
846 if (systemUsage.getStoreUsage() != null) {
847 systemUsage.getStoreUsage().start();
848 }
849 systemUsage.getMemoryUsage().addUsageListener(this);
850 messages.start();
851 if (getExpireMessagesPeriod() > 0) {
852 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
853 }
854 doPageIn(false);
855 }
856
857 public void stop() throws Exception {
858 if (taskRunner != null) {
859 taskRunner.shutdown();
860 }
861 if (this.executor != null) {
862 this.executor.shutdownNow();
863 }
864
865 scheduler.cancel(expireMessagesTask);
866
867 if (flowControlTimeoutTask.isAlive()) {
868 flowControlTimeoutTask.interrupt();
869 }
870
871 if (messages != null) {
872 messages.stop();
873 }
874
875 systemUsage.getMemoryUsage().removeUsageListener(this);
876 if (memoryUsage != null) {
877 memoryUsage.stop();
878 }
879 if (store != null) {
880 store.stop();
881 }
882 }
883
884 // Properties
885 // -------------------------------------------------------------------------
886 @Override
887 public ActiveMQDestination getActiveMQDestination() {
888 return destination;
889 }
890
891 public MessageGroupMap getMessageGroupOwners() {
892 if (messageGroupOwners == null) {
893 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
894 }
895 return messageGroupOwners;
896 }
897
/**
 * @return the dispatch policy currently set on this queue
 */
public DispatchPolicy getDispatchPolicy() {
    return dispatchPolicy;
}
901
/**
 * @param dispatchPolicy the dispatch policy to store on this queue
 */
public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
    this.dispatchPolicy = dispatchPolicy;
}
905
/**
 * @return the factory used by {@link #getMessageGroupOwners()} to create
 *         the message group map
 */
public MessageGroupMapFactory getMessageGroupMapFactory() {
    return messageGroupMapFactory;
}
909
/**
 * @param messageGroupMapFactory the factory to use when the message group
 *        map is next created; an already created map is not replaced here
 */
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
    this.messageGroupMapFactory = messageGroupMapFactory;
}
913
/**
 * @return the pending message cursor backing this queue
 */
public PendingMessageCursor getMessages() {
    return this.messages;
}
917
/**
 * @param messages the pending message cursor this queue should use
 */
public void setMessages(PendingMessageCursor messages) {
    this.messages = messages;
}
921
/**
 * @return whether the consumer list is kept sorted (via the queue's
 *         consumer comparator) when subscriptions are added
 */
public boolean isUseConsumerPriority() {
    return useConsumerPriority;
}
925
/**
 * @param useConsumerPriority {@code true} to keep the consumer list sorted
 *        when subscriptions are added
 */
public void setUseConsumerPriority(boolean useConsumerPriority) {
    this.useConsumerPriority = useConsumerPriority;
}
929
/**
 * @return whether strict-order dispatch is enabled; when it is, the
 *         dispatch loop does not rotate the consumer list after a
 *         successful dispatch
 */
public boolean isStrictOrderDispatch() {
    return strictOrderDispatch;
}
933
/**
 * @param strictOrderDispatch {@code true} to disable the round-robin
 *        rotation of consumers after each dispatch
 */
public void setStrictOrderDispatch(boolean strictOrderDispatch) {
    this.strictOrderDispatch = strictOrderDispatch;
}
937
/**
 * @return whether {@link #wakeup()} runs {@link #iterate()} directly on
 *         the calling thread instead of waking the task runner
 */
public boolean isOptimizedDispatch() {
    return optimizedDispatch;
}
941
/**
 * @param optimizedDispatch {@code true} to make {@link #wakeup()} iterate
 *        on the calling thread
 */
public void setOptimizedDispatch(boolean optimizedDispatch) {
    this.optimizedDispatch = optimizedDispatch;
}
945
/**
 * @return the delay, in milliseconds, that the first {@link #iterate()}
 *         pass waits before dispatching; values {@code <= 0} mean no
 *         explicit delay is configured
 */
public int getTimeBeforeDispatchStarts() {
    return timeBeforeDispatchStarts;
}
949
/**
 * @param timeBeforeDispatchStarts delay in milliseconds used by the first
 *        {@link #iterate()} pass before dispatching
 */
public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
    this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
}
953
/**
 * @return the consumer-count threshold; when positive, the first
 *         {@link #iterate()} pass waits (with a timeout) on the
 *         consumers-before-start latch before dispatching
 */
public int getConsumersBeforeDispatchStarts() {
    return consumersBeforeDispatchStarts;
}
957
/**
 * Stores the consumer-count threshold checked by the first
 * {@link #iterate()} pass.
 * NOTE(review): this setter only updates the field; whether the latch is
 * re-created elsewhere is not visible here - confirm.
 *
 * @param consumersBeforeDispatchStarts the new threshold
 */
public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
    this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
}
961
/**
 * @param allConsumersExclusiveByDefault new value of the
 *        "all consumers exclusive by default" flag
 */
public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
    this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
}
965
/**
 * @return the current value of the "all consumers exclusive by default"
 *         flag
 */
public boolean isAllConsumersExclusiveByDefault() {
    return allConsumersExclusiveByDefault;
}
969
970
971 // Implementation methods
972 // -------------------------------------------------------------------------
973 private QueueMessageReference createMessageReference(Message message) {
974 QueueMessageReference result = new IndirectMessageReference(message);
975 return result;
976 }
977
978 public Message[] browse() {
979 List<Message> browseList = new ArrayList<Message>();
980 doBrowse(browseList, getMaxBrowsePageSize());
981 return browseList.toArray(new Message[browseList.size()]);
982 }
983
984 public void doBrowse(List<Message> browseList, int max) {
985 final ConnectionContext connectionContext = createConnectionContext();
986 try {
987 pageInMessages(false);
988 List<MessageReference> toExpire = new ArrayList<MessageReference>();
989
990 pagedInPendingDispatchLock.writeLock().lock();
991 try {
992 addAll(pagedInPendingDispatch, browseList, max, toExpire);
993 for (MessageReference ref : toExpire) {
994 pagedInPendingDispatch.remove(ref);
995 if (broker.isExpired(ref)) {
996 LOG.debug("expiring from pagedInPending: " + ref);
997 messageExpired(connectionContext, ref);
998 }
999 }
1000 } finally {
1001 pagedInPendingDispatchLock.writeLock().unlock();
1002 }
1003 toExpire.clear();
1004 pagedInMessagesLock.readLock().lock();
1005 try {
1006 addAll(pagedInMessages.values(), browseList, max, toExpire);
1007 } finally {
1008 pagedInMessagesLock.readLock().unlock();
1009 }
1010 for (MessageReference ref : toExpire) {
1011 if (broker.isExpired(ref)) {
1012 LOG.debug("expiring from pagedInMessages: " + ref);
1013 messageExpired(connectionContext, ref);
1014 } else {
1015 pagedInMessagesLock.writeLock().lock();
1016 try {
1017 pagedInMessages.remove(ref.getMessageId());
1018 } finally {
1019 pagedInMessagesLock.writeLock().unlock();
1020 }
1021 }
1022 }
1023
1024 if (browseList.size() < getMaxBrowsePageSize()) {
1025 messagesLock.writeLock().lock();
1026 try {
1027 try {
1028 messages.reset();
1029 while (messages.hasNext() && browseList.size() < max) {
1030 MessageReference node = messages.next();
1031 if (node.isExpired()) {
1032 if (broker.isExpired(node)) {
1033 LOG.debug("expiring from messages: " + node);
1034 messageExpired(connectionContext, createMessageReference(node.getMessage()));
1035 }
1036 messages.remove();
1037 } else {
1038 messages.rollback(node.getMessageId());
1039 if (browseList.contains(node.getMessage()) == false) {
1040 browseList.add(node.getMessage());
1041 }
1042 }
1043 node.decrementReferenceCount();
1044 }
1045 } finally {
1046 messages.release();
1047 }
1048 } finally {
1049 messagesLock.writeLock().unlock();
1050 }
1051 }
1052
1053 } catch (Exception e) {
1054 LOG.error("Problem retrieving message for browse", e);
1055 }
1056 }
1057
1058 private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize,
1059 List<MessageReference> toExpire) throws Exception {
1060 for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1061 QueueMessageReference ref = i.next();
1062 if (ref.isExpired()) {
1063 toExpire.add(ref);
1064 } else if (l.contains(ref.getMessage()) == false) {
1065 l.add(ref.getMessage());
1066 }
1067 }
1068 }
1069
1070 public QueueMessageReference getMessage(String id) {
1071 MessageId msgId = new MessageId(id);
1072 pagedInMessagesLock.readLock().lock();
1073 try{
1074 QueueMessageReference ref = this.pagedInMessages.get(msgId);
1075 if (ref != null) {
1076 return ref;
1077 }
1078 }finally {
1079 pagedInMessagesLock.readLock().unlock();
1080 }
1081 messagesLock.readLock().lock();
1082 try{
1083 try {
1084 messages.reset();
1085 while (messages.hasNext()) {
1086 MessageReference mr = messages.next();
1087 QueueMessageReference qmr = createMessageReference(mr.getMessage());
1088 qmr.decrementReferenceCount();
1089 messages.rollback(qmr.getMessageId());
1090 if (msgId.equals(qmr.getMessageId())) {
1091 return qmr;
1092 }
1093 }
1094 } finally {
1095 messages.release();
1096 }
1097 }finally {
1098 messagesLock.readLock().unlock();
1099 }
1100 return null;
1101 }
1102
1103 public void purge() throws Exception {
1104 ConnectionContext c = createConnectionContext();
1105 List<MessageReference> list = null;
1106 do {
1107 doPageIn(true);
1108 pagedInMessagesLock.readLock().lock();
1109 try {
1110 list = new ArrayList<MessageReference>(pagedInMessages.values());
1111 }finally {
1112 pagedInMessagesLock.readLock().unlock();
1113 }
1114
1115 for (MessageReference ref : list) {
1116 try {
1117 QueueMessageReference r = (QueueMessageReference) ref;
1118 removeMessage(c, r);
1119 } catch (IOException e) {
1120 }
1121 }
1122 // don't spin/hang if stats are out and there is nothing left in the
1123 // store
1124 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1125 if (this.destinationStatistics.getMessages().getCount() > 0) {
1126 LOG.warn(getActiveMQDestination().getQualifiedName()
1127 + " after purge complete, message count stats report: "
1128 + this.destinationStatistics.getMessages().getCount());
1129 }
1130 gc();
1131 this.destinationStatistics.getMessages().setCount(0);
1132 getMessages().clear();
1133 }
1134
1135 /**
1136 * Removes the message matching the given messageId
1137 */
1138 public boolean removeMessage(String messageId) throws Exception {
1139 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1140 }
1141
1142 /**
1143 * Removes the messages matching the given selector
1144 *
1145 * @return the number of messages removed
1146 */
1147 public int removeMatchingMessages(String selector) throws Exception {
1148 return removeMatchingMessages(selector, -1);
1149 }
1150
1151 /**
1152 * Removes the messages matching the given selector up to the maximum number
1153 * of matched messages
1154 *
1155 * @return the number of messages removed
1156 */
1157 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1158 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1159 }
1160
1161 /**
1162 * Removes the messages matching the given filter up to the maximum number
1163 * of matched messages
1164 *
1165 * @return the number of messages removed
1166 */
1167 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1168 int movedCounter = 0;
1169 Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
1170 ConnectionContext context = createConnectionContext();
1171 do {
1172 doPageIn(true);
1173 pagedInMessagesLock.readLock().lock();
1174 try{
1175 set.addAll(pagedInMessages.values());
1176 }finally {
1177 pagedInMessagesLock.readLock().unlock();
1178 }
1179 List<MessageReference> list = new ArrayList<MessageReference>(set);
1180 for (MessageReference ref : list) {
1181 IndirectMessageReference r = (IndirectMessageReference) ref;
1182 if (filter.evaluate(context, r)) {
1183
1184 removeMessage(context, r);
1185 set.remove(r);
1186 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1187 return movedCounter;
1188 }
1189 }
1190 }
1191 } while (set.size() < this.destinationStatistics.getMessages().getCount());
1192 return movedCounter;
1193 }
1194
1195 /**
1196 * Copies the message matching the given messageId
1197 */
1198 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1199 throws Exception {
1200 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1201 }
1202
1203 /**
1204 * Copies the messages matching the given selector
1205 *
1206 * @return the number of messages copied
1207 */
1208 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1209 throws Exception {
1210 return copyMatchingMessagesTo(context, selector, dest, -1);
1211 }
1212
1213 /**
1214 * Copies the messages matching the given selector up to the maximum number
1215 * of matched messages
1216 *
1217 * @return the number of messages copied
1218 */
1219 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1220 int maximumMessages) throws Exception {
1221 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1222 }
1223
1224 /**
1225 * Copies the messages matching the given filter up to the maximum number of
1226 * matched messages
1227 *
1228 * @return the number of messages copied
1229 */
1230 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1231 int maximumMessages) throws Exception {
1232 int movedCounter = 0;
1233 int count = 0;
1234 Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
1235 do {
1236 int oldMaxSize = getMaxPageSize();
1237 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1238 doPageIn(true);
1239 setMaxPageSize(oldMaxSize);
1240 pagedInMessagesLock.readLock().lock();
1241 try {
1242 set.addAll(pagedInMessages.values());
1243 }finally {
1244 pagedInMessagesLock.readLock().unlock();
1245 }
1246 List<MessageReference> list = new ArrayList<MessageReference>(set);
1247 for (MessageReference ref : list) {
1248 IndirectMessageReference r = (IndirectMessageReference) ref;
1249 if (filter.evaluate(context, r)) {
1250
1251 r.incrementReferenceCount();
1252 try {
1253 Message m = r.getMessage();
1254 BrokerSupport.resend(context, m, dest);
1255 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1256 return movedCounter;
1257 }
1258 } finally {
1259 r.decrementReferenceCount();
1260 }
1261 }
1262 count++;
1263 }
1264 } while (count < this.destinationStatistics.getMessages().getCount());
1265 return movedCounter;
1266 }
1267
1268 /**
1269 * Move a message
1270 *
1271 * @param context
1272 * connection context
1273 * @param m
1274 * QueueMessageReference
1275 * @param dest
1276 * ActiveMQDestination
1277 * @throws Exception
1278 */
1279 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1280 BrokerSupport.resend(context, m.getMessage(), dest);
1281 removeMessage(context, m);
1282 messagesLock.writeLock().lock();
1283 try{
1284 messages.rollback(m.getMessageId());
1285 }finally {
1286 messagesLock.writeLock().unlock();
1287 }
1288 return true;
1289 }
1290
1291 /**
1292 * Moves the message matching the given messageId
1293 */
1294 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1295 throws Exception {
1296 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1297 }
1298
1299 /**
1300 * Moves the messages matching the given selector
1301 *
1302 * @return the number of messages removed
1303 */
1304 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1305 throws Exception {
1306 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1307 }
1308
1309 /**
1310 * Moves the messages matching the given selector up to the maximum number
1311 * of matched messages
1312 */
1313 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1314 int maximumMessages) throws Exception {
1315 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1316 }
1317
1318 /**
1319 * Moves the messages matching the given filter up to the maximum number of
1320 * matched messages
1321 */
1322 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1323 ActiveMQDestination dest, int maximumMessages) throws Exception {
1324 int movedCounter = 0;
1325 Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
1326 do {
1327 doPageIn(true);
1328 pagedInMessagesLock.readLock().lock();
1329 try{
1330 set.addAll(pagedInMessages.values());
1331 }finally {
1332 pagedInMessagesLock.readLock().unlock();
1333 }
1334 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1335 for (QueueMessageReference ref : list) {
1336 if (filter.evaluate(context, ref)) {
1337 // We should only move messages that can be locked.
1338 moveMessageTo(context, ref, dest);
1339 set.remove(ref);
1340 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1341 return movedCounter;
1342 }
1343 }
1344 }
1345 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1346 return movedCounter;
1347 }
1348
1349 BrowserDispatch getNextBrowserDispatch() {
1350 pagedInMessagesLock.readLock().lock();
1351 try{
1352 if (browserDispatches.isEmpty()) {
1353 return null;
1354 }
1355 return browserDispatches.removeFirst();
1356 }finally {
1357 pagedInMessagesLock.readLock().unlock();
1358 }
1359
1360 }
1361
1362 /**
1363 * @return true if we would like to iterate again
1364 * @see org.apache.activemq.thread.Task#iterate()
1365 */
1366 public boolean iterate() {
1367 MDC.put("activemq.destination", getName());
1368 boolean pageInMoreMessages = false;
1369 synchronized (iteratingMutex) {
1370
1371 // do early to allow dispatch of these waiting messages
1372 synchronized (messagesWaitingForSpace) {
1373 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1374 while (it.hasNext()) {
1375 if (!memoryUsage.isFull()) {
1376 Runnable op = it.next();
1377 it.remove();
1378 op.run();
1379 } else {
1380 registerCallbackForNotFullNotification();
1381 break;
1382 }
1383 }
1384 }
1385
1386 if (firstConsumer) {
1387 firstConsumer = false;
1388 try {
1389 if (consumersBeforeDispatchStarts > 0) {
1390 int timeout = 1000; // wait one second by default if
1391 // consumer count isn't reached
1392 if (timeBeforeDispatchStarts > 0) {
1393 timeout = timeBeforeDispatchStarts;
1394 }
1395 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1396 if (LOG.isDebugEnabled()) {
1397 LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
1398 }
1399 } else {
1400 if (LOG.isDebugEnabled()) {
1401 LOG.debug(timeout + " ms elapsed and " + consumers.size()
1402 + " consumers subscribed. Starting dispatch.");
1403 }
1404 }
1405 }
1406 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1407 iteratingMutex.wait(timeBeforeDispatchStarts);
1408 if (LOG.isDebugEnabled()) {
1409 LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
1410 }
1411 }
1412 } catch (Exception e) {
1413 LOG.error(e.toString());
1414 }
1415 }
1416
1417 BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
1418
1419 messagesLock.readLock().lock();
1420 try{
1421 pageInMoreMessages |= !messages.isEmpty();
1422 }finally {
1423 messagesLock.readLock().unlock();
1424 }
1425
1426 pagedInPendingDispatchLock.readLock().lock();
1427 try {
1428 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
1429 }finally {
1430 pagedInPendingDispatchLock.readLock().unlock();
1431 }
1432
1433 // Perhaps we should page always into the pagedInPendingDispatch
1434 // list if
1435 // !messages.isEmpty(), and then if
1436 // !pagedInPendingDispatch.isEmpty()
1437 // then we do a dispatch.
1438 if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
1439 try {
1440 pageInMessages(pendingBrowserDispatch != null);
1441
1442 } catch (Throwable e) {
1443 LOG.error("Failed to page in more queue messages ", e);
1444 }
1445 }
1446
1447 if (pendingBrowserDispatch != null) {
1448 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
1449 pagedInMessagesLock.readLock().lock();
1450 try{
1451 alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
1452 }finally {
1453 pagedInMessagesLock.readLock().unlock();
1454 }
1455 if (LOG.isDebugEnabled()) {
1456 LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
1457 + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
1458 }
1459 do {
1460 try {
1461 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1462 msgContext.setDestination(destination);
1463
1464 QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
1465 for (QueueMessageReference node : alreadyDispatchedMessages) {
1466 if (!node.isAcked()) {
1467 msgContext.setMessageReference(node);
1468 if (browser.matches(node, msgContext)) {
1469 browser.add(node);
1470 }
1471 }
1472 }
1473 pendingBrowserDispatch.done();
1474 } catch (Exception e) {
1475 LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
1476 }
1477
1478 } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
1479 }
1480
1481 if (pendingWakeups.get() > 0) {
1482 pendingWakeups.decrementAndGet();
1483 }
1484 MDC.remove("activemq.destination");
1485 return pendingWakeups.get() > 0;
1486 }
1487 }
1488
1489 protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1490 return new MessageReferenceFilter() {
1491 public boolean evaluate(ConnectionContext context, MessageReference r) {
1492 return messageId.equals(r.getMessageId().toString());
1493 }
1494
1495 @Override
1496 public String toString() {
1497 return "MessageIdFilter: " + messageId;
1498 }
1499 };
1500 }
1501
1502 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1503 final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1504
1505 return new MessageReferenceFilter() {
1506 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1507 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1508
1509 messageEvaluationContext.setMessageReference(r);
1510 if (messageEvaluationContext.getDestination() == null) {
1511 messageEvaluationContext.setDestination(getActiveMQDestination());
1512 }
1513
1514 return selectorExpression.matches(messageEvaluationContext);
1515 }
1516 };
1517 }
1518
1519 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1520 removeMessage(c, null, r);
1521 pagedInPendingDispatchLock.writeLock().lock();
1522 try {
1523 pagedInPendingDispatch.remove(r);
1524 } finally {
1525 pagedInPendingDispatchLock.writeLock().unlock();
1526 }
1527
1528 }
1529
1530 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1531 MessageAck ack = new MessageAck();
1532 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1533 ack.setDestination(destination);
1534 ack.setMessageID(r.getMessageId());
1535 removeMessage(c, subs, r, ack);
1536 }
1537
/**
 * Marks a reference as acked, acknowledges it and drops it from the queue.
 * <p>
 * Outside a transaction the message is acknowledged, counted as dequeued
 * and dropped immediately. Inside a transaction the acknowledgement is
 * issued right away, while the dequeue accounting and the drop are
 * deferred to a synchronization that runs after commit; after rollback
 * the acked flag is cleared again. For a poison ack the message id is
 * additionally rolled back on the pending cursor.
 *
 * @param context connection context (supplies the transaction, if any)
 * @param sub subscription the removal is attributed to; may be
 *        {@code null}
 * @param reference reference being removed
 * @param ack acknowledgement describing the removal
 * @throws IOException if the acknowledgement fails
 */
protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
        MessageAck ack) throws IOException {
    reference.setAcked(true);
    // This sends the ack to the journal.
    if (!ack.isInTransaction()) {
        acknowledge(context, sub, ack, reference);
        getDestinationStatistics().getDequeues().increment();
        dropMessage(reference);
    } else {
        try {
            acknowledge(context, sub, ack, reference);
        } finally {
            // registered even if acknowledge() throws, so the acked flag
            // is reset on rollback
            context.getTransaction().addSynchronization(new Synchronization() {

                @Override
                public void afterCommit() throws Exception {
                    getDestinationStatistics().getDequeues().increment();
                    dropMessage(reference);
                    wakeup();
                }

                @Override
                public void afterRollback() throws Exception {
                    reference.setAcked(false);
                }
            });
        }
    }
    if (ack.isPoisonAck()) {
        // message gone to DLQ, is ok to allow redelivery
        messagesLock.writeLock().lock();
        try{
            messages.rollback(reference.getMessageId());
        }finally {
            messagesLock.writeLock().unlock();
        }
    }

}
1577
1578 private void dropMessage(QueueMessageReference reference) {
1579 reference.drop();
1580 destinationStatistics.getMessages().decrement();
1581 pagedInMessagesLock.writeLock().lock();
1582 try{
1583 pagedInMessages.remove(reference.getMessageId());
1584 }finally {
1585 pagedInMessagesLock.writeLock().unlock();
1586 }
1587 }
1588
1589 public void messageExpired(ConnectionContext context, MessageReference reference) {
1590 messageExpired(context, null, reference);
1591 }
1592
1593 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1594 if (LOG.isDebugEnabled()) {
1595 LOG.debug("message expired: " + reference);
1596 }
1597 broker.messageExpired(context, reference, subs);
1598 destinationStatistics.getExpired().increment();
1599 try {
1600 removeMessage(context, subs, (QueueMessageReference) reference);
1601 } catch (IOException e) {
1602 LOG.error("Failed to remove expired Message from the store ", e);
1603 }
1604 }
1605
1606 protected ConnectionContext createConnectionContext() {
1607 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
1608 answer.setBroker(this.broker);
1609 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
1610 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1611 return answer;
1612 }
1613
1614 final void sendMessage(final Message msg) throws Exception {
1615 messagesLock.writeLock().lock();
1616 try{
1617 messages.addMessageLast(msg);
1618 }finally {
1619 messagesLock.writeLock().unlock();
1620 }
1621 }
1622
1623 final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1624 destinationStatistics.getEnqueues().increment();
1625 destinationStatistics.getMessages().increment();
1626 messageDelivered(context, msg);
1627 consumersLock.readLock().lock();
1628 try {
1629 if (consumers.isEmpty()) {
1630 onMessageWithNoConsumers(context, msg);
1631 }
1632 }finally {
1633 consumersLock.readLock().unlock();
1634 }
1635 if (LOG.isTraceEnabled()) {
1636 LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination);
1637 }
1638 wakeup();
1639 }
1640
1641 public void wakeup() {
1642 if (optimizedDispatch || isSlave()) {
1643 iterate();
1644 pendingWakeups.incrementAndGet();
1645 } else {
1646 asyncWakeup();
1647 }
1648 }
1649
1650 private void asyncWakeup() {
1651 try {
1652 pendingWakeups.incrementAndGet();
1653 this.taskRunner.wakeup();
1654 } catch (InterruptedException e) {
1655 LOG.warn("Async task tunner failed to wakeup ", e);
1656 }
1657 }
1658
/**
 * @return whether the broker service behind this queue's broker reports
 *         itself as a slave
 */
private boolean isSlave() {
    return broker.getBrokerService().isSlave();
}
1662
1663 private void doPageIn(boolean force) throws Exception {
1664 List<QueueMessageReference> newlyPaged = doPageInForDispatch(force);
1665 pagedInPendingDispatchLock.writeLock().lock();
1666 try {
1667 if (pagedInPendingDispatch.isEmpty()) {
1668 pagedInPendingDispatch.addAll(newlyPaged);
1669 } else {
1670 for (QueueMessageReference qmr : newlyPaged) {
1671 if (!pagedInPendingDispatch.contains(qmr)) {
1672 pagedInPendingDispatch.add(qmr);
1673 }
1674 }
1675 }
1676 } finally {
1677 pagedInPendingDispatchLock.writeLock().unlock();
1678 }
1679 }
1680
/**
 * Moves up to one page of messages from the pending cursor into the
 * paged-in map and returns the references that were newly added.
 * <p>
 * Paging happens only when there is something to page in and either
 * {@code force} is set or there are consumers and the pending-dispatch
 * list is below the max page size. Expired messages met on the way are
 * expired instead of being paged in.
 *
 * @param force page in regardless of consumers and pending-dispatch size;
 *        also bypasses the lazy-dispatch limit
 * @return the newly paged-in references; empty (never {@code null}) when
 *         nothing was paged in
 * @throws Exception if reading from the cursor fails
 */
private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception {
    // result: everything taken off the cursor; resultList: the subset that
    // was not already in pagedInMessages
    List<QueueMessageReference> result = null;
    List<QueueMessageReference> resultList = null;

    int toPageIn = Math.min(getMaxPageSize(), messages.size());
    if (LOG.isDebugEnabled()) {
        LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
                + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
                + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
                + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
    }

    if (isLazyDispatch() && !force) {
        // Only page in the minimum number of messages which can be
        // dispatched immediately.
        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
    }
    int pagedInPendingSize = 0;
    pagedInPendingDispatchLock.readLock().lock();
    try {
        pagedInPendingSize = pagedInPendingDispatch.size();
    } finally {
        pagedInPendingDispatchLock.readLock().unlock();
    }
    if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
        int count = 0;
        result = new ArrayList<QueueMessageReference>(toPageIn);
        messagesLock.writeLock().lock();
        try {
            try {
                messages.setMaxBatchSize(toPageIn);
                messages.reset();
                while (messages.hasNext() && count < toPageIn) {
                    MessageReference node = messages.next();
                    // taken off the cursor for good: it is either expired
                    // below or handed to the paged-in map
                    messages.remove();

                    QueueMessageReference ref = createMessageReference(node.getMessage());
                    if (ref.isExpired()) {
                        if (broker.isExpired(ref)) {
                            messageExpired(createConnectionContext(), ref);
                        } else {
                            ref.decrementReferenceCount();
                        }
                    } else {
                        result.add(ref);
                        count++;
                    }
                }
            } finally {
                messages.release();
            }
        } finally {
            messagesLock.writeLock().unlock();
        }
        // Only add new messages, not already pagedIn to avoid multiple
        // dispatch attempts
        pagedInMessagesLock.writeLock().lock();
        try {
            resultList = new ArrayList<QueueMessageReference>(result.size());
            for (QueueMessageReference ref : result) {
                if (!pagedInMessages.containsKey(ref.getMessageId())) {
                    pagedInMessages.put(ref.getMessageId(), ref);
                    resultList.add(ref);
                } else {
                    ref.decrementReferenceCount();
                }
            }
        } finally {
            pagedInMessagesLock.writeLock().unlock();
        }
    } else {
        // Avoid return null list, if condition is not validated
        resultList = new ArrayList<QueueMessageReference>();
    }

    return resultList;
}
1758
/**
 * Dispatches messages to consumers in priority order: first the
 * redelivered backlog, then the previously paged-in pending backlog, and
 * only then the newly supplied {@code list}.
 * <p>
 * If a pending backlog remains after the first two steps, the new
 * messages are appended to it (skipping duplicates) rather than
 * dispatched, and an asynchronous wakeup is requested after the lock is
 * released.
 *
 * @param list newly paged-in references to dispatch; may be {@code null}
 *        or empty
 * @throws Exception if dispatching to a subscription fails
 */
private void doDispatch(List<QueueMessageReference> list) throws Exception {
    boolean doWakeUp = false;

    pagedInPendingDispatchLock.writeLock().lock();
    try {
        if (!redeliveredWaitingDispatch.isEmpty()) {
            // Try first to dispatch redelivered messages to keep an
            // proper order
            redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
        }
        if (!pagedInPendingDispatch.isEmpty()) {
            // Next dispatch anything that had not been
            // dispatched before.
            pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
        }
        // and now see if we can dispatch the new stuff.. and append to
        // the pending
        // list anything that does not actually get dispatched.
        if (list != null && !list.isEmpty()) {
            if (pagedInPendingDispatch.isEmpty()) {
                pagedInPendingDispatch.addAll(doActualDispatch(list));
            } else {
                for (QueueMessageReference qmr : list) {
                    if (!pagedInPendingDispatch.contains(qmr)) {
                        pagedInPendingDispatch.add(qmr);
                    }
                }
                doWakeUp = true;
            }
        }
    } finally {
        pagedInPendingDispatchLock.writeLock().unlock();
    }

    if (doWakeUp) {
        // avoid lock order contention
        asyncWakeup();
    }
}
1798
1799 /**
1800 * @return list of messages that could get dispatched to consumers if they
1801 * were not full.
1802 */
1803 private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
1804 List<Subscription> consumers;
1805 consumersLock.writeLock().lock();
1806 try {
1807 if (this.consumers.isEmpty() || isSlave()) {
1808 // slave dispatch happens in processDispatchNotification
1809 return list;
1810 }
1811 consumers = new ArrayList<Subscription>(this.consumers);
1812 }finally {
1813 consumersLock.writeLock().unlock();
1814 }
1815
1816 List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
1817 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1818
1819 for (MessageReference node : list) {
1820 Subscription target = null;
1821 int interestCount = 0;
1822 for (Subscription s : consumers) {
1823 if (s instanceof QueueBrowserSubscription) {
1824 interestCount++;
1825 continue;
1826 }
1827 if (dispatchSelector.canSelect(s, node)) {
1828 if (!fullConsumers.contains(s)) {
1829 if (!s.isFull()) {
1830 if (assignMessageGroup(s, (QueueMessageReference)node)) {
1831 // Dispatch it.
1832 s.add(node);
1833 target = s;
1834 break;
1835 }
1836 } else {
1837 // no further dispatch of list to a full consumer to
1838 // avoid out of order message receipt
1839 fullConsumers.add(s);
1840 }
1841 }
1842 interestCount++;
1843 } else {
1844 // makes sure it gets dispatched again
1845 if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
1846 && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
1847 interestCount++;
1848 }
1849 }
1850 }
1851
1852 if ((target == null && interestCount > 0) || consumers.size() == 0) {
1853 // This means all subs were full or that there are no
1854 // consumers...
1855 rc.add((QueueMessageReference) node);
1856 }
1857
1858 // If it got dispatched, rotate the consumer list to get round robin
1859 // distribution.
1860 if (target != null && !strictOrderDispatch && consumers.size() > 1
1861 && !dispatchSelector.isExclusiveConsumer(target)) {
1862 consumersLock.writeLock().lock();
1863 try {
1864 if (removeFromConsumerList(target)) {
1865 addToConsumerList(target);
1866 consumers = new ArrayList<Subscription>(this.consumers);
1867 }
1868 }finally {
1869 consumersLock.writeLock().unlock();
1870 }
1871 }
1872 }
1873
1874 return rc;
1875 }
1876
1877 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
1878 //QueueMessageReference node = (QueueMessageReference) m;
1879 boolean result = true;
1880 // Keep message groups together.
1881 String groupId = node.getGroupID();
1882 int sequence = node.getGroupSequence();
1883 if (groupId != null) {
1884 //MessageGroupMap messageGroupOwners = ((Queue) node
1885 // .getRegionDestination()).getMessageGroupOwners();
1886
1887 MessageGroupMap messageGroupOwners = getMessageGroupOwners();
1888 // If we can own the first, then no-one else should own the
1889 // rest.
1890 if (sequence == 1) {
1891 assignGroup(subscription, messageGroupOwners, node, groupId);
1892 } else {
1893
1894 // Make sure that the previous owner is still valid, we may
1895 // need to become the new owner.
1896 ConsumerId groupOwner;
1897
1898 groupOwner = messageGroupOwners.get(groupId);
1899 if (groupOwner == null) {
1900 assignGroup(subscription, messageGroupOwners, node, groupId);
1901 } else {
1902 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
1903 // A group sequence < 1 is an end of group signal.
1904 if (sequence < 0) {
1905 messageGroupOwners.removeGroup(groupId);
1906 }
1907 } else {
1908 result = false;
1909 }
1910 }
1911 }
1912 }
1913
1914 return result;
1915
1916 }
1917
1918 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
1919 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
1920 Message message = n.getMessage();
1921 if (message instanceof ActiveMQMessage) {
1922 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
1923 try {
1924 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
1925 } catch (JMSException e) {
1926 LOG.warn("Failed to set boolean header: " + e, e);
1927 }
1928 }
1929 }
1930
1931 protected void pageInMessages(boolean force) throws Exception {
1932 doDispatch(doPageInForDispatch(force));
1933 }
1934
1935 private void addToConsumerList(Subscription sub) {
1936 if (useConsumerPriority) {
1937 consumers.add(sub);
1938 Collections.sort(consumers, orderedCompare);
1939 } else {
1940 consumers.add(sub);
1941 }
1942 }
1943
1944 private boolean removeFromConsumerList(Subscription sub) {
1945 return consumers.remove(sub);
1946 }
1947
1948 private int getConsumerMessageCountBeforeFull() throws Exception {
1949 int total = 0;
1950 boolean zeroPrefetch = false;
1951 consumersLock.readLock().lock();
1952 try{
1953 for (Subscription s : consumers) {
1954 zeroPrefetch |= s.getPrefetchSize() == 0;
1955 int countBeforeFull = s.countBeforeFull();
1956 total += countBeforeFull;
1957 }
1958 }finally {
1959 consumersLock.readLock().unlock();
1960 }
1961 if (total == 0 && zeroPrefetch) {
1962 total = 1;
1963 }
1964 return total;
1965 }
1966
1967 /*
1968 * In slave mode, dispatch is ignored till we get this notification as the
1969 * dispatch process is non deterministic between master and slave. On a
1970 * notification, the actual dispatch to the subscription (as chosen by the
1971 * master) is completed. (non-Javadoc)
1972 * @see
1973 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
1974 * (org.apache.activemq.command.MessageDispatchNotification)
1975 */
1976 @Override
1977 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
1978 // do dispatch
1979 Subscription sub = getMatchingSubscription(messageDispatchNotification);
1980 if (sub != null) {
1981 MessageReference message = getMatchingMessage(messageDispatchNotification);
1982 sub.add(message);
1983 sub.processMessageDispatchNotification(messageDispatchNotification);
1984 }
1985 }
1986
1987 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
1988 throws Exception {
1989 QueueMessageReference message = null;
1990 MessageId messageId = messageDispatchNotification.getMessageId();
1991
1992 pagedInPendingDispatchLock.writeLock().lock();
1993 try {
1994 for (QueueMessageReference ref : pagedInPendingDispatch) {
1995 if (messageId.equals(ref.getMessageId())) {
1996 message = ref;
1997 pagedInPendingDispatch.remove(ref);
1998 break;
1999 }
2000 }
2001 } finally {
2002 pagedInPendingDispatchLock.writeLock().unlock();
2003 }
2004
2005 if (message == null) {
2006 pagedInMessagesLock.readLock().lock();
2007 try {
2008 message = pagedInMessages.get(messageId);
2009 } finally {
2010 pagedInMessagesLock.readLock().unlock();
2011 }
2012 }
2013
2014 if (message == null) {
2015 messagesLock.writeLock().lock();
2016 try {
2017 try {
2018 messages.setMaxBatchSize(getMaxPageSize());
2019 messages.reset();
2020 while (messages.hasNext()) {
2021 MessageReference node = messages.next();
2022 messages.remove();
2023 if (messageId.equals(node.getMessageId())) {
2024 message = this.createMessageReference(node.getMessage());
2025 break;
2026 }
2027 }
2028 } finally {
2029 messages.release();
2030 }
2031 } finally {
2032 messagesLock.writeLock().unlock();
2033 }
2034 }
2035
2036 if (message == null) {
2037 Message msg = loadMessage(messageId);
2038 if (msg != null) {
2039 message = this.createMessageReference(msg);
2040 }
2041 }
2042
2043 if (message == null) {
2044 throw new JMSException("Slave broker out of sync with master - Message: "
2045 + messageDispatchNotification.getMessageId() + " on "
2046 + messageDispatchNotification.getDestination() + " does not exist among pending("
2047 + pagedInPendingDispatch.size() + ") for subscription: "
2048 + messageDispatchNotification.getConsumerId());
2049 }
2050 return message;
2051 }
2052
2053 /**
2054 * Find a consumer that matches the id in the message dispatch notification
2055 *
2056 * @param messageDispatchNotification
2057 * @return sub or null if the subscription has been removed before dispatch
2058 * @throws JMSException
2059 */
2060 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2061 throws JMSException {
2062 Subscription sub = null;
2063 consumersLock.readLock().lock();
2064 try {
2065 for (Subscription s : consumers) {
2066 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2067 sub = s;
2068 break;
2069 }
2070 }
2071 }finally {
2072 consumersLock.readLock().unlock();
2073 }
2074 return sub;
2075 }
2076
2077 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
2078 if (oldPercentUsage > newPercentUsage) {
2079 asyncWakeup();
2080 }
2081 }
2082
2083 @Override
2084 protected Logger getLog() {
2085 return LOG;
2086 }
2087 }