001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.concurrent.CopyOnWriteArrayList;
024 import java.util.concurrent.CountDownLatch;
025 import java.util.concurrent.TimeUnit;
026 import javax.jms.InvalidSelectorException;
027 import javax.jms.JMSException;
028 import org.apache.activemq.broker.Broker;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
032 import org.apache.activemq.command.ActiveMQMessage;
033 import org.apache.activemq.command.ConsumerControl;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.Message;
036 import org.apache.activemq.command.MessageAck;
037 import org.apache.activemq.command.MessageDispatch;
038 import org.apache.activemq.command.MessageDispatchNotification;
039 import org.apache.activemq.command.MessageId;
040 import org.apache.activemq.command.MessagePull;
041 import org.apache.activemq.command.Response;
042 import org.apache.activemq.thread.Scheduler;
043 import org.apache.activemq.transaction.Synchronization;
044 import org.apache.activemq.usage.SystemUsage;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * A subscription that honors the pre-fetch option of the ConsumerInfo.
050 *
051 *
052 */
053 public abstract class PrefetchSubscription extends AbstractSubscription {
054
private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
// Scheduler obtained from the broker; pullMessage() uses it to run delayed pull-timeout tasks.
protected final Scheduler scheduler;

// Cursor holding messages added to this subscription that have not been dispatched yet.
protected PendingMessageCursor pending;
// Messages handed to the consumer; entries are removed again when they are acknowledged.
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
// Extra dispatch allowance added on top of the consumer's prefetch size (see countBeforeFull()).
protected int prefetchExtension;
// When true, delivered acks and transacted standard acks may enlarge prefetchExtension.
protected boolean usePrefetchExtension = true;
// Incremented for every message reference accepted by add(MessageReference).
protected long enqueueCounter;
// Incremented for every non-NULL message handed to the connection in dispatch().
protected long dispatchCounter;
// Incremented for every message removed from the dispatched list by an acknowledgment.
protected long dequeueCounter;
// Only exposed through accessors in this class.
// NOTE(review): presumably used by subclasses for duplicate auditing - confirm.
private int maxProducersToAudit=32;
// Only exposed through accessors in this class.
// NOTE(review): presumably used by subclasses for duplicate auditing - confirm.
private int maxAuditDepth=2048;
// Passed to the pending cursor in setPending().
protected final SystemUsage usageManager;
// Held while the pending cursor is read or modified.
protected final Object pendingLock = new Object();
// Held while the dispatched list is updated by acknowledgments, dispatch and removal.
private final Object dispatchLock = new Object();
// Counted down on the first dispatch; a master ignores acks that arrive before that.
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071
/**
 * Creates a subscription that keeps its undispatched messages in the supplied cursor.
 *
 * @param broker the owning broker; its scheduler is retained for pull timeouts
 * @param usageManager usage tracker later handed to the pending cursor
 * @param context connection context of the consumer
 * @param info description of the consumer
 * @param cursor cursor that will hold pending messages
 * @throws InvalidSelectorException presumably raised by the superclass for a bad selector
 */
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context,
        ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
    super(broker, context, info);
    this.scheduler = broker.getScheduler();
    this.pending = cursor;
    this.usageManager = usageManager;
}
078
/**
 * Creates a subscription whose pending messages are held in a new
 * {@link VMPendingMessageCursor} constructed with {@code false}.
 */
