001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.Set;
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.CopyOnWriteArraySet;
028 import java.util.concurrent.Future;
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.broker.ProducerBrokerExchange;
032 import org.apache.activemq.broker.region.policy.DispatchPolicy;
033 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036 import org.apache.activemq.command.ActiveMQDestination;
037 import org.apache.activemq.command.ExceptionResponse;
038 import org.apache.activemq.command.Message;
039 import org.apache.activemq.command.MessageAck;
040 import org.apache.activemq.command.MessageId;
041 import org.apache.activemq.command.ProducerAck;
042 import org.apache.activemq.command.ProducerInfo;
043 import org.apache.activemq.command.Response;
044 import org.apache.activemq.command.SubscriptionInfo;
045 import org.apache.activemq.filter.MessageEvaluationContext;
046 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
047 import org.apache.activemq.store.MessageRecoveryListener;
048 import org.apache.activemq.store.TopicMessageStore;
049 import org.apache.activemq.thread.Task;
050 import org.apache.activemq.thread.TaskRunner;
051 import org.apache.activemq.thread.TaskRunnerFactory;
052 import org.apache.activemq.thread.Valve;
053 import org.apache.activemq.transaction.Synchronization;
054 import org.apache.activemq.util.SubscriptionKey;
055 import org.slf4j.Logger;
056 import org.slf4j.LoggerFactory;
057
058 /**
059 * The Topic is a destination that sends a copy of a message to every active
060 * Subscription registered.
061 *
062 *
063 */
064 public class Topic extends BaseDestination implements Task {
065 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
066 private final TopicMessageStore topicStore;
067 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
068 protected final Valve dispatchValve = new Valve(true);
069 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
070 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
071 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
072 private final TaskRunner taskRunner;
073 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
074 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
075 public void run() {
076 try {
077 Topic.this.taskRunner.wakeup();
078 } catch (InterruptedException e) {
079 }
080 };
081 };
082
083 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
084 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
085 super(brokerService, store, destination, parentStats);
086 this.topicStore = store;
087 // set default subscription recovery policy
088 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
089 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
090 }
091
092 @Override
093 public void initialize() throws Exception {
094 super.initialize();
095 if (store != null) {
096 // AMQ-2586: Better to leave this stat at zero than to give the user
097 // misleading metrics.
098 // int messageCount = store.getMessageCount();
099 // destinationStatistics.getMessages().setCount(messageCount);
100 }
101 }
102
103 public List<Subscription> getConsumers() {
104 synchronized (consumers) {
105 return new ArrayList<Subscription>(consumers);
106 }
107 }
108
109 public boolean lock(MessageReference node, LockOwner sub) {
110 return true;
111 }
112
113 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
114
115 super.addSubscription(context, sub);
116
117 if (!sub.getConsumerInfo().isDurable()) {
118
119 // Do a retroactive recovery if needed.
120 if (sub.getConsumerInfo().isRetroactive()) {
121
122 // synchronize with dispatch method so that no new messages are
123 // sent
124 // while we are recovering a subscription to avoid out of order
125 // messages.
126 dispatchValve.turnOff();
127 try {
128
129 synchronized (consumers) {
130 sub.add(context, this);
131 consumers.add(sub);
132 }
133 subscriptionRecoveryPolicy.recover(context, this, sub);
134
135 } finally {
136 dispatchValve.turnOn();
137 }
138
139 } else {
140 synchronized (consumers) {
141 sub.add(context, this);
142 consumers.add(sub);
143 }
144 }
145 } else {
146 sub.add(context, this);
147 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
148 durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
149 }
150 }
151
152 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
153 throws Exception {
154 if (!sub.getConsumerInfo().isDurable()) {
155 super.removeSubscription(context, sub, lastDeliveredSequenceId);
156 synchronized (consumers) {
157 consumers.remove(sub);
158 }
159 }
160 sub.remove(context, this);
161 }
162
163 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
164 if (topicStore != null) {
165 topicStore.deleteSubscription(key.clientId, key.subscriptionName);
166 DurableTopicSubscription removed = durableSubcribers.remove(key);
167 if (removed != null) {
168 destinationStatistics.getConsumers().decrement();
169 // deactivate and remove
170 removed.deactivate(false);
171 consumers.remove(removed);
172 }
173 }
174 }
175
176 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
177 // synchronize with dispatch method so that no new messages are sent
178 // while
179 // we are recovering a subscription to avoid out of order messages.
180 dispatchValve.turnOff();
181 try {
182
183 if (topicStore == null) {
184 return;
185 }
186
187 // Recover the durable subscription.
188 String clientId = subscription.getSubscriptionKey().getClientId();
189 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
190 String selector = subscription.getConsumerInfo().getSelector();
191 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
192 if (info != null) {
193 // Check to see if selector changed.
194 String s1 = info.getSelector();
195 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
196 // Need to delete the subscription
197 topicStore.deleteSubscription(clientId, subscriptionName);
198 info = null;
199 } else {
200 synchronized (consumers) {
201 consumers.add(subscription);
202 }
203 }
204 }
205 // Do we need to create the subscription?
206 if (info == null) {
207 info = new SubscriptionInfo();
208 info.setClientId(clientId);
209 info.setSelector(selector);
210 info.setSubscriptionName(subscriptionName);
211 info.setDestination(getActiveMQDestination());
212 // This destination is an actual destination id.
213 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
214 // This destination might be a pattern
215 synchronized (consumers) {
216 consumers.add(subscription);
217 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
218 }
219 }
220
221 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
222 msgContext.setDestination(destination);
223 if (subscription.isRecoveryRequired()) {
224 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
225 public boolean recoverMessage(Message message) throws Exception {
226 message.setRegionDestination(Topic.this);
227 try {
228 msgContext.setMessageReference(message);
229 if (subscription.matches(message, msgContext)) {
230 subscription.add(message);
231 }
232 } catch (IOException e) {
233 LOG.error("Failed to recover this message " + message);
234 }
235 return true;
236 }
237
238 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
239 throw new RuntimeException("Should not be called.");
240 }
241
242 public boolean hasSpace() {
243 return true;
244 }
245
246 public boolean isDuplicate(MessageId id) {
247 return false;
248 }
249 });
250 }
251 } finally {
252 dispatchValve.turnOn();
253 }
254 }
255
256 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
257 synchronized (consumers) {
258 consumers.remove(sub);
259 }
260 sub.remove(context, this);
261 }
262
263 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
264 if (subscription.getConsumerInfo().isRetroactive()) {
265 subscriptionRecoveryPolicy.recover(context, this, subscription);
266 }
267 }
268
269 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
270 final ConnectionContext context = producerExchange.getConnectionContext();
271
272 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
273 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
274 && !context.isInRecoveryMode();
275
276 // There is delay between the client sending it and it arriving at the
277 // destination.. it may have expired.
278 if (message.isExpired()) {
279 broker.messageExpired(context, message, null);
280 getDestinationStatistics().getExpired().increment();
281 if (sendProducerAck) {
282 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
283 context.getConnection().dispatchAsync(ack);
284 }
285 return;
286 }
287
288 if (memoryUsage.isFull()) {
289 isFull(context, memoryUsage);
290 fastProducer(context, producerInfo);
291
292 if (isProducerFlowControl() && context.isProducerFlowControl()) {
293
294 if (warnOnProducerFlowControl) {
295 warnOnProducerFlowControl = false;
296 LOG
297 .info("Usage Manager memory limit ("
298 + memoryUsage.getLimit()
299 + ") reached for "
300 + getActiveMQDestination().getQualifiedName()
301 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
302 + " See http://activemq.apache.org/producer-flow-control.html for more info");
303 }
304
305 if (systemUsage.isSendFailIfNoSpace()) {
306 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
307 + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
308 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
309 + " See http://activemq.apache.org/producer-flow-control.html for more info");
310 }
311
312 // We can avoid blocking due to low usage if the producer is
313 // sending
314 // a sync message or
315 // if it is using a producer window
316 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
317 synchronized (messagesWaitingForSpace) {
318 messagesWaitingForSpace.add(new Runnable() {
319 public void run() {
320 try {
321
322 // While waiting for space to free up... the
323 // message may have expired.
324 if (message.isExpired()) {
325 broker.messageExpired(context, message, null);
326 getDestinationStatistics().getExpired().increment();
327 } else {
328 doMessageSend(producerExchange, message);
329 }
330
331 if (sendProducerAck) {
332 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
333 .getSize());
334 context.getConnection().dispatchAsync(ack);
335 } else {
336 Response response = new Response();
337 response.setCorrelationId(message.getCommandId());
338 context.getConnection().dispatchAsync(response);
339 }
340
341 } catch (Exception e) {
342 if (!sendProducerAck && !context.isInRecoveryMode()) {
343 ExceptionResponse response = new ExceptionResponse(e);
344 response.setCorrelationId(message.getCommandId());
345 context.getConnection().dispatchAsync(response);
346 }
347 }
348
349 }
350 });
351
352 registerCallbackForNotFullNotification();
353 context.setDontSendReponse(true);
354 return;
355 }
356
357 } else {
358 // Producer flow control cannot be used, so we have do the
359 // flow
360 // control at the broker
361 // by blocking this thread until there is space available.
362
363 if (memoryUsage.isFull()) {
364 if (context.isInTransaction()) {
365
366 int count = 0;
367 while (!memoryUsage.waitForSpace(1000)) {
368 if (context.getStopping().get()) {
369 throw new IOException("Connection closed, send aborted.");
370 }
371 if (count > 2 && context.isInTransaction()) {
372 count = 0;
373 int size = context.getTransaction().size();
374 LOG.warn("Waiting for space to send transacted message - transaction elements = "
375 + size + " need more space to commit. Message = " + message);
376 }
377 }
378 } else {
379 waitForSpace(
380 context,
381 memoryUsage,
382 "Usage Manager memory limit reached. Stopping producer ("
383 + message.getProducerId()
384 + ") to prevent flooding "
385 + getActiveMQDestination().getQualifiedName()
386 + "."
387 + " See http://activemq.apache.org/producer-flow-control.html for more info");
388 }
389 }
390
391 // The usage manager could have delayed us by the time
392 // we unblock the message could have expired..
393 if (message.isExpired()) {
394 getDestinationStatistics().getExpired().increment();
395 if (LOG.isDebugEnabled()) {
396 LOG.debug("Expired message: " + message);
397 }
398 return;
399 }
400 }
401 }
402 }
403
404 doMessageSend(producerExchange, message);
405 messageDelivered(context, message);
406 if (sendProducerAck) {
407 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
408 context.getConnection().dispatchAsync(ack);
409 }
410 }
411
412 /**
413 * do send the message - this needs to be synchronized to ensure messages
414 * are stored AND dispatched in the right order
415 *
416 * @param producerExchange
417 * @param message
418 * @throws IOException
419 * @throws Exception
420 */
421 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
422 throws IOException, Exception {
423 final ConnectionContext context = producerExchange.getConnectionContext();
424 message.setRegionDestination(this);
425 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
426 Future<Object> result = null;
427
428 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
429 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
430 final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
431 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
432 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
433 + " See http://activemq.apache.org/producer-flow-control.html for more info";
434 if (systemUsage.isSendFailIfNoSpace()) {
435 throw new javax.jms.ResourceAllocationException(logMessage);
436 }
437
438 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
439 }
440 result = topicStore.asyncAddTopicMessage(context, message);
441 }
442
443 message.incrementReferenceCount();
444
445 if (context.isInTransaction()) {
446 context.getTransaction().addSynchronization(new Synchronization() {
447 @Override
448 public void afterCommit() throws Exception {
449 // It could take while before we receive the commit
450 // operration.. by that time the message could have
451 // expired..
452 if (broker.isExpired(message)) {
453 getDestinationStatistics().getExpired().increment();
454 broker.messageExpired(context, message, null);
455 message.decrementReferenceCount();
456 return;
457 }
458 try {
459 dispatch(context, message);
460 } finally {
461 message.decrementReferenceCount();
462 }
463 }
464 });
465
466 } else {
467 try {
468 dispatch(context, message);
469 } finally {
470 message.decrementReferenceCount();
471 }
472 }
473 if (result != null && !result.isCancelled()) {
474 try {
475 result.get();
476 } catch (CancellationException e) {
477 // ignore - the task has been cancelled if the message
478 // has already been deleted
479 }
480 }
481
482 }
483
484 private boolean canOptimizeOutPersistence() {
485 return durableSubcribers.size() == 0;
486 }
487
488 @Override
489 public String toString() {
490 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
491 }
492
493 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
494 final MessageReference node) throws IOException {
495 if (topicStore != null && node.isPersistent()) {
496 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
497 SubscriptionKey key = dsub.getSubscriptionKey();
498 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
499 }
500 messageConsumed(context, node);
501 }
502
503 public void gc() {
504 }
505
506 public Message loadMessage(MessageId messageId) throws IOException {
507 return topicStore != null ? topicStore.getMessage(messageId) : null;
508 }
509
510 public void start() throws Exception {
511 this.subscriptionRecoveryPolicy.start();
512 if (memoryUsage != null) {
513 memoryUsage.start();
514 }
515
516 }
517
518 public void stop() throws Exception {
519 if (taskRunner != null) {
520 taskRunner.shutdown();
521 }
522 this.subscriptionRecoveryPolicy.stop();
523 if (memoryUsage != null) {
524 memoryUsage.stop();
525 }
526 if (this.topicStore != null) {
527 this.topicStore.stop();
528 }
529 }
530
531 public Message[] browse() {
532 final Set<Message> result = new CopyOnWriteArraySet<Message>();
533 try {
534 if (topicStore != null) {
535 topicStore.recover(new MessageRecoveryListener() {
536 public boolean recoverMessage(Message message) throws Exception {
537 result.add(message);
538 return true;
539 }
540
541 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
542 return true;
543 }
544
545 public boolean hasSpace() {
546 return true;
547 }
548
549 public boolean isDuplicate(MessageId id) {
550 return false;
551 }
552 });
553 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
554 if (msgs != null) {
555 for (int i = 0; i < msgs.length; i++) {
556 result.add(msgs[i]);
557 }
558 }
559 }
560 } catch (Throwable e) {
561 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
562 }
563 return result.toArray(new Message[result.size()]);
564 }
565
566 public boolean iterate() {
567 synchronized (messagesWaitingForSpace) {
568 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
569 Runnable op = messagesWaitingForSpace.removeFirst();
570 op.run();
571 }
572
573 if (!messagesWaitingForSpace.isEmpty()) {
574 registerCallbackForNotFullNotification();
575 }
576 }
577 return false;
578 }
579
580 private void registerCallbackForNotFullNotification() {
581 // If the usage manager is not full, then the task will not
582 // get called..
583 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
584 // so call it directly here.
585 sendMessagesWaitingForSpaceTask.run();
586 }
587 }
588
589 // Properties
590 // -------------------------------------------------------------------------
591
592 public DispatchPolicy getDispatchPolicy() {
593 return dispatchPolicy;
594 }
595
596 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
597 this.dispatchPolicy = dispatchPolicy;
598 }
599
600 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
601 return subscriptionRecoveryPolicy;
602 }
603
604 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
605 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
606 }
607
608 // Implementation methods
609 // -------------------------------------------------------------------------
610
611 public final void wakeup() {
612 }
613
614 protected void dispatch(final ConnectionContext context, Message message) throws Exception {
615 // AMQ-2586: Better to leave this stat at zero than to give the user
616 // misleading metrics.
617 // destinationStatistics.getMessages().increment();
618 destinationStatistics.getEnqueues().increment();
619 dispatchValve.increment();
620 MessageEvaluationContext msgContext = null;
621 try {
622 if (!subscriptionRecoveryPolicy.add(context, message)) {
623 return;
624 }
625 synchronized (consumers) {
626 if (consumers.isEmpty()) {
627 onMessageWithNoConsumers(context, message);
628 return;
629 }
630 }
631 msgContext = context.getMessageEvaluationContext();
632 msgContext.setDestination(destination);
633 msgContext.setMessageReference(message);
634 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
635 onMessageWithNoConsumers(context, message);
636 }
637
638 } finally {
639 dispatchValve.decrement();
640 if (msgContext != null) {
641 msgContext.clear();
642 }
643 }
644 }
645
646 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
647 broker.messageExpired(context, reference, subs);
648 // AMQ-2586: Better to leave this stat at zero than to give the user
649 // misleading metrics.
650 // destinationStatistics.getMessages().decrement();
651 destinationStatistics.getEnqueues().decrement();
652 destinationStatistics.getExpired().increment();
653 MessageAck ack = new MessageAck();
654 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
655 ack.setDestination(destination);
656 ack.setMessageID(reference.getMessageId());
657 try {
658 acknowledge(context, subs, ack, reference);
659 } catch (IOException e) {
660 LOG.error("Failed to remove expired Message from the store ", e);
661 }
662 }
663
664 @Override
665 protected Logger getLog() {
666 return LOG;
667 }
668
669 }