001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.util.ArrayList;
022 import java.util.Collections;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Set;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029 import java.util.concurrent.ThreadPoolExecutor;
030 import javax.jms.InvalidClientIDException;
031 import javax.jms.JMSException;
032 import org.apache.activemq.advisory.AdvisorySupport;
033 import org.apache.activemq.broker.Broker;
034 import org.apache.activemq.broker.BrokerService;
035 import org.apache.activemq.broker.Connection;
036 import org.apache.activemq.broker.ConnectionContext;
037 import org.apache.activemq.broker.ConsumerBrokerExchange;
038 import org.apache.activemq.broker.EmptyBroker;
039 import org.apache.activemq.broker.ProducerBrokerExchange;
040 import org.apache.activemq.broker.TransportConnector;
041 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
042 import org.apache.activemq.broker.region.policy.PolicyMap;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.BrokerId;
045 import org.apache.activemq.command.BrokerInfo;
046 import org.apache.activemq.command.ConnectionId;
047 import org.apache.activemq.command.ConnectionInfo;
048 import org.apache.activemq.command.ConsumerControl;
049 import org.apache.activemq.command.ConsumerInfo;
050 import org.apache.activemq.command.DestinationInfo;
051 import org.apache.activemq.command.Message;
052 import org.apache.activemq.command.MessageAck;
053 import org.apache.activemq.command.MessageDispatch;
054 import org.apache.activemq.command.MessageDispatchNotification;
055 import org.apache.activemq.command.MessagePull;
056 import org.apache.activemq.command.ProducerInfo;
057 import org.apache.activemq.command.RemoveSubscriptionInfo;
058 import org.apache.activemq.command.Response;
059 import org.apache.activemq.command.TransactionId;
060 import org.apache.activemq.state.ConnectionState;
061 import org.apache.activemq.store.kahadb.plist.PListStore;
062 import org.apache.activemq.thread.Scheduler;
063 import org.apache.activemq.thread.TaskRunnerFactory;
064 import org.apache.activemq.usage.SystemUsage;
065 import org.apache.activemq.util.BrokerSupport;
066 import org.apache.activemq.util.IdGenerator;
067 import org.apache.activemq.util.InetAddressUtil;
068 import org.apache.activemq.util.LongSequenceGenerator;
069 import org.apache.activemq.util.ServiceStopper;
070 import org.slf4j.Logger;
071 import org.slf4j.LoggerFactory;
072
073 /**
074 * Routes Broker operations to the correct messaging regions for processing.
075 *
076 *
077 */
078 public class RegionBroker extends EmptyBroker {
079 public static final String ORIGINAL_EXPIRATION = "originalExpiration";
080 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
081 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
082
083 protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
084 protected DestinationFactory destinationFactory;
085 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
086
087 private final Region queueRegion;
088 private final Region topicRegion;
089 private final Region tempQueueRegion;
090 private final Region tempTopicRegion;
091 protected final BrokerService brokerService;
092 private boolean started;
093 private boolean keepDurableSubsActive;
094
095 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
096 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
097 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
098
099 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
100 private BrokerId brokerId;
101 private String brokerName;
102 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
103 private final DestinationInterceptor destinationInterceptor;
104 private ConnectionContext adminConnectionContext;
105 private final Scheduler scheduler;
106 private final ThreadPoolExecutor executor;
107
108 private final Runnable purgeInactiveDestinationsTask = new Runnable() {
109 public void run() {
110 purgeInactiveDestinations();
111 }
112 };
113
114 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
115 DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
116 this.brokerService = brokerService;
117 this.executor=executor;
118 this.scheduler = scheduler;
119 if (destinationFactory == null) {
120 throw new IllegalArgumentException("null destinationFactory");
121 }
122 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
123 this.destinationFactory = destinationFactory;
124 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
125 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
126 this.destinationInterceptor = destinationInterceptor;
127 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
128 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
129 }
130
131 @Override
132 public Map<ActiveMQDestination, Destination> getDestinationMap() {
133 Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
134 answer.putAll(getTopicRegion().getDestinationMap());
135 return answer;
136 }
137
138 @Override
139 public Set <Destination> getDestinations(ActiveMQDestination destination) {
140 switch (destination.getDestinationType()) {
141 case ActiveMQDestination.QUEUE_TYPE:
142 return queueRegion.getDestinations(destination);
143 case ActiveMQDestination.TOPIC_TYPE:
144 return topicRegion.getDestinations(destination);
145 case ActiveMQDestination.TEMP_QUEUE_TYPE:
146 return tempQueueRegion.getDestinations(destination);
147 case ActiveMQDestination.TEMP_TOPIC_TYPE:
148 return tempTopicRegion.getDestinations(destination);
149 default:
150 return Collections.emptySet();
151 }
152 }
153
154 @Override
155 public Broker getAdaptor(Class type) {
156 if (type.isInstance(this)) {
157 return this;
158 }
159 return null;
160 }
161
162 public Region getQueueRegion() {
163 return queueRegion;
164 }
165
166 public Region getTempQueueRegion() {
167 return tempQueueRegion;
168 }
169
170 public Region getTempTopicRegion() {
171 return tempTopicRegion;
172 }
173
174 public Region getTopicRegion() {
175 return topicRegion;
176 }
177
178 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
179 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
180 }
181
182 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
183 return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
184 }
185
186 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
187 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
188 }
189
190 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
191 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
192 }
193
194 @Override
195 public void start() throws Exception {
196 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
197 started = true;
198 queueRegion.start();
199 topicRegion.start();
200 tempQueueRegion.start();
201 tempTopicRegion.start();
202 int period = this.brokerService.getSchedulePeriodForDestinationPurge();
203 if (period > 0) {
204 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
205 }
206 }
207
208 @Override
209 public void stop() throws Exception {
210 started = false;
211 this.scheduler.cancel(purgeInactiveDestinationsTask);
212 ServiceStopper ss = new ServiceStopper();
213 doStop(ss);
214 ss.throwFirstException();
215 // clear the state
216 clientIdSet.clear();
217 connections.clear();
218 destinations.clear();
219 brokerInfos.clear();
220 }
221
222 public PolicyMap getDestinationPolicy() {
223 return brokerService != null ? brokerService.getDestinationPolicy() : null;
224 }
225
226 @Override
227 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
228 String clientId = info.getClientId();
229 if (clientId == null) {
230 throw new InvalidClientIDException("No clientID specified for connection request");
231 }
232 synchronized (clientIdSet) {
233 ConnectionContext oldContext = clientIdSet.get(clientId);
234 if (oldContext != null) {
235 if (context.isFaultTolerant() || context.isNetworkConnection()){
236 //remove the old connection
237 try{
238 removeConnection(oldContext, info, new Exception("remove stale client"));
239 }catch(Exception e){
240 LOG.warn("Failed to remove stale connection ",e);
241 }
242 }else{
243 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
244 + oldContext.getConnection().getRemoteAddress());
245 }
246 } else {
247 clientIdSet.put(clientId, context);
248 }
249 }
250
251 connections.add(context.getConnection());
252 }
253
254 @Override
255 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
256 String clientId = info.getClientId();
257 if (clientId == null) {
258 throw new InvalidClientIDException("No clientID specified for connection disconnect request");
259 }
260 synchronized (clientIdSet) {
261 ConnectionContext oldValue = clientIdSet.get(clientId);
262 // we may be removing the duplicate connection, not the first
263 // connection to be created
264 // so lets check that their connection IDs are the same
265 if (oldValue == context) {
266 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
267 clientIdSet.remove(clientId);
268 }
269 }
270 }
271 connections.remove(context.getConnection());
272 }
273
274 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
275 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
276 }
277
278 @Override
279 public Connection[] getClients() throws Exception {
280 ArrayList<Connection> l = new ArrayList<Connection>(connections);
281 Connection rc[] = new Connection[l.size()];
282 l.toArray(rc);
283 return rc;
284 }
285
286 @Override
287 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
288
289 Destination answer;
290
291 answer = destinations.get(destination);
292 if (answer != null) {
293 return answer;
294 }
295
296 switch (destination.getDestinationType()) {
297 case ActiveMQDestination.QUEUE_TYPE:
298 answer = queueRegion.addDestination(context, destination,true);
299 break;
300 case ActiveMQDestination.TOPIC_TYPE:
301 answer = topicRegion.addDestination(context, destination,true);
302 break;
303 case ActiveMQDestination.TEMP_QUEUE_TYPE:
304 answer = tempQueueRegion.addDestination(context, destination,create);
305 break;
306 case ActiveMQDestination.TEMP_TOPIC_TYPE:
307 answer = tempTopicRegion.addDestination(context, destination,create);
308 break;
309 default:
310 throw createUnknownDestinationTypeException(destination);
311 }
312
313 destinations.put(destination, answer);
314 return answer;
315
316 }
317
318 @Override
319 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
320
321 if (destinations.containsKey(destination)) {
322 switch (destination.getDestinationType()) {
323 case ActiveMQDestination.QUEUE_TYPE:
324 queueRegion.removeDestination(context, destination, timeout);
325 removeAdvisoryTopics("Queue.", context, destination, timeout);
326 break;
327 case ActiveMQDestination.TOPIC_TYPE:
328 topicRegion.removeDestination(context, destination, timeout);
329 removeAdvisoryTopics("Topic.", context, destination, timeout);
330 break;
331 case ActiveMQDestination.TEMP_QUEUE_TYPE:
332 tempQueueRegion.removeDestination(context, destination, timeout);
333 break;
334 case ActiveMQDestination.TEMP_TOPIC_TYPE:
335 tempTopicRegion.removeDestination(context, destination, timeout);
336 break;
337 default:
338 throw createUnknownDestinationTypeException(destination);
339 }
340 destinations.remove(destination);
341
342 }
343
344 }
345
346 public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
347 if (this.brokerService.isAdvisorySupport()) {
348 String producerAdvisoryTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
349 String consumerAdvisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
350
351 ActiveMQDestination dests[] = getDestinations();
352 for (ActiveMQDestination dest: dests) {
353 String name = dest.getPhysicalName();
354 if ( name.equals(producerAdvisoryTopic) || name.equals(consumerAdvisoryTopic) ) {
355 try {
356 removeDestination(context, dest, timeout);
357 } catch (JMSException ignore) {
358 // at least ignore the Unknown Destination Type JMSException
359 }
360 }
361 }
362 }
363 }
364
365 @Override
366 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
367 addDestination(context, info.getDestination(),true);
368
369 }
370
371 @Override
372 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
373 removeDestination(context, info.getDestination(), info.getTimeout());
374
375 }
376
377 @Override
378 public ActiveMQDestination[] getDestinations() throws Exception {
379 ArrayList<ActiveMQDestination> l;
380
381 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
382
383 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
384 l.toArray(rc);
385 return rc;
386 }
387
388 @Override
389 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
390 ActiveMQDestination destination = info.getDestination();
391 synchronized (purgeInactiveDestinationsTask) {
392 if (destination != null) {
393
394 // This seems to cause the destination to be added but without
395 // advisories firing...
396 context.getBroker().addDestination(context, destination, false);
397 switch (destination.getDestinationType()) {
398 case ActiveMQDestination.QUEUE_TYPE:
399 queueRegion.addProducer(context, info);
400 break;
401 case ActiveMQDestination.TOPIC_TYPE:
402 topicRegion.addProducer(context, info);
403 break;
404 case ActiveMQDestination.TEMP_QUEUE_TYPE:
405 tempQueueRegion.addProducer(context, info);
406 break;
407 case ActiveMQDestination.TEMP_TOPIC_TYPE:
408 tempTopicRegion.addProducer(context, info);
409 break;
410 }
411 }
412 }
413 }
414
415 @Override
416 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
417 ActiveMQDestination destination = info.getDestination();
418 synchronized (purgeInactiveDestinationsTask) {
419 if (destination != null) {
420 switch (destination.getDestinationType()) {
421 case ActiveMQDestination.QUEUE_TYPE:
422 queueRegion.removeProducer(context, info);
423 break;
424 case ActiveMQDestination.TOPIC_TYPE:
425 topicRegion.removeProducer(context, info);
426 break;
427 case ActiveMQDestination.TEMP_QUEUE_TYPE:
428 tempQueueRegion.removeProducer(context, info);
429 break;
430 case ActiveMQDestination.TEMP_TOPIC_TYPE:
431 tempTopicRegion.removeProducer(context, info);
432 break;
433 }
434 }
435 }
436 }
437
438 @Override
439 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
440 ActiveMQDestination destination = info.getDestination();
441 if (destinationInterceptor != null) {
442 destinationInterceptor.create(this, context, destination);
443 }
444 synchronized (purgeInactiveDestinationsTask) {
445 switch (destination.getDestinationType()) {
446 case ActiveMQDestination.QUEUE_TYPE:
447 return queueRegion.addConsumer(context, info);
448
449 case ActiveMQDestination.TOPIC_TYPE:
450 return topicRegion.addConsumer(context, info);
451
452 case ActiveMQDestination.TEMP_QUEUE_TYPE:
453 return tempQueueRegion.addConsumer(context, info);
454
455 case ActiveMQDestination.TEMP_TOPIC_TYPE:
456 return tempTopicRegion.addConsumer(context, info);
457
458 default:
459 throw createUnknownDestinationTypeException(destination);
460 }
461 }
462 }
463
464 @Override
465 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
466 ActiveMQDestination destination = info.getDestination();
467 synchronized (purgeInactiveDestinationsTask) {
468 switch (destination.getDestinationType()) {
469
470 case ActiveMQDestination.QUEUE_TYPE:
471 queueRegion.removeConsumer(context, info);
472 break;
473 case ActiveMQDestination.TOPIC_TYPE:
474 topicRegion.removeConsumer(context, info);
475 break;
476 case ActiveMQDestination.TEMP_QUEUE_TYPE:
477 tempQueueRegion.removeConsumer(context, info);
478 break;
479 case ActiveMQDestination.TEMP_TOPIC_TYPE:
480 tempTopicRegion.removeConsumer(context, info);
481 break;
482 default:
483 throw createUnknownDestinationTypeException(destination);
484 }
485 }
486 }
487
488 @Override
489 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
490 synchronized (purgeInactiveDestinationsTask) {
491 topicRegion.removeSubscription(context, info);
492 }
493 }
494
495 @Override
496 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
497 message.setBrokerInTime(System.currentTimeMillis());
498 if (producerExchange.isMutable() || producerExchange.getRegion() == null
499 || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
500 ActiveMQDestination destination = message.getDestination();
501 // ensure the destination is registered with the RegionBroker
502 producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
503 Region region;
504 switch (destination.getDestinationType()) {
505 case ActiveMQDestination.QUEUE_TYPE:
506 region = queueRegion;
507 break;
508 case ActiveMQDestination.TOPIC_TYPE:
509 region = topicRegion;
510 break;
511 case ActiveMQDestination.TEMP_QUEUE_TYPE:
512 region = tempQueueRegion;
513 break;
514 case ActiveMQDestination.TEMP_TOPIC_TYPE:
515 region = tempTopicRegion;
516 break;
517 default:
518 throw createUnknownDestinationTypeException(destination);
519 }
520 producerExchange.setRegion(region);
521 producerExchange.setRegionDestination(null);
522 }
523 producerExchange.getRegion().send(producerExchange, message);
524 }
525
526 @Override
527 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
528 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
529 ActiveMQDestination destination = ack.getDestination();
530 Region region;
531 switch (destination.getDestinationType()) {
532 case ActiveMQDestination.QUEUE_TYPE:
533 region = queueRegion;
534 break;
535 case ActiveMQDestination.TOPIC_TYPE:
536 region = topicRegion;
537 break;
538 case ActiveMQDestination.TEMP_QUEUE_TYPE:
539 region = tempQueueRegion;
540 break;
541 case ActiveMQDestination.TEMP_TOPIC_TYPE:
542 region = tempTopicRegion;
543 break;
544 default:
545 throw createUnknownDestinationTypeException(destination);
546 }
547 consumerExchange.setRegion(region);
548 }
549 consumerExchange.getRegion().acknowledge(consumerExchange, ack);
550 }
551
552 @Override
553 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
554 ActiveMQDestination destination = pull.getDestination();
555 switch (destination.getDestinationType()) {
556 case ActiveMQDestination.QUEUE_TYPE:
557 return queueRegion.messagePull(context, pull);
558
559 case ActiveMQDestination.TOPIC_TYPE:
560 return topicRegion.messagePull(context, pull);
561
562 case ActiveMQDestination.TEMP_QUEUE_TYPE:
563 return tempQueueRegion.messagePull(context, pull);
564
565 case ActiveMQDestination.TEMP_TOPIC_TYPE:
566 return tempTopicRegion.messagePull(context, pull);
567 default:
568 throw createUnknownDestinationTypeException(destination);
569 }
570 }
571
572 @Override
573 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
574 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575 }
576
577 @Override
578 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
579 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580 }
581
582 @Override
583 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
584 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
585 }
586
587 @Override
588 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
589 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
590 }
591
592 @Override
593 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
594 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
595 }
596
597 @Override
598 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
599 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
600 }
601
602 @Override
603 public void gc() {
604 queueRegion.gc();
605 topicRegion.gc();
606 }
607
608 @Override
609 public BrokerId getBrokerId() {
610 if (brokerId == null) {
611 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
612 }
613 return brokerId;
614 }
615
616 public void setBrokerId(BrokerId brokerId) {
617 this.brokerId = brokerId;
618 }
619
620 @Override
621 public String getBrokerName() {
622 if (brokerName == null) {
623 try {
624 brokerName = InetAddressUtil.getLocalHostName().toLowerCase();
625 } catch (Exception e) {
626 brokerName = "localhost";
627 }
628 }
629 return brokerName;
630 }
631
632 public void setBrokerName(String brokerName) {
633 this.brokerName = brokerName;
634 }
635
636 public DestinationStatistics getDestinationStatistics() {
637 return destinationStatistics;
638 }
639
640 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
641 return new JMSException("Unknown destination type: " + destination.getDestinationType());
642 }
643
644 @Override
645 public synchronized void addBroker(Connection connection, BrokerInfo info) {
646 BrokerInfo existing = brokerInfos.get(info.getBrokerId());
647 if (existing == null) {
648 existing = info.copy();
649 existing.setPeerBrokerInfos(null);
650 brokerInfos.put(info.getBrokerId(), existing);
651 }
652 existing.incrementRefCount();
653 LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
654 addBrokerInClusterUpdate();
655 }
656
657 @Override
658 public synchronized void removeBroker(Connection connection, BrokerInfo info) {
659 if (info != null) {
660 BrokerInfo existing = brokerInfos.get(info.getBrokerId());
661 if (existing != null && existing.decrementRefCount() == 0) {
662 brokerInfos.remove(info.getBrokerId());
663 }
664 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
665 removeBrokerInClusterUpdate();
666 }
667 }
668
669 @Override
670 public synchronized BrokerInfo[] getPeerBrokerInfos() {
671 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
672 result = brokerInfos.values().toArray(result);
673 return result;
674 }
675
676 @Override
677 public void preProcessDispatch(MessageDispatch messageDispatch) {
678 Message message = messageDispatch.getMessage();
679 if (message != null) {
680 long endTime = System.currentTimeMillis();
681 message.setBrokerOutTime(endTime);
682 if (getBrokerService().isEnableStatistics()) {
683 long totalTime = endTime - message.getBrokerInTime();
684 message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
685 }
686 }
687 }
688
689 @Override
690 public void postProcessDispatch(MessageDispatch messageDispatch) {
691 }
692
693 @Override
694 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
695 ActiveMQDestination destination = messageDispatchNotification.getDestination();
696 switch (destination.getDestinationType()) {
697 case ActiveMQDestination.QUEUE_TYPE:
698 queueRegion.processDispatchNotification(messageDispatchNotification);
699 break;
700 case ActiveMQDestination.TOPIC_TYPE:
701 topicRegion.processDispatchNotification(messageDispatchNotification);
702 break;
703 case ActiveMQDestination.TEMP_QUEUE_TYPE:
704 tempQueueRegion.processDispatchNotification(messageDispatchNotification);
705 break;
706 case ActiveMQDestination.TEMP_TOPIC_TYPE:
707 tempTopicRegion.processDispatchNotification(messageDispatchNotification);
708 break;
709 default:
710 throw createUnknownDestinationTypeException(destination);
711 }
712 }
713
714 public boolean isSlaveBroker() {
715 return brokerService.isSlave();
716 }
717
718 @Override
719 public boolean isStopped() {
720 return !started;
721 }
722
723 @Override
724 public Set<ActiveMQDestination> getDurableDestinations() {
725 return destinationFactory.getDestinations();
726 }
727
728 protected void doStop(ServiceStopper ss) {
729 ss.stop(queueRegion);
730 ss.stop(topicRegion);
731 ss.stop(tempQueueRegion);
732 ss.stop(tempTopicRegion);
733 }
734
735 public boolean isKeepDurableSubsActive() {
736 return keepDurableSubsActive;
737 }
738
739 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
740 this.keepDurableSubsActive = keepDurableSubsActive;
741 }
742
743 public DestinationInterceptor getDestinationInterceptor() {
744 return destinationInterceptor;
745 }
746
747 @Override
748 public ConnectionContext getAdminConnectionContext() {
749 return adminConnectionContext;
750 }
751
752 @Override
753 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
754 this.adminConnectionContext = adminConnectionContext;
755 }
756
757 public Map<ConnectionId, ConnectionState> getConnectionStates() {
758 return connectionStates;
759 }
760
761 @Override
762 public PListStore getTempDataStore() {
763 return brokerService.getTempDataStore();
764 }
765
766 @Override
767 public URI getVmConnectorURI() {
768 return brokerService.getVmConnectorURI();
769 }
770
771 @Override
772 public void brokerServiceStarted() {
773 }
774
775 @Override
776 public BrokerService getBrokerService() {
777 return brokerService;
778 }
779
780 @Override
781 public boolean isExpired(MessageReference messageReference) {
782 boolean expired = false;
783 if (messageReference.isExpired()) {
784 try {
785 // prevent duplicate expiry processing
786 Message message = messageReference.getMessage();
787 synchronized (message) {
788 expired = stampAsExpired(message);
789 }
790 } catch (IOException e) {
791 LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
792 }
793 }
794 return expired;
795 }
796
797 private boolean stampAsExpired(Message message) throws IOException {
798 boolean stamped=false;
799 if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
800 long expiration=message.getExpiration();
801 message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
802 stamped = true;
803 }
804 return stamped;
805 }
806
807
808 @Override
809 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
810 if (LOG.isDebugEnabled()) {
811 LOG.debug("Message expired " + node);
812 }
813 getRoot().sendToDeadLetterQueue(context, node, subscription);
814 }
815
816 @Override
817 public void sendToDeadLetterQueue(ConnectionContext context,
818 MessageReference node, Subscription subscription){
819 try{
820 if(node!=null){
821 Message message=node.getMessage();
822 if(message!=null && node.getRegionDestination()!=null){
823 DeadLetterStrategy deadLetterStrategy=node
824 .getRegionDestination().getDeadLetterStrategy();
825 if(deadLetterStrategy!=null){
826 if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
827 // message may be inflight to other subscriptions so do not modify
828 message = message.copy();
829 stampAsExpired(message);
830 message.setExpiration(0);
831 if(!message.isPersistent()){
832 message.setPersistent(true);
833 message.setProperty("originalDeliveryMode",
834 "NON_PERSISTENT");
835 }
836 // The original destination and transaction id do
837 // not get filled when the message is first sent,
838 // it is only populated if the message is routed to
839 // another destination like the DLQ
840 ActiveMQDestination deadLetterDestination=deadLetterStrategy
841 .getDeadLetterQueueFor(message, subscription);
842 if (context.getBroker()==null) {
843 context.setBroker(getRoot());
844 }
845 BrokerSupport.resendNoCopy(context,message,
846 deadLetterDestination);
847 }
848 } else {
849 if (LOG.isDebugEnabled()) {
850 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
851 + message.getMessageId() + ", destination: " + message.getDestination());
852 }
853 }
854 }
855 }
856 }catch(Exception e){
857 LOG.warn("Caught an exception sending to DLQ: "+node,e);
858 }
859 }
860
861 @Override
862 public Broker getRoot() {
863 try {
864 return getBrokerService().getBroker();
865 } catch (Exception e) {
866 LOG.error("Trying to get Root Broker " + e);
867 throw new RuntimeException("The broker from the BrokerService should not throw an exception");
868 }
869 }
870
871 /**
872 * @return the broker sequence id
873 */
874 @Override
875 public long getBrokerSequenceId() {
876 synchronized(sequenceGenerator) {
877 return sequenceGenerator.getNextSequenceId();
878 }
879 }
880
881
882 @Override
883 public Scheduler getScheduler() {
884 return this.scheduler;
885 }
886
887 public ThreadPoolExecutor getExecutor() {
888 return this.executor;
889 }
890
891 @Override
892 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
893 ActiveMQDestination destination = control.getDestination();
894 switch (destination.getDestinationType()) {
895 case ActiveMQDestination.QUEUE_TYPE:
896 queueRegion.processConsumerControl(consumerExchange, control);
897 break;
898
899 case ActiveMQDestination.TOPIC_TYPE:
900 topicRegion.processConsumerControl(consumerExchange, control);
901 break;
902
903 case ActiveMQDestination.TEMP_QUEUE_TYPE:
904 tempQueueRegion.processConsumerControl(consumerExchange, control);
905 break;
906
907 case ActiveMQDestination.TEMP_TOPIC_TYPE:
908 tempTopicRegion.processConsumerControl(consumerExchange, control);
909 break;
910
911 default:
912 LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
913 }
914 }
915
916 protected void addBrokerInClusterUpdate() {
917 List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
918 for (TransportConnector connector : connectors) {
919 if (connector.isUpdateClusterClients()) {
920 connector.updateClientClusterInfo();
921 }
922 }
923 }
924
925 protected void removeBrokerInClusterUpdate() {
926 List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
927 for (TransportConnector connector : connectors) {
928 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
929 connector.updateClientClusterInfo();
930 }
931 }
932 }
933
934 protected void purgeInactiveDestinations() {
935 synchronized (purgeInactiveDestinationsTask) {
936 List<BaseDestination> list = new ArrayList<BaseDestination>();
937 Map<ActiveMQDestination, Destination> map = getDestinationMap();
938 long timeStamp = System.currentTimeMillis();
939 for (Destination d : map.values()) {
940 if (d instanceof BaseDestination) {
941 BaseDestination bd = (BaseDestination) d;
942 bd.markForGC(timeStamp);
943 if (bd.canGC()) {
944 list.add(bd);
945 }
946 }
947 }
948
949 if (list.isEmpty() == false) {
950
951 ConnectionContext context = BrokerSupport.getConnectionContext(this);
952 context.setBroker(this);
953
954 for (BaseDestination dest : list) {
955 dest.getLog().info(
956 dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
957 + " ms - removing ...");
958 try {
959 getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
960 } catch (Exception e) {
961 LOG.error("Failed to remove inactive destination " + dest, e);
962 }
963 }
964 }
965 }
966 }
967 }