public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
    this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
}
082
083 /**
084 * Allows a message to be pulled on demand by a client
085 */
086 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087 // The slave should not deliver pull messages. TODO: when the slave
088 // becomes a master,
089 // He should send a NULL message to all the consumers to 'wake them up'
090 // in case
091 // they were waiting for a message.
092 if (getPrefetchSize() == 0 && !isSlave()) {
093 final long dispatchCounterBeforePull;
094 synchronized(this) {
095 prefetchExtension++;
096 dispatchCounterBeforePull = dispatchCounter;
097 }
098
099 // Have the destination push us some messages.
100 for (Destination dest : destinations) {
101 dest.iterate();
102 }
103 dispatchPending();
104
105 synchronized(this) {
106 // If there was nothing dispatched.. we may need to setup a timeout.
107 if (dispatchCounterBeforePull == dispatchCounter) {
108 // immediate timeout used by receiveNoWait()
109 if (pull.getTimeout() == -1) {
110 // Send a NULL message.
111 add(QueueMessageReference.NULL_MESSAGE);
112 dispatchPending();
113 }
114 if (pull.getTimeout() > 0) {
115 scheduler.executeAfterDelay(new Runnable() {
116
117 public void run() {
118 pullTimeout(dispatchCounterBeforePull);
119 }
120 }, pull.getTimeout());
121 }
122 }
123 }
124 }
125 return null;
126 }
127
128 /**
129 * Occurs when a pull times out. If nothing has been dispatched since the
130 * timeout was setup, then send the NULL message.
131 */
132 final void pullTimeout(long dispatchCounterBeforePull) {
133 synchronized (pendingLock) {
134 if (dispatchCounterBeforePull == dispatchCounter) {
135 try {
136 add(QueueMessageReference.NULL_MESSAGE);
137 dispatchPending();
138 } catch (Exception e) {
139 context.getConnection().serviceException(e);
140 }
141 }
142 }
143 }
144
145 public void add(MessageReference node) throws Exception {
146 synchronized (pendingLock) {
147 // The destination may have just been removed...
148 if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
149 // perhaps we should inform the caller that we are no longer valid to dispatch to?
150 return;
151 }
152 enqueueCounter++;
153 pending.addMessageLast(node);
154 }
155 dispatchPending();
156 }
157
158 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
159 synchronized(pendingLock) {
160 try {
161 pending.reset();
162 while (pending.hasNext()) {
163 MessageReference node = pending.next();
164 node.decrementReferenceCount();
165 if (node.getMessageId().equals(mdn.getMessageId())) {
166 // Synchronize between dispatched list and removal of messages from pending list
167 // related to remove subscription action
168 synchronized(dispatchLock) {
169 pending.remove();
170 createMessageDispatch(node, node.getMessage());
171 dispatched.add(node);
172 onDispatch(node, node.getMessage());
173 }
174 return;
175 }
176 }
177 } finally {
178 pending.release();
179 }
180 }
181 throw new JMSException(
182 "Slave broker out of sync with master: Dispatched message ("
183 + mdn.getMessageId() + ") was not in the pending list for "
184 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
185 }
186
187 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
188 // Handle the standard acknowledgment case.
189 boolean callDispatchMatched = false;
190 Destination destination = null;
191
192 if (!isSlave()) {
193 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
194 // suppress unexpected ack exception in this expected case
195 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
196 return;
197 }
198 }
199 if (LOG.isTraceEnabled()) {
200 LOG.trace("ack:" + ack);
201 }
202 synchronized(dispatchLock) {
203 if (ack.isStandardAck()) {
204 // First check if the ack matches the dispatched. When using failover this might
205 // not be the case. We don't ever want to ack the wrong messages.
206 assertAckMatchesDispatched(ack);
207
208 // Acknowledge all dispatched messages up till the message id of
209 // the acknowledgment.
210 int index = 0;
211 boolean inAckRange = false;
212 List<MessageReference> removeList = new ArrayList<MessageReference>();
213 for (final MessageReference node : dispatched) {
214 MessageId messageId = node.getMessageId();
215 if (ack.getFirstMessageId() == null
216 || ack.getFirstMessageId().equals(messageId)) {
217 inAckRange = true;
218 }
219 if (inAckRange) {
220 // Don't remove the nodes until we are committed.
221 if (!context.isInTransaction()) {
222 dequeueCounter++;
223 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
224 removeList.add(node);
225 } else {
226 // setup a Synchronization to remove nodes from the
227 // dispatched list.
228 context.getTransaction().addSynchronization(
229 new Synchronization() {
230
231 @Override
232 public void afterCommit()
233 throws Exception {
234 synchronized(dispatchLock) {
235 dequeueCounter++;
236 dispatched.remove(node);
237 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
238 }
239 }
240
241 @Override
242 public void afterRollback() throws Exception {
243 synchronized(dispatchLock) {
244 if (isSlave()) {
245 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
246 } else {
247 // poisionAck will decrement - otherwise still inflight on client
248 }
249 }
250 }
251 });
252 }
253 index++;
254 acknowledge(context, ack, node);
255 if (ack.getLastMessageId().equals(messageId)) {
256 // contract prefetch if dispatch required a pull
257 if (getPrefetchSize() == 0) {
258 prefetchExtension = Math.max(0, prefetchExtension - index);
259 } else if (usePrefetchExtension && context.isInTransaction()) {
260 // extend prefetch window only if not a pulling consumer
261 prefetchExtension = Math.max(prefetchExtension, index);
262 }
263 destination = node.getRegionDestination();
264 callDispatchMatched = true;
265 break;
266 }
267 }
268 }
269 for (final MessageReference node : removeList) {
270 dispatched.remove(node);
271 }
272 // this only happens after a reconnect - get an ack which is not
273 // valid
274 if (!callDispatchMatched) {
275 LOG.warn("Could not correlate acknowledgment with dispatched message: "
276 + ack);
277 }
278 } else if (ack.isIndividualAck()) {
279 // Message was delivered and acknowledge - but only delete the
280 // individual message
281 for (final MessageReference node : dispatched) {
282 MessageId messageId = node.getMessageId();
283 if (ack.getLastMessageId().equals(messageId)) {
284 // this should never be within a transaction
285 dequeueCounter++;
286 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
287 destination = node.getRegionDestination();
288 acknowledge(context, ack, node);
289 dispatched.remove(node);
290 prefetchExtension = Math.max(0, prefetchExtension - 1);
291 callDispatchMatched = true;
292 break;
293 }
294 }
295 }else if (ack.isDeliveredAck()) {
296 // Message was delivered but not acknowledged: update pre-fetch
297 // counters.
298 int index = 0;
299 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300 final MessageReference node = iter.next();
301 if (node.isExpired()) {
302 if (broker.isExpired(node)) {
303 node.getRegionDestination().messageExpired(context, this, node);
304 }
305 dispatched.remove(node);
306 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307 }
308 if (ack.getLastMessageId().equals(node.getMessageId())) {
309 if (usePrefetchExtension) {
310 prefetchExtension = Math.max(prefetchExtension, index + 1);
311 }
312 destination = node.getRegionDestination();
313 callDispatchMatched = true;
314 break;
315 }
316 }
317 if (!callDispatchMatched) {
318 throw new JMSException(
319 "Could not correlate acknowledgment with dispatched message: "
320 + ack);
321 }
322 } else if (ack.isRedeliveredAck()) {
323 // Message was re-delivered but it was not yet considered to be
324 // a DLQ message.
325 boolean inAckRange = false;
326 for (final MessageReference node : dispatched) {
327 MessageId messageId = node.getMessageId();
328 if (ack.getFirstMessageId() == null
329 || ack.getFirstMessageId().equals(messageId)) {
330 inAckRange = true;
331 }
332 if (inAckRange) {
333 if (ack.getLastMessageId().equals(messageId)) {
334 destination = node.getRegionDestination();
335 callDispatchMatched = true;
336 break;
337 }
338 }
339 }
340 if (!callDispatchMatched) {
341 throw new JMSException(
342 "Could not correlate acknowledgment with dispatched message: "
343 + ack);
344 }
345 } else if (ack.isPoisonAck()) {
346 // TODO: what if the message is already in a DLQ???
347 // Handle the poison ACK case: we need to send the message to a
348 // DLQ
349 if (ack.isInTransaction()) {
350 throw new JMSException("Poison ack cannot be transacted: "
351 + ack);
352 }
353 int index = 0;
354 boolean inAckRange = false;
355 List<MessageReference> removeList = new ArrayList<MessageReference>();
356 for (final MessageReference node : dispatched) {
357 MessageId messageId = node.getMessageId();
358 if (ack.getFirstMessageId() == null
359 || ack.getFirstMessageId().equals(messageId)) {
360 inAckRange = true;
361 }
362 if (inAckRange) {
363 if (ack.getPoisonCause() != null) {
364 node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
365 ack.getPoisonCause().toString());
366 }
367 sendToDLQ(context, node);
368 node.getRegionDestination().getDestinationStatistics()
369 .getInflight().decrement();
370 removeList.add(node);
371 dequeueCounter++;
372 index++;
373 acknowledge(context, ack, node);
374 if (ack.getLastMessageId().equals(messageId)) {
375 prefetchExtension = Math.max(0, prefetchExtension
376 - (index + 1));
377 destination = node.getRegionDestination();
378 callDispatchMatched = true;
379 break;
380 }
381 }
382 }
383 for (final MessageReference node : removeList) {
384 dispatched.remove(node);
385 }
386 if (!callDispatchMatched) {
387 throw new JMSException(
388 "Could not correlate acknowledgment with dispatched message: "
389 + ack);
390 }
391 }
392 }
393 if (callDispatchMatched && destination != null) {
394 destination.wakeup();
395 dispatchPending();
396 } else {
397 if (isSlave()) {
398 throw new JMSException(
399 "Slave broker out of sync with master: Acknowledgment ("
400 + ack + ") was not in the dispatch list: "
401 + dispatched);
402 } else {
403 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
404 + ack);
405 }
406 }
407 }
408
409 /**
410 * Checks an ack versus the contents of the dispatched list.
411 *
412 * @param ack
413 * @throws JMSException if it does not match
414 */
415 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
416 MessageId firstAckedMsg = ack.getFirstMessageId();
417 MessageId lastAckedMsg = ack.getLastMessageId();
418 int checkCount = 0;
419 boolean checkFoundStart = false;
420 boolean checkFoundEnd = false;
421 for (MessageReference node : dispatched) {
422
423 if (firstAckedMsg == null) {
424 checkFoundStart = true;
425 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
426 checkFoundStart = true;
427 }
428
429 if (checkFoundStart) {
430 checkCount++;
431 }
432
433 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
434 checkFoundEnd = true;
435 break;
436 }
437 }
438 if (!checkFoundStart && firstAckedMsg != null)
439 throw new JMSException("Unmatched acknowledge: " + ack
440 + "; Could not find Message-ID " + firstAckedMsg
441 + " in dispatched-list (start of ack)");
442 if (!checkFoundEnd && lastAckedMsg != null)
443 throw new JMSException("Unmatched acknowledge: " + ack
444 + "; Could not find Message-ID " + lastAckedMsg
445 + " in dispatched-list (end of ack)");
446 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
447 throw new JMSException("Unmatched acknowledge: " + ack
448 + "; Expected message count (" + ack.getMessageCount()
449 + ") differs from count in dispatched-list (" + checkCount
450 + ")");
451 }
452 }
453
454 /**
455 * @param context
456 * @param node
457 * @throws IOException
458 * @throws Exception
459 */
460 protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
461 broker.getRoot().sendToDeadLetterQueue(context, node, this);
462 }
463
464 public int getInFlightSize() {
465 return dispatched.size();
466 }
467
468 /**
469 * Used to determine if the broker can dispatch to the consumer.
470 *
471 * @return
472 */
473 public boolean isFull() {
474 return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
475 }
476
477 /**
478 * @return true when 60% or more room is left for dispatching messages
479 */
480 public boolean isLowWaterMark() {
481 return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
482 }
483
484 /**
485 * @return true when 10% or less room is left for dispatching messages
486 */
487 public boolean isHighWaterMark() {
488 return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
489 }
490
491 @Override
492 public int countBeforeFull() {
493 return info.getPrefetchSize() + prefetchExtension - dispatched.size();
494 }
495
496 public int getPendingQueueSize() {
497 return pending.size();
498 }
499
500 public int getDispatchedQueueSize() {
501 return dispatched.size();
502 }
503
504 public long getDequeueCounter() {
505 return dequeueCounter;
506 }
507
508 public long getDispatchedCounter() {
509 return dispatchCounter;
510 }
511
512 public long getEnqueueCounter() {
513 return enqueueCounter;
514 }
515
516 @Override
517 public boolean isRecoveryRequired() {
518 return pending.isRecoveryRequired();
519 }
520
521 public PendingMessageCursor getPending() {
522 return this.pending;
523 }
524
525 public void setPending(PendingMessageCursor pending) {
526 this.pending = pending;
527 if (this.pending!=null) {
528 this.pending.setSystemUsage(usageManager);
529 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
530 }
531 }
532
/**
 * Registers a destination with this subscription: first with the superclass, then
 * with the pending cursor, both while holding {@code pendingLock}.
 *
 * @param context connection context of the consumer
 * @param destination the destination being added
 * @throws Exception if the superclass or the cursor fails
 */
