001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.LinkedList;
021 import java.util.concurrent.atomic.AtomicLong;
022 import javax.jms.JMSException;
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031 import org.apache.activemq.command.ConsumerControl;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageDispatch;
036 import org.apache.activemq.command.MessageDispatchNotification;
037 import org.apache.activemq.command.MessagePull;
038 import org.apache.activemq.command.Response;
039 import org.apache.activemq.transaction.Synchronization;
040 import org.apache.activemq.usage.SystemUsage;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 public class TopicSubscription extends AbstractSubscription {
045
046 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
047 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
048
049 protected PendingMessageCursor matched;
050 protected final SystemUsage usageManager;
051 protected AtomicLong dispatchedCounter = new AtomicLong();
052
053 boolean singleDestination = true;
054 Destination destination;
055
056 private int maximumPendingMessages = -1;
057 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
058 private int discarded;
059 private final Object matchedListMutex = new Object();
060 private final AtomicLong enqueueCounter = new AtomicLong(0);
061 private final AtomicLong dequeueCounter = new AtomicLong(0);
062 private int memoryUsageHighWaterMark = 95;
063 // allow duplicate suppression in a ring network of brokers
064 protected int maxProducersToAudit = 1024;
065 protected int maxAuditDepth = 1000;
066 protected boolean enableAudit = false;
067 protected ActiveMQMessageAudit audit;
068 protected boolean active = false;
069
070 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071 super(broker, context, info);
072 this.usageManager = usageManager;
073 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075 this.matched = new VMPendingMessageCursor(false);
076 } else {
077 this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078 }
079 }
080
081 public void init() throws Exception {
082 this.matched.setSystemUsage(usageManager);
083 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084 this.matched.start();
085 if (enableAudit) {
086 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087 }
088 this.active=true;
089 }
090
091 public void add(MessageReference node) throws Exception {
092 if (isDuplicate(node)) {
093 return;
094 }
095 enqueueCounter.incrementAndGet();
096 if (!isFull() && matched.isEmpty() && !isSlave()) {
097 // if maximumPendingMessages is set we will only discard messages which
098 // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099 dispatch(node);
100 setSlowConsumer(false);
101 } else {
102 //we are slow
103 if(!isSlowConsumer()) {
104 setSlowConsumer(true);
105 for (Destination dest: destinations) {
106 dest.slowConsumer(getContext(), this);
107 }
108 }
109 if (maximumPendingMessages != 0) {
110 boolean warnedAboutWait = false;
111 while (active) {
112 synchronized (matchedListMutex) {
113 while (matched.isFull()) {
114 if (getContext().getStopping().get()) {
115 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
116 + node.getMessageId());
117 enqueueCounter.decrementAndGet();
118 return;
119 }
120 if (!warnedAboutWait) {
121 LOG.info(toString() + ": Pending message cursor [" + matched
122 + "] is full, temp usage ("
123 + +matched.getSystemUsage().getTempUsage().getPercentUsage()
124 + "%) or memory usage ("
125 + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
126 + "%) limit reached, blocking message add() pending the release of resources.");
127 warnedAboutWait = true;
128 }
129 matchedListMutex.wait(20);
130 }
131 //Temporary storage could be full - so just try to add the message
132 //see https://issues.apache.org/activemq/browse/AMQ-2475
133 if (matched.tryAddMessageLast(node, 10)) {
134 break;
135 }
136 }
137 }
138 synchronized (matchedListMutex) {
139
140 // NOTE - be careful about the slaveBroker!
141 if (maximumPendingMessages > 0) {
142 // calculate the high water mark from which point we
143 // will eagerly evict expired messages
144 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
145 if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
146 max = maximumPendingMessages;
147 }
148 if (!matched.isEmpty() && matched.size() > max) {
149 removeExpiredMessages();
150 }
151 // lets discard old messages as we are a slow consumer
152 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
153 int pageInSize = matched.size() - maximumPendingMessages;
154 // only page in a 1000 at a time - else we could
155 // blow da memory
156 pageInSize = Math.max(1000, pageInSize);
157 LinkedList<MessageReference> list = null;
158 MessageReference[] oldMessages=null;
159 synchronized(matched){
160 list = matched.pageInList(pageInSize);
161 oldMessages = messageEvictionStrategy.evictMessages(list);
162 for (MessageReference ref : list) {
163 ref.decrementReferenceCount();
164 }
165 }
166 int messagesToEvict = 0;
167 if (oldMessages != null){
168 messagesToEvict = oldMessages.length;
169 for (int i = 0; i < messagesToEvict; i++) {
170 MessageReference oldMessage = oldMessages[i];
171 discard(oldMessage);
172 }
173 }
174 // lets avoid an infinite loop if we are given a bad
175 // eviction strategy
176 // for a bad strategy lets just not evict
177 if (messagesToEvict == 0) {
178 LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
179 break;
180 }
181 }
182 }
183 }
184 dispatchMatched();
185 }
186 }
187 }
188
189 private boolean isDuplicate(MessageReference node) {
190 boolean duplicate = false;
191 if (enableAudit && audit != null) {
192 duplicate = audit.isDuplicate(node);
193 if (LOG.isDebugEnabled()) {
194 if (duplicate) {
195 LOG.debug("ignoring duplicate add: " + node.getMessageId());
196 }
197 }
198 }
199 return duplicate;
200 }
201
202 /**
203 * Discard any expired messages from the matched list. Called from a
204 * synchronized block.
205 *
206 * @throws IOException
207 */
208 protected void removeExpiredMessages() throws IOException {
209 try {
210 matched.reset();
211 while (matched.hasNext()) {
212 MessageReference node = matched.next();
213 node.decrementReferenceCount();
214 if (broker.isExpired(node)) {
215 matched.remove();
216 dispatchedCounter.incrementAndGet();
217 node.decrementReferenceCount();
218 node.getRegionDestination().getDestinationStatistics().getExpired().increment();
219 broker.messageExpired(getContext(), node, this);
220 break;
221 }
222 }
223 } finally {
224 matched.release();
225 }
226 }
227
228 public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
229 synchronized (matchedListMutex) {
230 try {
231 matched.reset();
232 while (matched.hasNext()) {
233 MessageReference node = matched.next();
234 node.decrementReferenceCount();
235 if (node.getMessageId().equals(mdn.getMessageId())) {
236 matched.remove();
237 dispatchedCounter.incrementAndGet();
238 node.decrementReferenceCount();
239 break;
240 }
241 }
242 } finally {
243 matched.release();
244 }
245 }
246 }
247
248 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
249 // Handle the standard acknowledgment case.
250 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
251 if (context.isInTransaction()) {
252 context.getTransaction().addSynchronization(new Synchronization() {
253
254 @Override
255 public void afterCommit() throws Exception {
256 synchronized (TopicSubscription.this) {
257 if (singleDestination && destination != null) {
258 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
259 }
260 }
261 dequeueCounter.addAndGet(ack.getMessageCount());
262 dispatchMatched();
263 }
264 });
265 } else {
266 if (singleDestination && destination != null) {
267 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
268 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
269 }
270 dequeueCounter.addAndGet(ack.getMessageCount());
271 }
272 dispatchMatched();
273 return;
274 } else if (ack.isDeliveredAck()) {
275 // Message was delivered but not acknowledged: update pre-fetch
276 // counters.
277 // also. get these for a consumer expired message.
278 if (destination != null && !ack.isInTransaction()) {
279 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
280 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
281 }
282 dequeueCounter.addAndGet(ack.getMessageCount());
283 dispatchMatched();
284 return;
285 } else if (ack.isRedeliveredAck()) {
286 // nothing to do atm
287 return;
288 }
289 throw new JMSException("Invalid acknowledgment: " + ack);
290 }
291
292 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
293 // not supported for topics
294 return null;
295 }
296
297 public int getPendingQueueSize() {
298 return matched();
299 }
300
301 public int getDispatchedQueueSize() {
302 return (int)(dispatchedCounter.get() - dequeueCounter.get());
303 }
304
305 public int getMaximumPendingMessages() {
306 return maximumPendingMessages;
307 }
308
309 public long getDispatchedCounter() {
310 return dispatchedCounter.get();
311 }
312
313 public long getEnqueueCounter() {
314 return enqueueCounter.get();
315 }
316
317 public long getDequeueCounter() {
318 return dequeueCounter.get();
319 }
320
321 /**
322 * @return the number of messages discarded due to being a slow consumer
323 */
324 public int discarded() {
325 synchronized (matchedListMutex) {
326 return discarded;
327 }
328 }
329
330 /**
331 * @return the number of matched messages (messages targeted for the
332 * subscription but not yet able to be dispatched due to the
333 * prefetch buffer being full).
334 */
335 public int matched() {
336 synchronized (matchedListMutex) {
337 return matched.size();
338 }
339 }
340
341 /**
342 * Sets the maximum number of pending messages that can be matched against
343 * this consumer before old messages are discarded.
344 */
345 public void setMaximumPendingMessages(int maximumPendingMessages) {
346 this.maximumPendingMessages = maximumPendingMessages;
347 }
348
349 public MessageEvictionStrategy getMessageEvictionStrategy() {
350 return messageEvictionStrategy;
351 }
352
353 /**
354 * Sets the eviction strategy used to decide which message to evict when the
355 * slow consumer needs to discard messages
356 */
357 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
358 this.messageEvictionStrategy = messageEvictionStrategy;
359 }
360
361 public int getMaxProducersToAudit() {
362 return maxProducersToAudit;
363 }
364
365 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
366 this.maxProducersToAudit = maxProducersToAudit;
367 if (audit != null) {
368 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
369 }
370 }
371
372 public int getMaxAuditDepth() {
373 return maxAuditDepth;
374 }
375
376 public synchronized void setMaxAuditDepth(int maxAuditDepth) {
377 this.maxAuditDepth = maxAuditDepth;
378 if (audit != null) {
379 audit.setAuditDepth(maxAuditDepth);
380 }
381 }
382
383 public boolean isEnableAudit() {
384 return enableAudit;
385 }
386
387 public synchronized void setEnableAudit(boolean enableAudit) {
388 this.enableAudit = enableAudit;
389 if (enableAudit && audit==null) {
390 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
391 }
392 }
393
394 // Implementation methods
395 // -------------------------------------------------------------------------
396 public boolean isFull() {
397 return getDispatchedQueueSize() >= info.getPrefetchSize();
398 }
399
400 public int getInFlightSize() {
401 return getDispatchedQueueSize();
402 }
403
404
405 /**
406 * @return true when 60% or more room is left for dispatching messages
407 */
408 public boolean isLowWaterMark() {
409 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
410 }
411
412 /**
413 * @return true when 10% or less room is left for dispatching messages
414 */
415 public boolean isHighWaterMark() {
416 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
417 }
418
419 /**
420 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
421 */
422 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
423 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
424 }
425
426 /**
427 * @return the memoryUsageHighWaterMark
428 */
429 public int getMemoryUsageHighWaterMark() {
430 return this.memoryUsageHighWaterMark;
431 }
432
433 /**
434 * @return the usageManager
435 */
436 public SystemUsage getUsageManager() {
437 return this.usageManager;
438 }
439
440 /**
441 * @return the matched
442 */
443 public PendingMessageCursor getMatched() {
444 return this.matched;
445 }
446
447 /**
448 * @param matched the matched to set
449 */
450 public void setMatched(PendingMessageCursor matched) {
451 this.matched = matched;
452 }
453
454 /**
455 * inform the MessageConsumer on the client to change it's prefetch
456 *
457 * @param newPrefetch
458 */
459 public void updateConsumerPrefetch(int newPrefetch) {
460 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
461 ConsumerControl cc = new ConsumerControl();
462 cc.setConsumerId(info.getConsumerId());
463 cc.setPrefetch(newPrefetch);
464 context.getConnection().dispatchAsync(cc);
465 }
466 }
467
468 private void dispatchMatched() throws IOException {
469 synchronized (matchedListMutex) {
470 if (!matched.isEmpty() && !isFull()) {
471 try {
472 matched.reset();
473
474 while (matched.hasNext() && !isFull()) {
475 MessageReference message = matched.next();
476 message.decrementReferenceCount();
477 matched.remove();
478 // Message may have been sitting in the matched list a
479 // while
480 // waiting for the consumer to ak the message.
481 if (message.isExpired()) {
482 discard(message);
483 continue; // just drop it.
484 }
485 dispatch(message);
486 }
487 } finally {
488 matched.release();
489 }
490 }
491 }
492 }
493
494 private void dispatch(final MessageReference node) throws IOException {
495 Message message = (Message)node;
496 node.incrementReferenceCount();
497 // Make sure we can dispatch a message.
498 MessageDispatch md = new MessageDispatch();
499 md.setMessage(message);
500 md.setConsumerId(info.getConsumerId());
501 md.setDestination(node.getRegionDestination().getActiveMQDestination());
502 dispatchedCounter.incrementAndGet();
503 // Keep track if this subscription is receiving messages from a single
504 // destination.
505 if (singleDestination) {
506 if (destination == null) {
507 destination = node.getRegionDestination();
508 } else {
509 if (destination != node.getRegionDestination()) {
510 singleDestination = false;
511 }
512 }
513 }
514 if (info.isDispatchAsync()) {
515 md.setTransmitCallback(new Runnable() {
516
517 public void run() {
518 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
519 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
520 node.decrementReferenceCount();
521 }
522 });
523 context.getConnection().dispatchAsync(md);
524 } else {
525 context.getConnection().dispatchSync(md);
526 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
527 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
528 node.decrementReferenceCount();
529 }
530 }
531
532 private void discard(MessageReference message) {
533 message.decrementReferenceCount();
534 matched.remove(message);
535 discarded++;
536 if(destination != null) {
537 destination.getDestinationStatistics().getDequeues().increment();
538 }
539 if (LOG.isDebugEnabled()) {
540 LOG.debug("Discarding message " + message);
541 }
542 Destination dest = message.getRegionDestination();
543 if (dest != null) {
544 dest.messageDiscarded(getContext(), this, message);
545 }
546 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
547 }
548
549 @Override
550 public String toString() {
551 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
552 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
553 }
554
555 public void destroy() {
556 this.active=false;
557 synchronized (matchedListMutex) {
558 try {
559 matched.destroy();
560 } catch (Exception e) {
561 LOG.warn("Failed to destroy cursor", e);
562 }
563 }
564 setSlowConsumer(false);
565 }
566
567 @Override
568 public int getPrefetchSize() {
569 return info.getPrefetchSize();
570 }
571
572 }