001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import javax.jms.ResourceAllocationException;
021 import org.apache.activemq.advisory.AdvisorySupport;
022 import org.apache.activemq.broker.Broker;
023 import org.apache.activemq.broker.BrokerService;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.ProducerBrokerExchange;
026 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
027 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ActiveMQTopic;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.command.MessageDispatchNotification;
032 import org.apache.activemq.command.ProducerInfo;
033 import org.apache.activemq.state.ProducerState;
034 import org.apache.activemq.store.MessageStore;
035 import org.apache.activemq.usage.MemoryUsage;
036 import org.apache.activemq.usage.SystemUsage;
037 import org.apache.activemq.usage.Usage;
038 import org.slf4j.Logger;
039
040 /**
041 *
042 */
043 public abstract class BaseDestination implements Destination {
044 /**
045 * The maximum number of messages to page in to the destination from
046 * persistent storage
047 */
048 public static final int MAX_PAGE_SIZE = 200;
049 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
050 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
051 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
052 public static final int MAX_PRODUCERS_TO_AUDIT = 64;
053 public static final int MAX_AUDIT_DEPTH = 2048;
054
055 protected final ActiveMQDestination destination;
056 protected final Broker broker;
057 protected final MessageStore store;
058 protected SystemUsage systemUsage;
059 protected MemoryUsage memoryUsage;
060 private boolean producerFlowControl = true;
061 protected boolean warnOnProducerFlowControl = true;
062 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
063
064 private int maxProducersToAudit = 1024;
065 private int maxAuditDepth = 2048;
066 private boolean enableAudit = true;
067 private int maxPageSize = MAX_PAGE_SIZE;
068 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
069 private boolean useCache = true;
070 private int minimumMessageSize = 1024;
071 private boolean lazyDispatch = false;
072 private boolean advisoryForSlowConsumers;
073 private boolean advisdoryForFastProducers;
074 private boolean advisoryForDiscardingMessages;
075 private boolean advisoryWhenFull;
076 private boolean advisoryForDelivery;
077 private boolean advisoryForConsumed;
078 private boolean sendAdvisoryIfNoConsumers;
079 protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
080 protected final BrokerService brokerService;
081 protected final Broker regionBroker;
082 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
083 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
084 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
085 protected int cursorMemoryHighWaterMark = 70;
086 protected int storeUsageHighWaterMark = 100;
087 private SlowConsumerStrategy slowConsumerStrategy;
088 private boolean prioritizedMessages;
089 private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
090 private boolean gcIfInactive;
091 private long lastActiveTime=0l;
092 private boolean reduceMemoryFootprint = false;
093
094 /**
095 * @param broker
096 * @param store
097 * @param destination
098 * @param parentStats
099 * @throws Exception
100 */
101 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
102 this.brokerService = brokerService;
103 this.broker = brokerService.getBroker();
104 this.store = store;
105 this.destination = destination;
106 // let's copy the enabled property from the parent DestinationStatistics
107 this.destinationStatistics.setEnabled(parentStats.isEnabled());
108 this.destinationStatistics.setParent(parentStats);
109 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
110 this.memoryUsage = this.systemUsage.getMemoryUsage();
111 this.memoryUsage.setUsagePortion(1.0f);
112 this.regionBroker = brokerService.getRegionBroker();
113 }
114
115 /**
116 * initialize the destination
117 *
118 * @throws Exception
119 */
120 public void initialize() throws Exception {
121 // Let the store know what usage manager we are using so that he can
122 // flush messages to disk when usage gets high.
123 if (store != null) {
124 store.setMemoryUsage(this.memoryUsage);
125 }
126 }
127
128 /**
129 * @return the producerFlowControl
130 */
131 public boolean isProducerFlowControl() {
132 return producerFlowControl;
133 }
134
135 /**
136 * @param producerFlowControl the producerFlowControl to set
137 */
138 public void setProducerFlowControl(boolean producerFlowControl) {
139 this.producerFlowControl = producerFlowControl;
140 }
141
142 /**
143 * Set's the interval at which warnings about producers being blocked by
144 * resource usage will be triggered. Values of 0 or less will disable
145 * warnings
146 *
147 * @param blockedProducerWarningInterval the interval at which warning about
148 * blocked producers will be triggered.
149 */
150 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
151 this.blockedProducerWarningInterval = blockedProducerWarningInterval;
152 }
153
154 /**
155 *
156 * @return the interval at which warning about blocked producers will be
157 * triggered.
158 */
159 public long getBlockedProducerWarningInterval() {
160 return blockedProducerWarningInterval;
161 }
162
163 /**
164 * @return the maxProducersToAudit
165 */
166 public int getMaxProducersToAudit() {
167 return maxProducersToAudit;
168 }
169
170 /**
171 * @param maxProducersToAudit the maxProducersToAudit to set
172 */
173 public void setMaxProducersToAudit(int maxProducersToAudit) {
174 this.maxProducersToAudit = maxProducersToAudit;
175 }
176
177 /**
178 * @return the maxAuditDepth
179 */
180 public int getMaxAuditDepth() {
181 return maxAuditDepth;
182 }
183
184 /**
185 * @param maxAuditDepth the maxAuditDepth to set
186 */
187 public void setMaxAuditDepth(int maxAuditDepth) {
188 this.maxAuditDepth = maxAuditDepth;
189 }
190
191 /**
192 * @return the enableAudit
193 */
194 public boolean isEnableAudit() {
195 return enableAudit;
196 }
197
198 /**
199 * @param enableAudit the enableAudit to set
200 */
201 public void setEnableAudit(boolean enableAudit) {
202 this.enableAudit = enableAudit;
203 }
204
205 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
206 destinationStatistics.getProducers().increment();
207 this.lastActiveTime=0l;
208 }
209
210 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
211 destinationStatistics.getProducers().decrement();
212 }
213
214 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
215 destinationStatistics.getConsumers().increment();
216 this.lastActiveTime=0l;
217 }
218
219 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
220 destinationStatistics.getConsumers().decrement();
221 }
222
223
224 public final MemoryUsage getMemoryUsage() {
225 return memoryUsage;
226 }
227
228 public DestinationStatistics getDestinationStatistics() {
229 return destinationStatistics;
230 }
231
232 public ActiveMQDestination getActiveMQDestination() {
233 return destination;
234 }
235
236 public final String getName() {
237 return getActiveMQDestination().getPhysicalName();
238 }
239
240 public final MessageStore getMessageStore() {
241 return store;
242 }
243
244 public final boolean isActive() {
245 return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
246 }
247
248 public int getMaxPageSize() {
249 return maxPageSize;
250 }
251
252 public void setMaxPageSize(int maxPageSize) {
253 this.maxPageSize = maxPageSize;
254 }
255
256 public int getMaxBrowsePageSize() {
257 return this.maxBrowsePageSize;
258 }
259
260 public void setMaxBrowsePageSize(int maxPageSize) {
261 this.maxBrowsePageSize = maxPageSize;
262 }
263
264 public int getMaxExpirePageSize() {
265 return this.maxExpirePageSize;
266 }
267
268 public void setMaxExpirePageSize(int maxPageSize) {
269 this.maxExpirePageSize = maxPageSize;
270 }
271
272 public void setExpireMessagesPeriod(long expireMessagesPeriod) {
273 this.expireMessagesPeriod = expireMessagesPeriod;
274 }
275
276 public long getExpireMessagesPeriod() {
277 return expireMessagesPeriod;
278 }
279
280 public boolean isUseCache() {
281 return useCache;
282 }
283
284 public void setUseCache(boolean useCache) {
285 this.useCache = useCache;
286 }
287
288 public int getMinimumMessageSize() {
289 return minimumMessageSize;
290 }
291
292 public void setMinimumMessageSize(int minimumMessageSize) {
293 this.minimumMessageSize = minimumMessageSize;
294 }
295
296 public boolean isLazyDispatch() {
297 return lazyDispatch;
298 }
299
300 public void setLazyDispatch(boolean lazyDispatch) {
301 this.lazyDispatch = lazyDispatch;
302 }
303
304 protected long getDestinationSequenceId() {
305 return regionBroker.getBrokerSequenceId();
306 }
307
308 /**
309 * @return the advisoryForSlowConsumers
310 */
311 public boolean isAdvisoryForSlowConsumers() {
312 return advisoryForSlowConsumers;
313 }
314
315 /**
316 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
317 */
318 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
319 this.advisoryForSlowConsumers = advisoryForSlowConsumers;
320 }
321
322 /**
323 * @return the advisoryForDiscardingMessages
324 */
325 public boolean isAdvisoryForDiscardingMessages() {
326 return advisoryForDiscardingMessages;
327 }
328
329 /**
330 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
331 * set
332 */
333 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
334 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
335 }
336
337 /**
338 * @return the advisoryWhenFull
339 */
340 public boolean isAdvisoryWhenFull() {
341 return advisoryWhenFull;
342 }
343
344 /**
345 * @param advisoryWhenFull the advisoryWhenFull to set
346 */
347 public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
348 this.advisoryWhenFull = advisoryWhenFull;
349 }
350
351 /**
352 * @return the advisoryForDelivery
353 */
354 public boolean isAdvisoryForDelivery() {
355 return advisoryForDelivery;
356 }
357
358 /**
359 * @param advisoryForDelivery the advisoryForDelivery to set
360 */
361 public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
362 this.advisoryForDelivery = advisoryForDelivery;
363 }
364
365 /**
366 * @return the advisoryForConsumed
367 */
368 public boolean isAdvisoryForConsumed() {
369 return advisoryForConsumed;
370 }
371
372 /**
373 * @param advisoryForConsumed the advisoryForConsumed to set
374 */
375 public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
376 this.advisoryForConsumed = advisoryForConsumed;
377 }
378
379 /**
380 * @return the advisdoryForFastProducers
381 */
382 public boolean isAdvisdoryForFastProducers() {
383 return advisdoryForFastProducers;
384 }
385
386 /**
387 * @param advisdoryForFastProducers the advisdoryForFastProducers to set
388 */
389 public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
390 this.advisdoryForFastProducers = advisdoryForFastProducers;
391 }
392
393 public boolean isSendAdvisoryIfNoConsumers() {
394 return sendAdvisoryIfNoConsumers;
395 }
396
397 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
398 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
399 }
400
401 /**
402 * @return the dead letter strategy
403 */
404 public DeadLetterStrategy getDeadLetterStrategy() {
405 return deadLetterStrategy;
406 }
407
408 /**
409 * set the dead letter strategy
410 *
411 * @param deadLetterStrategy
412 */
413 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
414 this.deadLetterStrategy = deadLetterStrategy;
415 }
416
417 public int getCursorMemoryHighWaterMark() {
418 return this.cursorMemoryHighWaterMark;
419 }
420
421 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
422 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
423 }
424
425 /**
426 * called when message is consumed
427 *
428 * @param context
429 * @param messageReference
430 */
431 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
432 if (advisoryForConsumed) {
433 broker.messageConsumed(context, messageReference);
434 }
435 }
436
437 /**
438 * Called when message is delivered to the broker
439 *
440 * @param context
441 * @param messageReference
442 */
443 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
444 if (advisoryForDelivery) {
445 broker.messageDelivered(context, messageReference);
446 }
447 }
448
449 /**
450 * Called when a message is discarded - e.g. running low on memory This will
451 * happen only if the policy is enabled - e.g. non durable topics
452 *
453 * @param context
454 * @param messageReference
455 */
456 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
457 if (advisoryForDiscardingMessages) {
458 broker.messageDiscarded(context, sub, messageReference);
459 }
460 }
461
462 /**
463 * Called when there is a slow consumer
464 *
465 * @param context
466 * @param subs
467 */
468 public void slowConsumer(ConnectionContext context, Subscription subs) {
469 if (advisoryForSlowConsumers) {
470 broker.slowConsumer(context, this, subs);
471 }
472 if (slowConsumerStrategy != null) {
473 slowConsumerStrategy.slowConsumer(context, subs);
474 }
475 }
476
477 /**
478 * Called to notify a producer is too fast
479 *
480 * @param context
481 * @param producerInfo
482 */
483 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
484 if (advisdoryForFastProducers) {
485 broker.fastProducer(context, producerInfo);
486 }
487 }
488
489 /**
490 * Called when a Usage reaches a limit
491 *
492 * @param context
493 * @param usage
494 */
495 public void isFull(ConnectionContext context, Usage usage) {
496 if (advisoryWhenFull) {
497 broker.isFull(context, this, usage);
498 }
499 }
500
501 public void dispose(ConnectionContext context) throws IOException {
502 if (this.store != null) {
503 this.store.removeAllMessages(context);
504 this.store.dispose(context);
505 }
506 this.destinationStatistics.setParent(null);
507 this.memoryUsage.stop();
508 }
509
510 /**
511 * Provides a hook to allow messages with no consumer to be processed in
512 * some way - such as to send to a dead letter queue or something..
513 */
514 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
515 if (!msg.isPersistent()) {
516 if (isSendAdvisoryIfNoConsumers()) {
517 // allow messages with no consumers to be dispatched to a dead
518 // letter queue
519 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
520
521 Message message = msg.copy();
522 // The original destination and transaction id do not get
523 // filled when the message is first sent,
524 // it is only populated if the message is routed to another
525 // destination like the DLQ
526 if (message.getOriginalDestination() != null) {
527 message.setOriginalDestination(message.getDestination());
528 }
529 if (message.getOriginalTransactionId() != null) {
530 message.setOriginalTransactionId(message.getTransactionId());
531 }
532
533 ActiveMQTopic advisoryTopic;
534 if (destination.isQueue()) {
535 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
536 } else {
537 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
538 }
539 message.setDestination(advisoryTopic);
540 message.setTransactionId(null);
541
542 // Disable flow control for this since since we don't want
543 // to block.
544 boolean originalFlowControl = context.isProducerFlowControl();
545 try {
546 context.setProducerFlowControl(false);
547 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
548 producerExchange.setMutable(false);
549 producerExchange.setConnectionContext(context);
550 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
551 context.getBroker().send(producerExchange, message);
552 } finally {
553 context.setProducerFlowControl(originalFlowControl);
554 }
555
556 }
557 }
558 }
559 }
560
561 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
562 }
563
564 public final int getStoreUsageHighWaterMark() {
565 return this.storeUsageHighWaterMark;
566 }
567
568 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
569 this.storeUsageHighWaterMark = storeUsageHighWaterMark;
570 }
571
572 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
573 waitForSpace(context, usage, 100, warning);
574 }
575
576 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
577 if (systemUsage.isSendFailIfNoSpace()) {
578 getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
579 throw new ResourceAllocationException(warning);
580 }
581 if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
582 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
583 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
584 throw new ResourceAllocationException(warning);
585 }
586 } else {
587 long start = System.currentTimeMillis();
588 long nextWarn = start;
589 while (!usage.waitForSpace(1000, highWaterMark)) {
590 if (context.getStopping().get()) {
591 throw new IOException("Connection closed, send aborted.");
592 }
593
594 long now = System.currentTimeMillis();
595 if (now >= nextWarn) {
596 getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
597 nextWarn = now + blockedProducerWarningInterval;
598 }
599 }
600 }
601 }
602
603 protected abstract Logger getLog();
604
605 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
606 this.slowConsumerStrategy = slowConsumerStrategy;
607 }
608
609 public SlowConsumerStrategy getSlowConsumerStrategy() {
610 return this.slowConsumerStrategy;
611 }
612
613
614 public boolean isPrioritizedMessages() {
615 return this.prioritizedMessages;
616 }
617
618 public void setPrioritizedMessages(boolean prioritizedMessages) {
619 this.prioritizedMessages = prioritizedMessages;
620 if (store != null) {
621 store.setPrioritizedMessages(prioritizedMessages);
622 }
623 }
624
625 /**
626 * @return the inactiveTimoutBeforeGC
627 */
628 public long getInactiveTimoutBeforeGC() {
629 return this.inactiveTimoutBeforeGC;
630 }
631
632 /**
633 * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
634 */
635 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
636 this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
637 }
638
639 /**
640 * @return the gcIfInactive
641 */
642 public boolean isGcIfInactive() {
643 return this.gcIfInactive;
644 }
645
646 /**
647 * @param gcIfInactive the gcIfInactive to set
648 */
649 public void setGcIfInactive(boolean gcIfInactive) {
650 this.gcIfInactive = gcIfInactive;
651 }
652
653 public void markForGC(long timeStamp) {
654 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
655 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
656 this.lastActiveTime = timeStamp;
657 }
658 }
659
660 public boolean canGC() {
661 boolean result = false;
662 if (isGcIfInactive()&& this.lastActiveTime != 0l) {
663 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
664 result = true;
665 }
666 }
667 return result;
668 }
669
670 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
671 this.reduceMemoryFootprint = reduceMemoryFootprint;
672 }
673
674 protected boolean isReduceMemoryFootprint() {
675 return this.reduceMemoryFootprint;
676 }
677 }