@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
    synchronized(pendingLock) {
        super.add(context, destination);
        pending.add(context, destination);
    }
}
540
541 @Override
542 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
543 List<MessageReference> rc = new ArrayList<MessageReference>();
544 synchronized(pendingLock) {
545 super.remove(context, destination);
546 // Here is a potential problem concerning Inflight stat:
547 // Messages not already committed or rolled back may not be removed from dispatched list at the moment
548 // Except if each commit or rollback callback action comes before remove of subscriber.
549 rc.addAll(pending.remove(context, destination));
550
551 // Synchronized to DispatchLock
552 synchronized(dispatchLock) {
553 ArrayList<MessageReference> references = new ArrayList<MessageReference>();
554 for (MessageReference r : dispatched) {
555 if( r.getRegionDestination() == destination) {
556 references.add(r);
557 }
558 }
559 rc.addAll(references);
560 destination.getDestinationStatistics().getDispatched().subtract(references.size());
561 destination.getDestinationStatistics().getInflight().subtract(references.size());
562 dispatched.removeAll(references);
563 }
564 }
565 return rc;
566 }
567
568 protected void dispatchPending() throws IOException {
569 if (!isSlave()) {
570 synchronized(pendingLock) {
571 try {
572 int numberToDispatch = countBeforeFull();
573 if (numberToDispatch > 0) {
574 setSlowConsumer(false);
575 setPendingBatchSize(pending, numberToDispatch);
576 int count = 0;
577 pending.reset();
578 while (pending.hasNext() && !isFull()
579 && count < numberToDispatch) {
580 MessageReference node = pending.next();
581 if (node == null) {
582 break;
583 }
584
585 // Synchronize between dispatched list and remove of message from pending list
586 // related to remove subscription action
587 synchronized(dispatchLock) {
588 pending.remove();
589 node.decrementReferenceCount();
590 if( !isDropped(node) && canDispatch(node)) {
591
592 // Message may have been sitting in the pending
593 // list a while waiting for the consumer to ak the message.
594 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
595 //increment number to dispatch
596 numberToDispatch++;
597 if (broker.isExpired(node)) {
598 node.getRegionDestination().messageExpired(context, this, node);
599 }
600 continue;
601 }
602 dispatch(node);
603 count++;
604 }
605 }
606 }
607 } else if (!isSlowConsumer()) {
608 setSlowConsumer(true);
609 for (Destination dest :destinations) {
610 dest.slowConsumer(context, this);
611 }
612 }
613 } finally {
614 pending.release();
615 }
616 }
617 }
618 }
619
/**
 * Sets the maximum batch size of the given cursor to the number of messages about
 * to be dispatched. Called by dispatchPending() before it walks the cursor.
 *
 * @param pending the cursor to configure
 * @param numberToDispatch the batch size to apply
 */
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
    pending.setMaxBatchSize(numberToDispatch);
}
623
624 protected boolean dispatch(final MessageReference node) throws IOException {
625 final Message message = node.getMessage();
626 if (message == null) {
627 return false;
628 }
629
630 okForAckAsDispatchDone.countDown();
631
632 // No reentrant lock - Patch needed to IndirectMessageReference on method lock
633 if (!isSlave()) {
634
635 MessageDispatch md = createMessageDispatch(node, message);
636 // NULL messages don't count... they don't get Acked.
637 if (node != QueueMessageReference.NULL_MESSAGE) {
638 dispatchCounter++;
639 dispatched.add(node);
640 } else {
641 prefetchExtension = Math.max(0, prefetchExtension - 1);
642 }
643 if (info.isDispatchAsync()) {
644 md.setTransmitCallback(new Runnable() {
645
646 public void run() {
647 // Since the message gets queued up in async dispatch,
648 // we don't want to
649 // decrease the reference count until it gets put on the
650 // wire.
651 onDispatch(node, message);
652 }
653 });
654 context.getConnection().dispatchAsync(md);
655 } else {
656 context.getConnection().dispatchSync(md);
657 onDispatch(node, message);
658 }
659 return true;
660 } else {
661 return false;
662 }
663 }
664
665 protected void onDispatch(final MessageReference node, final Message message) {
666 if (node.getRegionDestination() != null) {
667 if (node != QueueMessageReference.NULL_MESSAGE) {
668 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
669 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
670 if (LOG.isTraceEnabled()) {
671 LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
672 + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
673 }
674 }
675 }
676
677 if (info.isDispatchAsync()) {
678 try {
679 dispatchPending();
680 } catch (IOException e) {
681 context.getConnection().serviceExceptionAsync(e);
682 }
683 }
684 }
685
686 /**
687 * inform the MessageConsumer on the client to change it's prefetch
688 *
689 * @param newPrefetch
690 */
691 public void updateConsumerPrefetch(int newPrefetch) {
692 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
693 ConsumerControl cc = new ConsumerControl();
694 cc.setConsumerId(info.getConsumerId());
695 cc.setPrefetch(newPrefetch);
696 context.getConnection().dispatchAsync(cc);
697 }
698 }
699
700 /**
701 * @param node
702 * @param message
703 * @return MessageDispatch
704 */
705 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
706 if (node == QueueMessageReference.NULL_MESSAGE) {
707 MessageDispatch md = new MessageDispatch();
708 md.setMessage(null);
709 md.setConsumerId(info.getConsumerId());
710 md.setDestination(null);
711 return md;
712 } else {
713 MessageDispatch md = new MessageDispatch();
714 md.setConsumerId(info.getConsumerId());
715 md.setDestination(node.getRegionDestination().getActiveMQDestination());
716 md.setMessage(message);
717 md.setRedeliveryCounter(node.getRedeliveryCounter());
718 return md;
719 }
720 }
721
/**
 * Use when a matched message is about to be dispatched to the client.
 * dispatchPending() calls this for every pending message that is not dropped and
 * skips the message when it returns false.
 *
 * @param node the message reference about to be dispatched
 * @return false if the message should not be dispatched to the client
 *         (another sub may have already dispatched it for example).
 * @throws IOException
 */
