001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Hashtable;
023 import java.util.Iterator;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Set;
027 import java.util.Map.Entry;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.CopyOnWriteArraySet;
030 import java.util.concurrent.ThreadPoolExecutor;
031 import javax.management.InstanceNotFoundException;
032 import javax.management.MalformedObjectNameException;
033 import javax.management.ObjectName;
034 import javax.management.openmbean.CompositeData;
035 import javax.management.openmbean.CompositeDataSupport;
036 import javax.management.openmbean.CompositeType;
037 import javax.management.openmbean.OpenDataException;
038 import javax.management.openmbean.TabularData;
039 import javax.management.openmbean.TabularDataSupport;
040 import javax.management.openmbean.TabularType;
041 import org.apache.activemq.broker.Broker;
042 import org.apache.activemq.broker.BrokerService;
043 import org.apache.activemq.broker.ConnectionContext;
044 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
045 import org.apache.activemq.broker.region.Destination;
046 import org.apache.activemq.broker.region.DestinationFactory;
047 import org.apache.activemq.broker.region.DestinationFactoryImpl;
048 import org.apache.activemq.broker.region.DestinationInterceptor;
049 import org.apache.activemq.broker.region.Queue;
050 import org.apache.activemq.broker.region.Region;
051 import org.apache.activemq.broker.region.RegionBroker;
052 import org.apache.activemq.broker.region.Subscription;
053 import org.apache.activemq.broker.region.Topic;
054 import org.apache.activemq.broker.region.TopicRegion;
055 import org.apache.activemq.broker.region.TopicSubscription;
056 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
057 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
058 import org.apache.activemq.command.ActiveMQDestination;
059 import org.apache.activemq.command.ActiveMQMessage;
060 import org.apache.activemq.command.ActiveMQTopic;
061 import org.apache.activemq.command.ConsumerInfo;
062 import org.apache.activemq.command.Message;
063 import org.apache.activemq.command.MessageId;
064 import org.apache.activemq.command.SubscriptionInfo;
065 import org.apache.activemq.store.MessageRecoveryListener;
066 import org.apache.activemq.store.PersistenceAdapter;
067 import org.apache.activemq.store.TopicMessageStore;
068 import org.apache.activemq.thread.Scheduler;
069 import org.apache.activemq.thread.TaskRunnerFactory;
070 import org.apache.activemq.usage.SystemUsage;
071 import org.apache.activemq.util.JMXSupport;
072 import org.apache.activemq.util.ServiceStopper;
073 import org.apache.activemq.util.SubscriptionKey;
074 import org.slf4j.Logger;
075 import org.slf4j.LoggerFactory;
076
077 public class ManagedRegionBroker extends RegionBroker {
078 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
079 private final ManagementContext managementContext;
080 private final ObjectName brokerObjectName;
081 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
082 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
083 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
084 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
085 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
086 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
087 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
088 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
089 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
092 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
093 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
094 /* This is the first broker in the broker interceptor chain. */
095 private Broker contextBroker;
096
/**
 * Creates the broker and remembers the JMX context and broker name used for
 * every MBean this class registers. All other arguments are handed to the
 * superclass constructor unchanged.
 *
 * @param context          management context used to register and unregister MBeans
 * @param brokerObjectName JMX name of the owning broker
 * @throws IOException if the superclass constructor fails
 */
public ManagedRegionBroker(BrokerService brokerService,
        ManagementContext context,
        ObjectName brokerObjectName,
        TaskRunnerFactory taskRunnerFactory,
        SystemUsage memoryManager,
        DestinationFactory destinationFactory,
        DestinationInterceptor destinationInterceptor,
        Scheduler scheduler,
        ThreadPoolExecutor executor) throws IOException {
    super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor, scheduler, executor);
    this.brokerObjectName = brokerObjectName;
    this.managementContext = context;
}
103
104 @Override
105 public void start() throws Exception {
106 super.start();
107 // build all existing durable subscriptions
108 buildExistingSubscriptions();
109 }
110
111 @Override
112 protected void doStop(ServiceStopper stopper) {
113 super.doStop(stopper);
114 // lets remove any mbeans not yet removed
115 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
116 ObjectName name = iter.next();
117 try {
118 managementContext.unregisterMBean(name);
119 } catch (InstanceNotFoundException e) {
120 LOG.warn("The MBean: " + name + " is no longer registered with JMX");
121 } catch (Exception e) {
122 stopper.onException(this, e);
123 }
124 }
125 registeredMBeans.clear();
126 }
127
128 @Override
129 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
130 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
131 }
132
133 @Override
134 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
135 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
136 }
137
138 @Override
139 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
140 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
141 }
142
143 @Override
144 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146 }
147
148 public void register(ActiveMQDestination destName, Destination destination) {
149 // TODO refactor to allow views for custom destinations
150 try {
151 ObjectName objectName = createObjectName(destName);
152 DestinationView view;
153 if (destination instanceof Queue) {
154 view = new QueueView(this, (Queue)destination);
155 } else if (destination instanceof Topic) {
156 view = new TopicView(this, (Topic)destination);
157 } else {
158 view = null;
159 LOG.warn("JMX View is not supported for custom destination: " + destination);
160 }
161 if (view != null) {
162 registerDestination(objectName, destName, view);
163 }
164 } catch (Exception e) {
165 LOG.error("Failed to register destination " + destName, e);
166 }
167 }
168
169 public void unregister(ActiveMQDestination destName) {
170 try {
171 ObjectName objectName = createObjectName(destName);
172 unregisterDestination(objectName);
173 } catch (Exception e) {
174 LOG.error("Failed to unregister " + destName, e);
175 }
176 }
177
178 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
179 String connectionClientId = context.getClientId();
180 ObjectName brokerJmxObjectName = brokerObjectName;
181 String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
182 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
183 try {
184 ObjectName objectName = new ObjectName(objectNameStr);
185 SubscriptionView view;
186 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
187 // add offline subscribers to inactive list
188 SubscriptionInfo info = new SubscriptionInfo();
189 info.setClientId(context.getClientId());
190 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
191 info.setDestination(sub.getConsumerInfo().getDestination());
192 addInactiveSubscription(key, info);
193 } else {
194 if (sub.getConsumerInfo().isDurable()) {
195 view = new DurableSubscriptionView(this, context.getClientId(), sub);
196 } else {
197 if (sub instanceof TopicSubscription) {
198 view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
199 } else {
200 view = new SubscriptionView(context.getClientId(), sub);
201 }
202 }
203 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
204 }
205 subscriptionMap.put(sub, objectName);
206 return objectName;
207 } catch (Exception e) {
208 LOG.error("Failed to register subscription " + sub, e);
209 return null;
210 }
211 }
212
213 public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
214 Hashtable map = brokerJmxObjectName.getKeyPropertyList();
215 String brokerDomain = brokerJmxObjectName.getDomain();
216 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
217 String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
218 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
219 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
220 String persistentMode = "persistentMode=";
221 String consumerId = "";
222 if (sub.getConsumerInfo().isDurable()) {
223 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
224 } else {
225 persistentMode += "Non-Durable";
226 if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
227 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
228 }
229 }
230 objectNameStr += persistentMode + ",";
231 objectNameStr += destinationType + ",";
232 objectNameStr += destinationName + ",";
233 objectNameStr += clientId;
234 objectNameStr += consumerId;
235 return objectNameStr;
236 }
237
238 @Override
239 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240 Subscription sub = super.addConsumer(context, info);
241 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
242 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
243 if (inactiveName != null) {
244 // if it was inactive, register it
245 registerSubscription(context, sub);
246 }
247 return sub;
248 }
249
250 @Override
251 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
252 for (Subscription sub : subscriptionMap.keySet()) {
253 if (sub.getConsumerInfo().equals(info)) {
254 // unregister all consumer subs
255 unregisterSubscription(subscriptionMap.get(sub), true);
256 }
257 }
258 super.removeConsumer(context, info);
259 }
260
261 public void unregisterSubscription(Subscription sub) {
262 ObjectName name = subscriptionMap.remove(sub);
263 if (name != null) {
264 try {
265 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
266 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
267 if (inactiveName != null) {
268 inactiveDurableTopicSubscribers.remove(inactiveName);
269 managementContext.unregisterMBean(inactiveName);
270 }
271 } catch (Exception e) {
272 LOG.error("Failed to unregister subscription " + sub, e);
273 }
274 }
275 }
276
277 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
278 if (dest.isQueue()) {
279 if (dest.isTemporary()) {
280 temporaryQueues.put(key, view);
281 } else {
282 queues.put(key, view);
283 }
284 } else {
285 if (dest.isTemporary()) {
286 temporaryTopics.put(key, view);
287 } else {
288 topics.put(key, view);
289 }
290 }
291 try {
292 AnnotatedMBean.registerMBean(managementContext, view, key);
293 registeredMBeans.add(key);
294 } catch (Throwable e) {
295 LOG.warn("Failed to register MBean: " + key);
296 LOG.debug("Failure reason: " + e, e);
297 }
298 }
299
300 protected void unregisterDestination(ObjectName key) throws Exception {
301
302 DestinationView view = null;
303 removeAndRemember(topics, key, view);
304 removeAndRemember(queues, key, view);
305 removeAndRemember(temporaryQueues, key, view);
306 removeAndRemember(temporaryTopics, key, view);
307 if (registeredMBeans.remove(key)) {
308 try {
309 managementContext.unregisterMBean(key);
310 } catch (Throwable e) {
311 LOG.warn("Failed to unregister MBean: " + key);
312 LOG.debug("Failure reason: " + e, e);
313 }
314 }
315 if (view != null) {
316 key = view.getSlowConsumerStrategy();
317 if (key!= null && registeredMBeans.remove(key)) {
318 try {
319 managementContext.unregisterMBean(key);
320 } catch (Throwable e) {
321 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
322 LOG.debug("Failure reason: " + e, e);
323 }
324 }
325 }
326 }
327
328 private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
329 DestinationView candidate = map.remove(key);
330 if (candidate != null && view == null) {
331 view = candidate;
332 }
333 }
334
335 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
336 ActiveMQDestination dest = info.getDestination();
337 if (dest.isQueue()) {
338 if (dest.isTemporary()) {
339 temporaryQueueSubscribers.put(key, view);
340 } else {
341 queueSubscribers.put(key, view);
342 }
343 } else {
344 if (dest.isTemporary()) {
345 temporaryTopicSubscribers.put(key, view);
346 } else {
347 if (info.isDurable()) {
348 durableTopicSubscribers.put(key, view);
349 // unregister any inactive durable subs
350 try {
351 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
352 if (inactiveName != null) {
353 inactiveDurableTopicSubscribers.remove(inactiveName);
354 registeredMBeans.remove(inactiveName);
355 managementContext.unregisterMBean(inactiveName);
356 }
357 } catch (Throwable e) {
358 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
359 }
360 } else {
361 topicSubscribers.put(key, view);
362 }
363 }
364 }
365
366 try {
367 AnnotatedMBean.registerMBean(managementContext, view, key);
368 registeredMBeans.add(key);
369 } catch (Throwable e) {
370 LOG.warn("Failed to register MBean: " + key);
371 LOG.debug("Failure reason: " + e, e);
372 }
373
374 }
375
376 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
377 queueSubscribers.remove(key);
378 topicSubscribers.remove(key);
379 temporaryQueueSubscribers.remove(key);
380 temporaryTopicSubscribers.remove(key);
381 if (registeredMBeans.remove(key)) {
382 try {
383 managementContext.unregisterMBean(key);
384 } catch (Throwable e) {
385 LOG.warn("Failed to unregister MBean: " + key);
386 LOG.debug("Failure reason: " + e, e);
387 }
388 }
389 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
390 if (view != null) {
391 // need to put this back in the inactive list
392 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
393 if (addToInactive) {
394 SubscriptionInfo info = new SubscriptionInfo();
395 info.setClientId(subscriptionKey.getClientId());
396 info.setSubscriptionName(subscriptionKey.getSubscriptionName());
397 info.setDestination(new ActiveMQTopic(view.getDestinationName()));
398 addInactiveSubscription(subscriptionKey, info);
399 }
400 }
401 }
402
403 protected void buildExistingSubscriptions() throws Exception {
404 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
405 Set destinations = destinationFactory.getDestinations();
406 if (destinations != null) {
407 for (Iterator iter = destinations.iterator(); iter.hasNext();) {
408 ActiveMQDestination dest = (ActiveMQDestination)iter.next();
409 if (dest.isTopic()) {
410 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
411 if (infos != null) {
412 for (int i = 0; i < infos.length; i++) {
413 SubscriptionInfo info = infos[i];
414 SubscriptionKey key = new SubscriptionKey(info);
415 if (!alreadyKnown(key)) {
416 LOG.debug("Restoring durable subscription mbean: " + info);
417 subscriptions.put(key, info);
418 }
419 }
420 }
421 }
422 }
423 }
424 for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) {
425 Map.Entry entry = (Entry)i.next();
426 SubscriptionKey key = (SubscriptionKey)entry.getKey();
427 SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
428 addInactiveSubscription(key, info);
429 }
430 }
431
432 private boolean alreadyKnown(SubscriptionKey key) {
433 boolean known = false;
434 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
435 if (LOG.isTraceEnabled()) {
436 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
437 }
438 return known;
439 }
440
441 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
442 Hashtable map = brokerObjectName.getKeyPropertyList();
443 try {
444 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
445 + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
446 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
447
448 try {
449 AnnotatedMBean.registerMBean(managementContext, view, objectName);
450 registeredMBeans.add(objectName);
451 } catch (Throwable e) {
452 LOG.warn("Failed to register MBean: " + key);
453 LOG.debug("Failure reason: " + e, e);
454 }
455
456 inactiveDurableTopicSubscribers.put(objectName, view);
457 subscriptionKeys.put(key, objectName);
458 } catch (Exception e) {
459 LOG.error("Failed to register subscription " + info, e);
460 }
461 }
462
463 public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
464 List<Message> messages = getSubscriberMessages(view);
465 CompositeData c[] = new CompositeData[messages.size()];
466 for (int i = 0; i < c.length; i++) {
467 try {
468 c[i] = OpenTypeSupport.convert(messages.get(i));
469 } catch (Throwable e) {
470 LOG.error("failed to browse : " + view, e);
471 }
472 }
473 return c;
474 }
475
476 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
477 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
478 List<Message> messages = getSubscriberMessages(view);
479 CompositeType ct = factory.getCompositeType();
480 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
481 TabularDataSupport rc = new TabularDataSupport(tt);
482 for (int i = 0; i < messages.size(); i++) {
483 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
484 }
485 return rc;
486 }
487
488 protected List<Message> getSubscriberMessages(SubscriptionView view) {
489 // TODO It is very dangerous operation for big backlogs
490 if (!(destinationFactory instanceof DestinationFactoryImpl)) {
491 throw new RuntimeException("unsupported by " + destinationFactory);
492 }
493 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
494 final List<Message> result = new ArrayList<Message>();
495 try {
496 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
497 TopicMessageStore store = adapter.createTopicMessageStore(topic);
498 store.recover(new MessageRecoveryListener() {
499 public boolean recoverMessage(Message message) throws Exception {
500 result.add(message);
501 return true;
502 }
503
504 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
505 throw new RuntimeException("Should not be called.");
506 }
507
508 public boolean hasSpace() {
509 return true;
510 }
511
512 public boolean isDuplicate(MessageId id) {
513 return false;
514 }
515 });
516 } catch (Throwable e) {
517 LOG.error("Failed to browse messages for Subscription " + view, e);
518 }
519 return result;
520
521 }
522
523 protected ObjectName[] getTopics() {
524 Set<ObjectName> set = topics.keySet();
525 return set.toArray(new ObjectName[set.size()]);
526 }
527
528 protected ObjectName[] getQueues() {
529 Set<ObjectName> set = queues.keySet();
530 return set.toArray(new ObjectName[set.size()]);
531 }
532
533 protected ObjectName[] getTemporaryTopics() {
534 Set<ObjectName> set = temporaryTopics.keySet();
535 return set.toArray(new ObjectName[set.size()]);
536 }
537
538 protected ObjectName[] getTemporaryQueues() {
539 Set<ObjectName> set = temporaryQueues.keySet();
540 return set.toArray(new ObjectName[set.size()]);
541 }
542
543 protected ObjectName[] getTopicSubscribers() {
544 Set<ObjectName> set = topicSubscribers.keySet();
545 return set.toArray(new ObjectName[set.size()]);
546 }
547
548 protected ObjectName[] getDurableTopicSubscribers() {
549 Set<ObjectName> set = durableTopicSubscribers.keySet();
550 return set.toArray(new ObjectName[set.size()]);
551 }
552
553 protected ObjectName[] getQueueSubscribers() {
554 Set<ObjectName> set = queueSubscribers.keySet();
555 return set.toArray(new ObjectName[set.size()]);
556 }
557
558 protected ObjectName[] getTemporaryTopicSubscribers() {
559 Set<ObjectName> set = temporaryTopicSubscribers.keySet();
560 return set.toArray(new ObjectName[set.size()]);
561 }
562
563 protected ObjectName[] getTemporaryQueueSubscribers() {
564 Set<ObjectName> set = temporaryQueueSubscribers.keySet();
565 return set.toArray(new ObjectName[set.size()]);
566 }
567
568 protected ObjectName[] getInactiveDurableTopicSubscribers() {
569 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
570 return set.toArray(new ObjectName[set.size()]);
571 }
572
573 public Broker getContextBroker() {
574 return contextBroker;
575 }
576
577 public void setContextBroker(Broker contextBroker) {
578 this.contextBroker = contextBroker;
579 }
580
581 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
582 // Build the object name for the destination
583 Hashtable map = brokerObjectName.getKeyPropertyList();
584 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
585 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
586 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
587 return objectName;
588 }
589
590 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
591 ObjectName objectName = null;
592 try {
593 objectName = createObjectName(strategy);
594 if (!registeredMBeans.contains(objectName)) {
595 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
596 AnnotatedMBean.registerMBean(managementContext, view, objectName);
597 registeredMBeans.add(objectName);
598 }
599 } catch (Exception e) {
600 LOG.warn("Failed to register MBean: " + strategy);
601 LOG.debug("Failure reason: " + e, e);
602 }
603 return objectName;
604 }
605
606 private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
607 Hashtable map = brokerObjectName.getKeyPropertyList();
608 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
609 + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
610 return objectName;
611 }
612
613 public ObjectName getSubscriberObjectName(Subscription key) {
614 return subscriptionMap.get(key);
615 }
616
617 public Subscription getSubscriber(ObjectName key) {
618 Subscription sub = null;
619 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
620 if (entry.getValue().equals(key)) {
621 sub = entry.getKey();
622 break;
623 }
624 }
625 return sub;
626 }
627 }