protected abstract boolean canDispatch(MessageReference node) throws IOException;
731
/**
 * Tells dispatchPending() whether a pending message must be skipped; only nodes for
 * which this returns false are considered for dispatch.
 *
 * @param node the message reference taken from the pending cursor
 * @return true if the message must not be dispatched
 */
protected abstract boolean isDropped(MessageReference node);
733
/**
 * Used during acknowledgment to remove the message. Invoked once for every
 * dispatched message covered by a standard, individual or poison ack.
 *
 * @param context connection context of the acknowledging consumer
 * @param ack the acknowledgment being processed
 * @param node the dispatched message reference covered by the ack
 * @throws IOException
 */
protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
740
741
742 public int getMaxProducersToAudit() {
743 return maxProducersToAudit;
744 }
745
/**
 * @param maxProducersToAudit the new maximum number of producers to audit
 */
public void setMaxProducersToAudit(int maxProducersToAudit) {
    this.maxProducersToAudit = maxProducersToAudit;
}
749
750 public int getMaxAuditDepth() {
751 return maxAuditDepth;
752 }
753
/**
 * @param maxAuditDepth the new maximum audit depth
 */
public void setMaxAuditDepth(int maxAuditDepth) {
    this.maxAuditDepth = maxAuditDepth;
}
757
758 public boolean isUsePrefetchExtension() {
759 return usePrefetchExtension;
760 }
761
/**
 * @param usePrefetchExtension whether delivered and transacted acks may enlarge the
 *        prefetch extension
 */
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
    this.usePrefetchExtension = usePrefetchExtension;
}
765 }