001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.net.UnknownHostException;
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.CountDownLatch;
032 import java.util.concurrent.LinkedBlockingQueue;
033 import java.util.concurrent.SynchronousQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.ThreadPoolExecutor;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicBoolean;
038
039 import javax.annotation.PostConstruct;
040 import javax.annotation.PreDestroy;
041 import javax.management.MalformedObjectNameException;
042 import javax.management.ObjectName;
043
044 import org.apache.activemq.ActiveMQConnectionMetaData;
045 import org.apache.activemq.ConfigurationException;
046 import org.apache.activemq.Service;
047 import org.apache.activemq.advisory.AdvisoryBroker;
048 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
049 import org.apache.activemq.broker.ft.MasterConnector;
050 import org.apache.activemq.broker.jmx.AnnotatedMBean;
051 import org.apache.activemq.broker.jmx.BrokerView;
052 import org.apache.activemq.broker.jmx.ConnectorView;
053 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
054 import org.apache.activemq.broker.jmx.FTConnectorView;
055 import org.apache.activemq.broker.jmx.JmsConnectorView;
056 import org.apache.activemq.broker.jmx.JobSchedulerView;
057 import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
058 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
059 import org.apache.activemq.broker.jmx.ManagementContext;
060 import org.apache.activemq.broker.jmx.NetworkConnectorView;
061 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
062 import org.apache.activemq.broker.jmx.ProxyConnectorView;
063 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
064 import org.apache.activemq.broker.region.Destination;
065 import org.apache.activemq.broker.region.DestinationFactory;
066 import org.apache.activemq.broker.region.DestinationFactoryImpl;
067 import org.apache.activemq.broker.region.DestinationInterceptor;
068 import org.apache.activemq.broker.region.RegionBroker;
069 import org.apache.activemq.broker.region.policy.PolicyMap;
070 import org.apache.activemq.broker.region.virtual.MirroredQueue;
071 import org.apache.activemq.broker.region.virtual.VirtualDestination;
072 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
073 import org.apache.activemq.broker.region.virtual.VirtualTopic;
074 import org.apache.activemq.broker.scheduler.SchedulerBroker;
075 import org.apache.activemq.command.ActiveMQDestination;
076 import org.apache.activemq.command.BrokerId;
077 import org.apache.activemq.network.ConnectionFilter;
078 import org.apache.activemq.network.DiscoveryNetworkConnector;
079 import org.apache.activemq.network.NetworkConnector;
080 import org.apache.activemq.network.jms.JmsConnector;
081 import org.apache.activemq.proxy.ProxyConnector;
082 import org.apache.activemq.security.MessageAuthorizationPolicy;
083 import org.apache.activemq.selector.SelectorParser;
084 import org.apache.activemq.store.PersistenceAdapter;
085 import org.apache.activemq.store.PersistenceAdapterFactory;
086 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
087 import org.apache.activemq.store.kahadb.plist.PListStore;
088 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
089 import org.apache.activemq.thread.Scheduler;
090 import org.apache.activemq.thread.TaskRunnerFactory;
091 import org.apache.activemq.transport.TransportFactory;
092 import org.apache.activemq.transport.TransportServer;
093 import org.apache.activemq.transport.vm.VMTransportFactory;
094 import org.apache.activemq.usage.SystemUsage;
095 import org.apache.activemq.util.*;
096 import org.slf4j.Logger;
097 import org.slf4j.LoggerFactory;
098 import org.slf4j.MDC;
099
100 /**
101 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
102 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker, as it is lazily created.
104 *
105 *
106 * @org.apache.xbean.XBean
107 */
108 public class BrokerService implements Service {
// Counted down by stop() when no masterConnectorURI is configured and the latch is still at 1.
protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
// Assigned exactly once by the static initializer of this class.
public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost";
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
// NOTE(review): the visible class declaration only implements Service; confirm this id is still required.
private static final long serialVersionUID = 7353129142305630237L;
private boolean useJmx = true;
private boolean enableStatistics = true;
private boolean persistent = true;
private boolean populateJMSXUserID;
private boolean useAuthenticatedPrincipalForJMSXUserID;

// start() rejects the combination useShutdownHook && systemExitOnShutdown.
private boolean useShutdownHook = true;
private boolean useLoggingForShutdownErrors;
// When true, masterFailed() stops this broker instead of starting all connectors.
private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
private long waitForSlaveTimeout = 600000L;
private boolean passiveSlave;
private String brokerName = DEFAULT_BROKER_NAME;
// Lazily defaulted by getDataDirectoryFile().
private File dataDirectoryFile;
// Lazily defaulted by getTmpDataDirectory() to "tmp_storage" under the broker data directory.
private File tmpDataDirectory;
// Lazily created by getBroker(); reset to null by stop().
private Broker broker;
private BrokerView adminView;
private ManagementContext managementContext;
private ObjectName brokerObjectName;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
// NOTE: the field name is misspelled ("Usaage"); the accessors in this class use this exact spelling.
private SystemUsage consumerSystemUsaage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
protected DestinationFactory destinationFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
// Plain ArrayList, unlike the connector lists above.
private final List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
// JMS-to-JMS bridges to other JMS messaging systems.
private JmsConnector[] jmsBridgeConnectors;
private boolean deleteAllMessagesOnStartup;
// addNetworkConnector(URI) refuses to create a connector when this is false.
private boolean advisorySupport = true;
private URI vmConnectorURI;
private String defaultSocketURIString;
private PolicyMap destinationPolicy;
// Lifecycle flags maintained by start() and stop().
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive = true;
private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false;
private boolean useTempMirroredQueues = true;
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
private PListStore tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName;
// Released at the end of stop(); awaited by waitUntilStopped().
private final CountDownLatch stoppedLatch = new CountDownLatch(1);
// Released at the end of a successful start(); awaited by waitUntilStarted().
private final CountDownLatch startedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private Broker regionBroker;
// Percentages: getProducerSystemUsage()/getConsumerSystemUsage() divide these by 100.
private int producerSystemUsagePortion = 60;
private int consumerSystemUsagePortion = 40;
private boolean splitSystemUsageForProducersConsumers;
private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
// useful for failover
private boolean cacheTempDestinations = false;
private int timeBeforePurgeTempDestinations = 5000;
// Run at the end of stop() while holding this list's monitor.
private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
// When true, stop() spawns a thread that calls System.exit(systemExitOnShutdownExitCode).
private boolean systemExitOnShutdown;
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
private boolean forceStart = false;
private IOExceptionHandler ioExceptionHandler;
private boolean schedulerSupport = false;
private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
// Cleared by start() once the persistence adapter is started; set again by stop().
private boolean slave = true;
private int schedulePeriodForDestinationPurge=5000;
private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false;
201
// Resolves LOCAL_HOST_NAME once at class-initialization time, falling back
// to "localhost" when the local host name cannot be resolved.
static {
    String localHostName = "localhost";
    try {
        localHostName = InetAddressUtil.getLocalHostName();
    } catch (UnknownHostException e) {
        // Keep the fallback value, but log the cause so the failure is diagnosable.
        LOG.error("Failed to resolve localhost", e);
    }
    LOCAL_HOST_NAME = localHostName;
}
211
212 @Override
213 public String toString() {
214 return "BrokerService[" + getBrokerName() + "]";
215 }
216
217 /**
218 * Adds a new transport connector for the given bind address
219 *
220 * @return the newly created and added transport connector
221 * @throws Exception
222 */
223 public TransportConnector addConnector(String bindAddress) throws Exception {
224 return addConnector(new URI(bindAddress));
225 }
226
227 /**
228 * Adds a new transport connector for the given bind address
229 *
230 * @return the newly created and added transport connector
231 * @throws Exception
232 */
233 public TransportConnector addConnector(URI bindAddress) throws Exception {
234 return addConnector(createTransportConnector(bindAddress));
235 }
236
237 /**
238 * Adds a new transport connector for the given TransportServer transport
239 *
240 * @return the newly created and added transport connector
241 * @throws Exception
242 */
243 public TransportConnector addConnector(TransportServer transport) throws Exception {
244 return addConnector(new TransportConnector(transport));
245 }
246
247 /**
248 * Adds a new transport connector
249 *
250 * @return the transport connector
251 * @throws Exception
252 */
253 public TransportConnector addConnector(TransportConnector connector) throws Exception {
254 transportConnectors.add(connector);
255 return connector;
256 }
257
258 /**
259 * Stops and removes a transport connector from the broker.
260 *
261 * @param connector
262 * @return true if the connector has been previously added to the broker
263 * @throws Exception
264 */
265 public boolean removeConnector(TransportConnector connector) throws Exception {
266 boolean rc = transportConnectors.remove(connector);
267 if (rc) {
268 unregisterConnectorMBean(connector);
269 }
270 return rc;
271 }
272
273 /**
274 * Adds a new network connector using the given discovery address
275 *
276 * @return the newly created and added network connector
277 * @throws Exception
278 */
279 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
280 return addNetworkConnector(new URI(discoveryAddress));
281 }
282
283 /**
284 * Adds a new proxy connector using the given bind address
285 *
286 * @return the newly created and added network connector
287 * @throws Exception
288 */
289 public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
290 return addProxyConnector(new URI(bindAddress));
291 }
292
293 /**
294 * Adds a new network connector using the given discovery address
295 *
296 * @return the newly created and added network connector
297 * @throws Exception
298 */
299 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
300 if (!isAdvisorySupport()) {
301 throw new javax.jms.IllegalStateException(
302 "Networks require advisory messages to function - advisories are currently disabled");
303 }
304 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
305 return addNetworkConnector(connector);
306 }
307
308 /**
309 * Adds a new proxy connector using the given bind address
310 *
311 * @return the newly created and added network connector
312 * @throws Exception
313 */
314 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
315 ProxyConnector connector = new ProxyConnector();
316 connector.setBind(bindAddress);
317 connector.setRemote(new URI("fanout:multicast://default"));
318 return addProxyConnector(connector);
319 }
320
321 /**
322 * Adds a new network connector to connect this broker to a federated
323 * network
324 */
325 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
326 connector.setBrokerService(this);
327 URI uri = getVmConnectorURI();
328 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
329 map.put("network", "true");
330 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
331 connector.setLocalUri(uri);
332 // Set a connection filter so that the connector does not establish loop
333 // back connections.
334 connector.setConnectionFilter(new ConnectionFilter() {
335 public boolean connectTo(URI location) {
336 List<TransportConnector> transportConnectors = getTransportConnectors();
337 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
338 try {
339 TransportConnector tc = iter.next();
340 if (location.equals(tc.getConnectUri())) {
341 return false;
342 }
343 } catch (Throwable e) {
344 }
345 }
346 return true;
347 }
348 });
349 networkConnectors.add(connector);
350 if (isUseJmx()) {
351 registerNetworkConnectorMBean(connector);
352 }
353 return connector;
354 }
355
356 /**
357 * Removes the given network connector without stopping it. The caller
358 * should call {@link NetworkConnector#stop()} to close the connector
359 */
360 public boolean removeNetworkConnector(NetworkConnector connector) {
361 boolean answer = networkConnectors.remove(connector);
362 if (answer) {
363 unregisterNetworkConnectorMBean(connector);
364 }
365 return answer;
366 }
367
368 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
369 URI uri = getVmConnectorURI();
370 connector.setLocalUri(uri);
371 proxyConnectors.add(connector);
372 if (isUseJmx()) {
373 registerProxyConnectorMBean(connector);
374 }
375 return connector;
376 }
377
378 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
379 connector.setBrokerService(this);
380 jmsConnectors.add(connector);
381 if (isUseJmx()) {
382 registerJmsConnectorMBean(connector);
383 }
384 return connector;
385 }
386
387 public JmsConnector removeJmsConnector(JmsConnector connector) {
388 if (jmsConnectors.remove(connector)) {
389 return connector;
390 }
391 return null;
392 }
393
394 /**
395 * @return Returns the masterConnectorURI.
396 */
397 public String getMasterConnectorURI() {
398 return masterConnectorURI;
399 }
400
401 /**
402 * @param masterConnectorURI
403 * The masterConnectorURI to set.
404 */
405 public void setMasterConnectorURI(String masterConnectorURI) {
406 this.masterConnectorURI = masterConnectorURI;
407 }
408
409 /**
410 * @return true if this Broker is a slave to a Master
411 */
412 public boolean isSlave() {
413 return (masterConnector != null && masterConnector.isSlave()) ||
414 (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
415 (masterConnector == null && slave);
416 }
417
418 public void masterFailed() {
419 if (shutdownOnMasterFailure) {
420 LOG.error("The Master has failed ... shutting down");
421 try {
422 stop();
423 } catch (Exception e) {
424 LOG.error("Failed to stop for master failure", e);
425 }
426 } else {
427 LOG.warn("Master Failed - starting all connectors");
428 try {
429 startAllConnectors();
430 broker.nowMasterBroker();
431 } catch (Exception e) {
432 LOG.error("Failed to startAllConnectors", e);
433 }
434 }
435 }
436
437 public boolean isStarted() {
438 return started.get();
439 }
440
441 public void start(boolean force) throws Exception {
442 forceStart = force;
443 stopped.set(false);
444 started.set(false);
445 start();
446 }
447
448 // Service interface
449 // -------------------------------------------------------------------------
450
451 protected boolean shouldAutostart() {
452 return true;
453 }
454
455 /**
456 *
457 * @throws Exception
458 * @org. apache.xbean.InitMethod
459 */
460 @PostConstruct
461 public void autoStart() throws Exception {
462 if(shouldAutostart()) {
463 start();
464 }
465 }
466
/**
 * Starts the broker: the management context (when JMX is in use), the
 * persistence adapter, destinations, the broker itself, any
 * {@link MasterConnector} services and, when applicable, all connectors.
 * <p>
 * The call is a no-op when the broker has been stopped or has already
 * been started. If any step throws, the failure is logged, {@link #stop()}
 * is attempted (unless already stopped) and the original exception is
 * rethrown.
 *
 * @throws Exception if any step of the startup sequence fails
 */
public void start() throws Exception {
    if (stopped.get() || !started.compareAndSet(false, true)) {
        // Redundant start() calls are deliberately ignored rather than
        // rejected with an IllegalStateException: with the many different
        // configuration mechanisms it is too easy to be unsure whether
        // start() has already been called.
        return;
    }

    // Tag log output of this thread with the broker name for the duration of the start.
    MDC.put("activemq.broker", brokerName);

    try {
        if (systemExitOnShutdown && useShutdownHook) {
            throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
        }
        processHelperProperties();
        if (isUseJmx()) {
            startManagementContext();
        }
        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
        getPersistenceAdapter().setBrokerName(getBrokerName());
        LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
        if (deleteAllMessagesOnStartup) {
            deleteAllMessages();
        }
        getPersistenceAdapter().start();
        // The slave flag is cleared once the persistence adapter has started.
        slave = false;
        startDestinations();
        addShutdownHook();
        getBroker().start();
        if (isUseJmx()) {
            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
                // try to restart management context
                // typical for slaves that use the same ports as master
                managementContext.stop();
                startManagementContext();
            }
            // With JMX in use the region broker is cast to ManagedRegionBroker.
            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
            managedBroker.setContextBroker(broker);
            adminView.setBroker(managedBroker);
        }
        BrokerRegistry.getInstance().bind(getBrokerName(), this);
        // see if there is a MasterBroker service and if so, configure
        // it and start it.
        for (Service service : services) {
            if (service instanceof MasterConnector) {
                configureService(service);
                service.start();
            }
        }
        // Connectors are started here only for a non-slave broker that either has
        // no master connector or is not configured to shut down on master failure.
        if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
            startAllConnectors();
        }
        if (!stopped.get()) {
            if (isUseJmx() && masterConnector != null) {
                registerFTConnectorMBean(masterConnector);
            }
        }
        if (brokerId == null) {
            brokerId = broker.getBrokerId();
        }
        if (ioExceptionHandler == null) {
            setIoExceptionHandler(new DefaultIOExceptionHandler());
        }
        LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
        getBroker().brokerServiceStarted();
        // Releases threads blocked in waitUntilStarted().
        startedLatch.countDown();
    } catch (Exception e) {
        LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
        try {
            if (!stopped.get()) {
                stop();
            }
        } catch (Exception ex) {
            // A failure while cleaning up must not mask the original start failure.
            LOG.warn("Failed to stop broker after failure in start ", ex);
        }
        throw e;
    } finally {
        MDC.remove("activemq.broker");
    }
}
549
550 /**
551 *
552 * @throws Exception
553 * @org.apache .xbean.DestroyMethod
554 */
555 @PreDestroy
556 public void stop() throws Exception {
557 if (!started.get()) {
558 return;
559 }
560
561 MDC.put("activemq.broker", brokerName);
562
563 if (systemExitOnShutdown) {
564 new Thread() {
565 @Override
566 public void run() {
567 System.exit(systemExitOnShutdownExitCode);
568 }
569 }.start();
570 }
571
572 LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
573 removeShutdownHook();
574 ServiceStopper stopper = new ServiceStopper();
575 if (services != null) {
576 for (Service service : services) {
577 stopper.stop(service);
578 }
579 }
580 stopAllConnectors(stopper);
581 // remove any VMTransports connected
582 // this has to be done after services are stopped,
583 // to avoid timimg issue with discovery (spinning up a new instance)
584 BrokerRegistry.getInstance().unbind(getBrokerName());
585 VMTransportFactory.stopped(getBrokerName());
586 if (broker != null) {
587 stopper.stop(broker);
588 broker = null;
589 }
590
591 if (tempDataStore != null) {
592 tempDataStore.stop();
593 tempDataStore = null;
594 }
595 stopper.stop(persistenceAdapter);
596 persistenceAdapter = null;
597 slave = true;
598 if (isUseJmx()) {
599 stopper.stop(getManagementContext());
600 managementContext = null;
601 }
602 // Clear SelectorParser cache to free memory
603 SelectorParser.clearCache();
604 stopped.set(true);
605 stoppedLatch.countDown();
606 if (masterConnectorURI == null) {
607 // master start has not finished yet
608 if (slaveStartSignal.getCount() == 1) {
609 started.set(false);
610 slaveStartSignal.countDown();
611 }
612 } else {
613 for (Service service : services) {
614 if (service instanceof MasterConnector) {
615 MasterConnector mConnector = (MasterConnector) service;
616 if (!mConnector.isSlave()) {
617 // means should be slave but not connected to master yet
618 started.set(false);
619 mConnector.stopBeforeConnected();
620 }
621 }
622 }
623 }
624 if (this.taskRunnerFactory != null) {
625 this.taskRunnerFactory.shutdown();
626 this.taskRunnerFactory = null;
627 }
628 if (this.scheduler != null) {
629 this.scheduler.stop();
630 this.scheduler = null;
631 }
632 if (this.executor != null) {
633 this.executor.shutdownNow();
634 this.executor = null;
635 }
636
637 this.destinationInterceptors = null;
638 this.destinationFactory = null;
639
640 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
641 synchronized (shutdownHooks) {
642 for (Runnable hook : shutdownHooks) {
643 try {
644 hook.run();
645 } catch (Throwable e) {
646 stopper.onException(hook, e);
647 }
648 }
649 }
650
651 MDC.remove("activemq.broker");
652
653 stopper.throwFirstException();
654 }
655
656 public boolean checkQueueSize(String queueName) {
657 long count = 0;
658 long queueSize = 0;
659 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
660 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
661 if (entry.getKey().isQueue()) {
662 if (entry.getValue().getName().matches(queueName)) {
663 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
664 count += queueSize;
665 if (queueSize > 0) {
666 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
667 + queueSize);
668 }
669 }
670 }
671 }
672 return count == 0;
673 }
674
675 /**
676 * This method (both connectorName and queueName are using regex to match)
677 * 1. stop the connector (supposed the user input the connector which the
678 * clients connect to) 2. to check whether there is any pending message on
679 * the queues defined by queueName 3. supposedly, after stop the connector,
680 * client should failover to other broker and pending messages should be
681 * forwarded. if no pending messages, the method finally call stop to stop
682 * the broker.
683 *
684 * @param connectorName
685 * @param queueName
686 * @param timeout
687 * @param pollInterval
688 * @throws Exception
689 */
690 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
691 throws Exception {
692 if (isUseJmx()) {
693 if (connectorName == null || queueName == null || timeout <= 0) {
694 throw new Exception(
695 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
696 }
697 if (pollInterval <= 0) {
698 pollInterval = 30;
699 }
700 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
701 + timeout + " pollInterval:" + pollInterval);
702 TransportConnector connector;
703 for (int i = 0; i < transportConnectors.size(); i++) {
704 connector = transportConnectors.get(i);
705 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
706 connector.stop();
707 }
708 }
709 long start = System.currentTimeMillis();
710 while (System.currentTimeMillis() - start < timeout * 1000) {
711 // check quesize until it gets zero
712 if (checkQueueSize(queueName)) {
713 stop();
714 break;
715 } else {
716 Thread.sleep(pollInterval * 1000);
717 }
718 }
719 if (stopped.get()) {
720 LOG.info("Successfully stop the broker.");
721 } else {
722 LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
723 }
724 }
725 }
726
727 /**
728 * A helper method to block the caller thread until the broker has been
729 * stopped
730 */
731 public void waitUntilStopped() {
732 while (isStarted() && !stopped.get()) {
733 try {
734 stoppedLatch.await();
735 } catch (InterruptedException e) {
736 // ignore
737 }
738 }
739 }
740
741 /**
742 * A helper method to block the caller thread until the broker has fully started
743 * @return boolean true if wait succeeded false if broker was not started or was stopped
744 */
745 public boolean waitUntilStarted() {
746 boolean waitSucceeded = false;
747 while (isStarted() && !stopped.get() && !waitSucceeded) {
748 try {
749 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
750 } catch (InterruptedException ignore) {
751 }
752 }
753 return waitSucceeded;
754 }
755
756 // Properties
757 // -------------------------------------------------------------------------
758 /**
759 * Returns the message broker
760 */
761 public Broker getBroker() throws Exception {
762 if (broker == null) {
763 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
764 + getBrokerName() + ") is starting");
765 LOG.info("For help or more information please see: http://activemq.apache.org/");
766 broker = createBroker();
767 }
768 return broker;
769 }
770
771 /**
772 * Returns the administration view of the broker; used to create and destroy
773 * resources such as queues and topics. Note this method returns null if JMX
774 * is disabled.
775 */
776 public BrokerView getAdminView() throws Exception {
777 if (adminView == null) {
778 // force lazy creation
779 getBroker();
780 }
781 return adminView;
782 }
783
784 public void setAdminView(BrokerView adminView) {
785 this.adminView = adminView;
786 }
787
788 public String getBrokerName() {
789 return brokerName;
790 }
791
792 /**
793 * Sets the name of this broker; which must be unique in the network
794 *
795 * @param brokerName
796 */
797 public void setBrokerName(String brokerName) {
798 if (brokerName == null) {
799 throw new NullPointerException("The broker name cannot be null");
800 }
801 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
802 if (!str.equals(brokerName)) {
803 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
804 }
805 this.brokerName = str.trim();
806 }
807
808 public PersistenceAdapterFactory getPersistenceFactory() {
809 return persistenceFactory;
810 }
811
812 public File getDataDirectoryFile() {
813 if (dataDirectoryFile == null) {
814 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
815 }
816 return dataDirectoryFile;
817 }
818
819 public File getBrokerDataDirectory() {
820 String brokerDir = getBrokerName();
821 return new File(getDataDirectoryFile(), brokerDir);
822 }
823
824 /**
825 * Sets the directory in which the data files will be stored by default for
826 * the JDBC and Journal persistence adaptors.
827 *
828 * @param dataDirectory
829 * the directory to store data files
830 */
831 public void setDataDirectory(String dataDirectory) {
832 setDataDirectoryFile(new File(dataDirectory));
833 }
834
835 /**
836 * Sets the directory in which the data files will be stored by default for
837 * the JDBC and Journal persistence adaptors.
838 *
839 * @param dataDirectoryFile
840 * the directory to store data files
841 */
842 public void setDataDirectoryFile(File dataDirectoryFile) {
843 this.dataDirectoryFile = dataDirectoryFile;
844 }
845
846 /**
847 * @return the tmpDataDirectory
848 */
849 public File getTmpDataDirectory() {
850 if (tmpDataDirectory == null) {
851 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
852 }
853 return tmpDataDirectory;
854 }
855
856 /**
857 * @param tmpDataDirectory
858 * the tmpDataDirectory to set
859 */
860 public void setTmpDataDirectory(File tmpDataDirectory) {
861 this.tmpDataDirectory = tmpDataDirectory;
862 }
863
864 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
865 this.persistenceFactory = persistenceFactory;
866 }
867
868 public void setDestinationFactory(DestinationFactory destinationFactory) {
869 this.destinationFactory = destinationFactory;
870 }
871
872 public boolean isPersistent() {
873 return persistent;
874 }
875
876 /**
877 * Sets whether or not persistence is enabled or disabled.
878 */
879 public void setPersistent(boolean persistent) {
880 this.persistent = persistent;
881 }
882
883 public boolean isPopulateJMSXUserID() {
884 return populateJMSXUserID;
885 }
886
887 /**
888 * Sets whether or not the broker should populate the JMSXUserID header.
889 */
890 public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
891 this.populateJMSXUserID = populateJMSXUserID;
892 }
893
894 public SystemUsage getSystemUsage() {
895 try {
896 if (systemUsage == null) {
897 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
898 systemUsage.setExecutor(getExecutor());
899 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
900 // 64
901 // Meg
902 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10
903 // Gb
904 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
905 // GB
906 addService(this.systemUsage);
907 }
908 return systemUsage;
909 } catch (IOException e) {
910 LOG.error("Cannot create SystemUsage", e);
911 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
912 }
913 }
914
915 public void setSystemUsage(SystemUsage memoryManager) {
916 if (this.systemUsage != null) {
917 removeService(this.systemUsage);
918 }
919 this.systemUsage = memoryManager;
920 if (this.systemUsage.getExecutor()==null) {
921 this.systemUsage.setExecutor(getExecutor());
922 }
923 addService(this.systemUsage);
924 }
925
926 /**
927 * @return the consumerUsageManager
928 * @throws IOException
929 */
930 public SystemUsage getConsumerSystemUsage() throws IOException {
931 if (this.consumerSystemUsaage == null) {
932 if (splitSystemUsageForProducersConsumers) {
933 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
934 float portion = consumerSystemUsagePortion / 100f;
935 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
936 addService(this.consumerSystemUsaage);
937 } else {
938 consumerSystemUsaage = getSystemUsage();
939 }
940 }
941 return this.consumerSystemUsaage;
942 }
943
944 /**
945 * @param consumerSystemUsaage
946 * the storeSystemUsage to set
947 */
948 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
949 if (this.consumerSystemUsaage != null) {
950 removeService(this.consumerSystemUsaage);
951 }
952 this.consumerSystemUsaage = consumerSystemUsaage;
953 addService(this.consumerSystemUsaage);
954 }
955
956 /**
957 * @return the producerUsageManager
958 * @throws IOException
959 */
960 public SystemUsage getProducerSystemUsage() throws IOException {
961 if (producerSystemUsage == null) {
962 if (splitSystemUsageForProducersConsumers) {
963 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
964 float portion = producerSystemUsagePortion / 100f;
965 producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
966 addService(producerSystemUsage);
967 } else {
968 producerSystemUsage = getSystemUsage();
969 }
970 }
971 return producerSystemUsage;
972 }
973
974 /**
975 * @param producerUsageManager
976 * the producerUsageManager to set
977 */
978 public void setProducerSystemUsage(SystemUsage producerUsageManager) {
979 if (this.producerSystemUsage != null) {
980 removeService(this.producerSystemUsage);
981 }
982 this.producerSystemUsage = producerUsageManager;
983 addService(this.producerSystemUsage);
984 }
985
986 public PersistenceAdapter getPersistenceAdapter() throws IOException {
987 if (persistenceAdapter == null) {
988 persistenceAdapter = createPersistenceAdapter();
989 configureService(persistenceAdapter);
990 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
991 }
992 return persistenceAdapter;
993 }
994
995 /**
996 * Sets the persistence adaptor implementation to use for this broker
997 *
998 * @throws IOException
999 */
1000 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1001 this.persistenceAdapter = persistenceAdapter;
1002 configureService(this.persistenceAdapter);
1003 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1004 }
1005
1006 public TaskRunnerFactory getTaskRunnerFactory() {
1007 if (this.taskRunnerFactory == null) {
1008 this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1009 isDedicatedTaskRunner());
1010 }
1011 return this.taskRunnerFactory;
1012 }
1013
1014 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1015 this.taskRunnerFactory = taskRunnerFactory;
1016 }
1017
1018 public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1019 if (taskRunnerFactory == null) {
1020 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1021 true, 1000, isDedicatedTaskRunner());
1022 }
1023 return persistenceTaskRunnerFactory;
1024 }
1025
1026 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1027 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1028 }
1029
1030 public boolean isUseJmx() {
1031 return useJmx;
1032 }
1033
1034 public boolean isEnableStatistics() {
1035 return enableStatistics;
1036 }
1037
1038 /**
1039 * Sets whether or not the Broker's services enable statistics or not.
1040 */
1041 public void setEnableStatistics(boolean enableStatistics) {
1042 this.enableStatistics = enableStatistics;
1043 }
1044
1045 /**
1046 * Sets whether or not the Broker's services should be exposed into JMX or
1047 * not.
1048 */
1049 public void setUseJmx(boolean useJmx) {
1050 this.useJmx = useJmx;
1051 }
1052
1053 public ObjectName getBrokerObjectName() throws IOException {
1054 if (brokerObjectName == null) {
1055 brokerObjectName = createBrokerObjectName();
1056 }
1057 return brokerObjectName;
1058 }
1059
1060 /**
1061 * Sets the JMX ObjectName for this broker
1062 */
1063 public void setBrokerObjectName(ObjectName brokerObjectName) {
1064 this.brokerObjectName = brokerObjectName;
1065 }
1066
1067 public ManagementContext getManagementContext() {
1068 if (managementContext == null) {
1069 managementContext = new ManagementContext();
1070 }
1071 return managementContext;
1072 }
1073
1074 public void setManagementContext(ManagementContext managementContext) {
1075 this.managementContext = managementContext;
1076 }
1077
1078 public NetworkConnector getNetworkConnectorByName(String connectorName) {
1079 for (NetworkConnector connector : networkConnectors) {
1080 if (connector.getName().equals(connectorName)) {
1081 return connector;
1082 }
1083 }
1084 return null;
1085 }
1086
1087 public String[] getNetworkConnectorURIs() {
1088 return networkConnectorURIs;
1089 }
1090
1091 public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1092 this.networkConnectorURIs = networkConnectorURIs;
1093 }
1094
1095 public TransportConnector getConnectorByName(String connectorName) {
1096 for (TransportConnector connector : transportConnectors) {
1097 if (connector.getName().equals(connectorName)) {
1098 return connector;
1099 }
1100 }
1101 return null;
1102 }
1103
1104 public Map<String, String> getTransportConnectorURIsAsMap() {
1105 Map<String, String> answer = new HashMap<String, String>();
1106 for (TransportConnector connector : transportConnectors) {
1107 try {
1108 URI uri = connector.getConnectUri();
1109 String scheme = uri.getScheme();
1110 if (scheme != null) {
1111 answer.put(scheme.toLowerCase(), uri.toString());
1112 }
1113 } catch (Exception e) {
1114 LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1115 }
1116 }
1117 return answer;
1118 }
1119
1120 public String[] getTransportConnectorURIs() {
1121 return transportConnectorURIs;
1122 }
1123
1124 public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1125 this.transportConnectorURIs = transportConnectorURIs;
1126 }
1127
1128 /**
1129 * @return Returns the jmsBridgeConnectors.
1130 */
1131 public JmsConnector[] getJmsBridgeConnectors() {
1132 return jmsBridgeConnectors;
1133 }
1134
1135 /**
1136 * @param jmsConnectors
1137 * The jmsBridgeConnectors to set.
1138 */
1139 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1140 this.jmsBridgeConnectors = jmsConnectors;
1141 }
1142
1143 public Service[] getServices() {
1144 return services.toArray(new Service[0]);
1145 }
1146
1147 /**
1148 * Sets the services associated with this broker such as a
1149 * {@link MasterConnector}
1150 */
1151 public void setServices(Service[] services) {
1152 this.services.clear();
1153 if (services != null) {
1154 for (int i = 0; i < services.length; i++) {
1155 this.services.add(services[i]);
1156 }
1157 }
1158 }
1159
1160 /**
1161 * Adds a new service so that it will be started as part of the broker
1162 * lifecycle
1163 */
1164 public void addService(Service service) {
1165 services.add(service);
1166 }
1167
1168 public void removeService(Service service) {
1169 services.remove(service);
1170 }
1171
1172 public boolean isUseLoggingForShutdownErrors() {
1173 return useLoggingForShutdownErrors;
1174 }
1175
1176 /**
1177 * Sets whether or not we should use commons-logging when reporting errors
1178 * when shutting down the broker
1179 */
1180 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1181 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1182 }
1183
1184 public boolean isUseShutdownHook() {
1185 return useShutdownHook;
1186 }
1187
1188 /**
1189 * Sets whether or not we should use a shutdown handler to close down the
1190 * broker cleanly if the JVM is terminated. It is recommended you leave this
1191 * enabled.
1192 */
1193 public void setUseShutdownHook(boolean useShutdownHook) {
1194 this.useShutdownHook = useShutdownHook;
1195 }
1196
1197 public boolean isAdvisorySupport() {
1198 return advisorySupport;
1199 }
1200
1201 /**
1202 * Allows the support of advisory messages to be disabled for performance
1203 * reasons.
1204 */
1205 public void setAdvisorySupport(boolean advisorySupport) {
1206 this.advisorySupport = advisorySupport;
1207 }
1208
1209 public List<TransportConnector> getTransportConnectors() {
1210 return new ArrayList<TransportConnector>(transportConnectors);
1211 }
1212
1213 /**
1214 * Sets the transport connectors which this broker will listen on for new
1215 * clients
1216 *
1217 * @org.apache.xbean.Property
1218 * nestedType="org.apache.activemq.broker.TransportConnector"
1219 */
1220 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1221 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1222 TransportConnector connector = iter.next();
1223 addConnector(connector);
1224 }
1225 }
1226
1227 public List<NetworkConnector> getNetworkConnectors() {
1228 return new ArrayList<NetworkConnector>(networkConnectors);
1229 }
1230
1231 public List<ProxyConnector> getProxyConnectors() {
1232 return new ArrayList<ProxyConnector>(proxyConnectors);
1233 }
1234
1235 /**
1236 * Sets the network connectors which this broker will use to connect to
1237 * other brokers in a federated network
1238 *
1239 * @org.apache.xbean.Property
1240 * nestedType="org.apache.activemq.network.NetworkConnector"
1241 */
1242 public void setNetworkConnectors(List networkConnectors) throws Exception {
1243 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1244 NetworkConnector connector = (NetworkConnector) iter.next();
1245 addNetworkConnector(connector);
1246 }
1247 }
1248
1249 /**
1250 * Sets the network connectors which this broker will use to connect to
1251 * other brokers in a federated network
1252 */
1253 public void setProxyConnectors(List proxyConnectors) throws Exception {
1254 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1255 ProxyConnector connector = (ProxyConnector) iter.next();
1256 addProxyConnector(connector);
1257 }
1258 }
1259
1260 public PolicyMap getDestinationPolicy() {
1261 return destinationPolicy;
1262 }
1263
1264 /**
1265 * Sets the destination specific policies available either for exact
1266 * destinations or for wildcard areas of destinations.
1267 */
1268 public void setDestinationPolicy(PolicyMap policyMap) {
1269 this.destinationPolicy = policyMap;
1270 }
1271
1272 public BrokerPlugin[] getPlugins() {
1273 return plugins;
1274 }
1275
1276 /**
1277 * Sets a number of broker plugins to install such as for security
1278 * authentication or authorization
1279 */
1280 public void setPlugins(BrokerPlugin[] plugins) {
1281 this.plugins = plugins;
1282 }
1283
1284 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1285 return messageAuthorizationPolicy;
1286 }
1287
1288 /**
1289 * Sets the policy used to decide if the current connection is authorized to
1290 * consume a given message
1291 */
1292 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1293 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1294 }
1295
1296 /**
1297 * Delete all messages from the persistent store
1298 *
1299 * @throws IOException
1300 */
1301 public void deleteAllMessages() throws IOException {
1302 getPersistenceAdapter().deleteAllMessages();
1303 }
1304
1305 public boolean isDeleteAllMessagesOnStartup() {
1306 return deleteAllMessagesOnStartup;
1307 }
1308
1309 /**
1310 * Sets whether or not all messages are deleted on startup - mostly only
1311 * useful for testing.
1312 */
1313 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1314 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1315 }
1316
1317 public URI getVmConnectorURI() {
1318 if (vmConnectorURI == null) {
1319 try {
1320 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1321 } catch (URISyntaxException e) {
1322 LOG.error("Badly formed URI from " + getBrokerName(), e);
1323 }
1324 }
1325 return vmConnectorURI;
1326 }
1327
1328 public void setVmConnectorURI(URI vmConnectorURI) {
1329 this.vmConnectorURI = vmConnectorURI;
1330 }
1331
1332 public String getDefaultSocketURIString() {
1333
1334 if (started.get()) {
1335 if (this.defaultSocketURIString ==null) {
1336 for (TransportConnector tc:this.transportConnectors) {
1337 String result = null;
1338 try {
1339 result = tc.getPublishableConnectString();
1340 } catch (Exception e) {
1341 LOG.warn("Failed to get the ConnectURI for "+tc,e);
1342 }
1343 if (result != null) {
1344 this.defaultSocketURIString =result;
1345 break;
1346 }
1347 }
1348 }
1349 return this.defaultSocketURIString;
1350 }
1351 return null;
1352 }
1353
1354 /**
1355 * @return Returns the shutdownOnMasterFailure.
1356 */
1357 public boolean isShutdownOnMasterFailure() {
1358 return shutdownOnMasterFailure;
1359 }
1360
1361 /**
1362 * @param shutdownOnMasterFailure
1363 * The shutdownOnMasterFailure to set.
1364 */
1365 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1366 this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1367 }
1368
1369 public boolean isKeepDurableSubsActive() {
1370 return keepDurableSubsActive;
1371 }
1372
1373 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1374 this.keepDurableSubsActive = keepDurableSubsActive;
1375 }
1376
1377 public boolean isUseVirtualTopics() {
1378 return useVirtualTopics;
1379 }
1380
1381 /**
1382 * Sets whether or not <a
1383 * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1384 * Topics</a> should be supported by default if they have not been
1385 * explicitly configured.
1386 */
1387 public void setUseVirtualTopics(boolean useVirtualTopics) {
1388 this.useVirtualTopics = useVirtualTopics;
1389 }
1390
1391 public DestinationInterceptor[] getDestinationInterceptors() {
1392 return destinationInterceptors;
1393 }
1394
1395 public boolean isUseMirroredQueues() {
1396 return useMirroredQueues;
1397 }
1398
1399 /**
1400 * Sets whether or not <a
1401 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1402 * Queues</a> should be supported by default if they have not been
1403 * explicitly configured.
1404 */
1405 public void setUseMirroredQueues(boolean useMirroredQueues) {
1406 this.useMirroredQueues = useMirroredQueues;
1407 }
1408
1409 /**
1410 * Sets the destination interceptors to use
1411 */
1412 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1413 this.destinationInterceptors = destinationInterceptors;
1414 }
1415
1416 public ActiveMQDestination[] getDestinations() {
1417 return destinations;
1418 }
1419
1420 /**
1421 * Sets the destinations which should be loaded/created on startup
1422 */
1423 public void setDestinations(ActiveMQDestination[] destinations) {
1424 this.destinations = destinations;
1425 }
1426
1427 /**
1428 * @return the tempDataStore
1429 */
1430 public synchronized PListStore getTempDataStore() {
1431 if (tempDataStore == null) {
1432 if (!isPersistent()) {
1433 return null;
1434 }
1435 boolean result = true;
1436 boolean empty = true;
1437 try {
1438 File directory = getTmpDataDirectory();
1439 if (directory.exists() && directory.isDirectory()) {
1440 File[] files = directory.listFiles();
1441 if (files != null && files.length > 0) {
1442 empty = false;
1443 for (int i = 0; i < files.length; i++) {
1444 File file = files[i];
1445 if (!file.isDirectory()) {
1446 result &= file.delete();
1447 }
1448 }
1449 }
1450 }
1451 if (!empty) {
1452 String str = result ? "Successfully deleted" : "Failed to delete";
1453 LOG.info(str + " temporary storage");
1454 }
1455 this.tempDataStore = new PListStore();
1456 this.tempDataStore.setDirectory(getTmpDataDirectory());
1457 this.tempDataStore.start();
1458 } catch (Exception e) {
1459 throw new RuntimeException(e);
1460 }
1461 }
1462 return tempDataStore;
1463 }
1464
1465 /**
1466 * @param tempDataStore
1467 * the tempDataStore to set
1468 */
1469 public void setTempDataStore(PListStore tempDataStore) {
1470 this.tempDataStore = tempDataStore;
1471 }
1472
1473 public int getPersistenceThreadPriority() {
1474 return persistenceThreadPriority;
1475 }
1476
1477 public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1478 this.persistenceThreadPriority = persistenceThreadPriority;
1479 }
1480
1481 /**
1482 * @return the useLocalHostBrokerName
1483 */
1484 public boolean isUseLocalHostBrokerName() {
1485 return this.useLocalHostBrokerName;
1486 }
1487
1488 /**
1489 * @param useLocalHostBrokerName
1490 * the useLocalHostBrokerName to set
1491 */
1492 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1493 this.useLocalHostBrokerName = useLocalHostBrokerName;
1494 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1495 brokerName = LOCAL_HOST_NAME;
1496 }
1497 }
1498
1499 /**
1500 * @return the supportFailOver
1501 */
1502 public boolean isSupportFailOver() {
1503 return this.supportFailOver;
1504 }
1505
1506 /**
1507 * @param supportFailOver
1508 * the supportFailOver to set
1509 */
1510 public void setSupportFailOver(boolean supportFailOver) {
1511 this.supportFailOver = supportFailOver;
1512 }
1513
1514 /**
1515 * Looks up and lazily creates if necessary the destination for the given
1516 * JMS name
1517 */
1518 public Destination getDestination(ActiveMQDestination destination) throws Exception {
1519 return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1520 }
1521
1522 public void removeDestination(ActiveMQDestination destination) throws Exception {
1523 getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1524 }
1525
1526 public int getProducerSystemUsagePortion() {
1527 return producerSystemUsagePortion;
1528 }
1529
1530 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1531 this.producerSystemUsagePortion = producerSystemUsagePortion;
1532 }
1533
1534 public int getConsumerSystemUsagePortion() {
1535 return consumerSystemUsagePortion;
1536 }
1537
1538 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1539 this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1540 }
1541
1542 public boolean isSplitSystemUsageForProducersConsumers() {
1543 return splitSystemUsageForProducersConsumers;
1544 }
1545
1546 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1547 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1548 }
1549
1550 public boolean isMonitorConnectionSplits() {
1551 return monitorConnectionSplits;
1552 }
1553
1554 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1555 this.monitorConnectionSplits = monitorConnectionSplits;
1556 }
1557
1558 public int getTaskRunnerPriority() {
1559 return taskRunnerPriority;
1560 }
1561
1562 public void setTaskRunnerPriority(int taskRunnerPriority) {
1563 this.taskRunnerPriority = taskRunnerPriority;
1564 }
1565
1566 public boolean isDedicatedTaskRunner() {
1567 return dedicatedTaskRunner;
1568 }
1569
1570 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1571 this.dedicatedTaskRunner = dedicatedTaskRunner;
1572 }
1573
1574 public boolean isCacheTempDestinations() {
1575 return cacheTempDestinations;
1576 }
1577
1578 public void setCacheTempDestinations(boolean cacheTempDestinations) {
1579 this.cacheTempDestinations = cacheTempDestinations;
1580 }
1581
1582 public int getTimeBeforePurgeTempDestinations() {
1583 return timeBeforePurgeTempDestinations;
1584 }
1585
1586 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1587 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1588 }
1589
1590 public boolean isUseTempMirroredQueues() {
1591 return useTempMirroredQueues;
1592 }
1593
1594 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1595 this.useTempMirroredQueues = useTempMirroredQueues;
1596 }
1597
1598 //
1599 // Implementation methods
1600 // -------------------------------------------------------------------------
1601 /**
1602 * Handles any lazy-creation helper properties which are added to make
1603 * things easier to configure inside environments such as Spring
1604 *
1605 * @throws Exception
1606 */
1607 protected void processHelperProperties() throws Exception {
1608 boolean masterServiceExists = false;
1609 if (transportConnectorURIs != null) {
1610 for (int i = 0; i < transportConnectorURIs.length; i++) {
1611 String uri = transportConnectorURIs[i];
1612 addConnector(uri);
1613 }
1614 }
1615 if (networkConnectorURIs != null) {
1616 for (int i = 0; i < networkConnectorURIs.length; i++) {
1617 String uri = networkConnectorURIs[i];
1618 addNetworkConnector(uri);
1619 }
1620 }
1621 if (jmsBridgeConnectors != null) {
1622 for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1623 addJmsConnector(jmsBridgeConnectors[i]);
1624 }
1625 }
1626 for (Service service : services) {
1627 if (service instanceof MasterConnector) {
1628 masterServiceExists = true;
1629 break;
1630 }
1631 }
1632 if (masterConnectorURI != null) {
1633 if (masterServiceExists) {
1634 throw new IllegalStateException(
1635 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1636 } else {
1637 addService(new MasterConnector(masterConnectorURI));
1638 }
1639 }
1640 }
1641
1642 public void stopAllConnectors(ServiceStopper stopper) {
1643 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1644 NetworkConnector connector = iter.next();
1645 unregisterNetworkConnectorMBean(connector);
1646 stopper.stop(connector);
1647 }
1648 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1649 ProxyConnector connector = iter.next();
1650 stopper.stop(connector);
1651 }
1652 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1653 JmsConnector connector = iter.next();
1654 stopper.stop(connector);
1655 }
1656 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1657 TransportConnector connector = iter.next();
1658 stopper.stop(connector);
1659 }
1660 }
1661
1662 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1663 try {
1664 ObjectName objectName = createConnectorObjectName(connector);
1665 connector = connector.asManagedConnector(getManagementContext(), objectName);
1666 ConnectorViewMBean view = new ConnectorView(connector);
1667 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1668 return connector;
1669 } catch (Throwable e) {
1670 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1671 }
1672 }
1673
1674 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1675 if (isUseJmx()) {
1676 try {
1677 ObjectName objectName = createConnectorObjectName(connector);
1678 getManagementContext().unregisterMBean(objectName);
1679 } catch (Throwable e) {
1680 throw IOExceptionSupport.create(
1681 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1682 }
1683 }
1684 }
1685
1686 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1687 return adaptor;
1688 }
1689
1690 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1691 if (isUseJmx()) {
1692 }
1693 }
1694
1695 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1696 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1697 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1698 + JMXSupport.encodeObjectNamePart(connector.getName()));
1699 }
1700
1701 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1702 NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1703 try {
1704 ObjectName objectName = createNetworkConnectorObjectName(connector);
1705 connector.setObjectName(objectName);
1706 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1707 } catch (Throwable e) {
1708 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1709 }
1710 }
1711
1712 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1713 throws MalformedObjectNameException {
1714 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1715 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1716 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1717 }
1718
1719
1720 public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1721 throws MalformedObjectNameException {
1722 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1723 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1724 + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1725 }
1726
1727 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1728 if (isUseJmx()) {
1729 try {
1730 ObjectName objectName = createNetworkConnectorObjectName(connector);
1731 getManagementContext().unregisterMBean(objectName);
1732 } catch (Exception e) {
1733 LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1734 }
1735 }
1736 }
1737
1738 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1739 ProxyConnectorView view = new ProxyConnectorView(connector);
1740 try {
1741 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1742 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1743 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1744 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1745 } catch (Throwable e) {
1746 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1747 }
1748 }
1749
1750 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1751 FTConnectorView view = new FTConnectorView(connector);
1752 try {
1753 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1754 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1755 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1756 } catch (Throwable e) {
1757 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1758 }
1759 }
1760
1761 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1762 JmsConnectorView view = new JmsConnectorView(connector);
1763 try {
1764 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1765 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1766 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1767 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1768 } catch (Throwable e) {
1769 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1770 }
1771 }
1772
1773 /**
1774 * Factory method to create a new broker
1775 *
1776 * @throws Exception
1777 * @throws
1778 * @throws
1779 */
1780 protected Broker createBroker() throws Exception {
1781 regionBroker = createRegionBroker();
1782 Broker broker = addInterceptors(regionBroker);
1783 // Add a filter that will stop access to the broker once stopped
1784 broker = new MutableBrokerFilter(broker) {
1785 Broker old;
1786
1787 @Override
1788 public void stop() throws Exception {
1789 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1790 // Just ignore additional stop actions.
1791 @Override
1792 public void stop() throws Exception {
1793 }
1794 });
1795 old.stop();
1796 }
1797
1798 @Override
1799 public void start() throws Exception {
1800 if (forceStart && old != null) {
1801 this.next.set(old);
1802 }
1803 getNext().start();
1804 }
1805 };
1806 return broker;
1807 }
1808
1809 /**
1810 * Factory method to create the core region broker onto which interceptors
1811 * are added
1812 *
1813 * @throws Exception
1814 */
1815 protected Broker createRegionBroker() throws Exception {
1816 if (destinationInterceptors == null) {
1817 destinationInterceptors = createDefaultDestinationInterceptor();
1818 }
1819 configureServices(destinationInterceptors);
1820 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1821 if (destinationFactory == null) {
1822 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1823 }
1824 return createRegionBroker(destinationInterceptor);
1825 }
1826
1827 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1828 RegionBroker regionBroker;
1829 if (isUseJmx()) {
1830 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1831 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1832 } else {
1833 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1834 destinationInterceptor,getScheduler(),getExecutor());
1835 }
1836 destinationFactory.setRegionBroker(regionBroker);
1837 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1838 regionBroker.setBrokerName(getBrokerName());
1839 regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1840 if (brokerId != null) {
1841 regionBroker.setBrokerId(brokerId);
1842 }
1843 return regionBroker;
1844 }
1845
1846 /**
1847 * Create the default destination interceptor
1848 */
1849 protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1850 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1851 if (isUseVirtualTopics()) {
1852 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1853 VirtualTopic virtualTopic = new VirtualTopic();
1854 virtualTopic.setName("VirtualTopic.>");
1855 VirtualDestination[] virtualDestinations = { virtualTopic };
1856 interceptor.setVirtualDestinations(virtualDestinations);
1857 answer.add(interceptor);
1858 }
1859 if (isUseMirroredQueues()) {
1860 MirroredQueue interceptor = new MirroredQueue();
1861 answer.add(interceptor);
1862 }
1863 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1864 answer.toArray(array);
1865 return array;
1866 }
1867
1868 /**
1869 * Strategy method to add interceptors to the broker
1870 *
1871 * @throws IOException
1872 */
1873 protected Broker addInterceptors(Broker broker) throws Exception {
1874 if (isSchedulerSupport()) {
1875 SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
1876 if (isUseJmx()) {
1877 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
1878 try {
1879 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
1880 + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1881 + "Type=jobScheduler," + "jobSchedulerName=JMS");
1882
1883 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1884 this.adminView.setJMSJobScheduler(objectName);
1885 } catch (Throwable e) {
1886 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
1887 + e.getMessage(), e);
1888 }
1889
1890 }
1891 broker = sb;
1892 }
1893 if (isAdvisorySupport()) {
1894 broker = new AdvisoryBroker(broker);
1895 }
1896 broker = new CompositeDestinationBroker(broker);
1897 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1898 if (isPopulateJMSXUserID()) {
1899 UserIDBroker userIDBroker = new UserIDBroker(broker);
1900 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
1901 broker = userIDBroker;
1902 }
1903 if (isMonitorConnectionSplits()) {
1904 broker = new ConnectionSplitBroker(broker);
1905 }
1906 if (plugins != null) {
1907 for (int i = 0; i < plugins.length; i++) {
1908 BrokerPlugin plugin = plugins[i];
1909 broker = plugin.installPlugin(broker);
1910 }
1911 }
1912 return broker;
1913 }
1914
1915 protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1916 if (isPersistent()) {
1917 PersistenceAdapterFactory fac = getPersistenceFactory();
1918 if (fac != null) {
1919 return fac.createPersistenceAdapter();
1920 }else {
1921 KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
1922 File dir = new File(getBrokerDataDirectory(),"KahaDB");
1923 adaptor.setDirectory(dir);
1924 return adaptor;
1925 }
1926 } else {
1927 return new MemoryPersistenceAdapter();
1928 }
1929 }
1930
1931 protected ObjectName createBrokerObjectName() throws IOException {
1932 try {
1933 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1934 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1935 } catch (Throwable e) {
1936 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1937 }
1938 }
1939
1940 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1941 TransportServer transport = TransportFactory.bind(this, brokerURI);
1942 return new TransportConnector(transport);
1943 }
1944
1945 /**
1946 * Extracts the port from the options
1947 */
1948 protected Object getPort(Map options) {
1949 Object port = options.get("port");
1950 if (port == null) {
1951 port = DEFAULT_PORT;
1952 LOG.warn("No port specified so defaulting to: " + port);
1953 }
1954 return port;
1955 }
1956
1957 protected void addShutdownHook() {
1958 if (useShutdownHook) {
1959 shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1960 @Override
1961 public void run() {
1962 containerShutdown();
1963 }
1964 };
1965 Runtime.getRuntime().addShutdownHook(shutdownHook);
1966 }
1967 }
1968
1969 protected void removeShutdownHook() {
1970 if (shutdownHook != null) {
1971 try {
1972 Runtime.getRuntime().removeShutdownHook(shutdownHook);
1973 } catch (Exception e) {
1974 LOG.debug("Caught exception, must be shutting down: " + e);
1975 }
1976 }
1977 }
1978
1979 /**
1980 * Sets hooks to be executed when broker shut down
1981 *
1982 * @org.apache.xbean.Property
1983 */
1984 public void setShutdownHooks(List<Runnable> hooks) throws Exception {
1985 for (Runnable hook : hooks) {
1986 addShutdownHook(hook);
1987 }
1988 }
1989
1990 /**
1991 * Causes a clean shutdown of the container when the VM is being shut down
1992 */
1993 protected void containerShutdown() {
1994 try {
1995 stop();
1996 } catch (IOException e) {
1997 Throwable linkedException = e.getCause();
1998 if (linkedException != null) {
1999 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2000 } else {
2001 logError("Failed to shut down: " + e, e);
2002 }
2003 if (!useLoggingForShutdownErrors) {
2004 e.printStackTrace(System.err);
2005 }
2006 } catch (Exception e) {
2007 logError("Failed to shut down: " + e, e);
2008 }
2009 }
2010
2011 protected void logError(String message, Throwable e) {
2012 if (useLoggingForShutdownErrors) {
2013 LOG.error("Failed to shut down: " + e);
2014 } else {
2015 System.err.println("Failed to shut down: " + e);
2016 }
2017 }
2018
2019 /**
2020 * Starts any configured destinations on startup
2021 */
2022 protected void startDestinations() throws Exception {
2023 if (destinations != null) {
2024 ConnectionContext adminConnectionContext = getAdminConnectionContext();
2025 for (int i = 0; i < destinations.length; i++) {
2026 ActiveMQDestination destination = destinations[i];
2027 getBroker().addDestination(adminConnectionContext, destination,true);
2028 }
2029 }
2030 }
2031
2032 /**
2033 * Returns the broker's administration connection context used for
2034 * configuring the broker at startup
2035 */
2036 public ConnectionContext getAdminConnectionContext() throws Exception {
2037 return BrokerSupport.getConnectionContext(getBroker());
2038 }
2039
2040 protected void waitForSlave() {
2041 try {
2042 if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2043 throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2044 }
2045 } catch (InterruptedException e) {
2046 LOG.error("Exception waiting for slave:" + e);
2047 }
2048 }
2049
2050 protected void slaveConnectionEstablished() {
2051 slaveStartSignal.countDown();
2052 }
2053
2054 protected void startManagementContext() throws Exception {
2055 getManagementContext().start();
2056 adminView = new BrokerView(this, null);
2057 ObjectName objectName = getBrokerObjectName();
2058 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2059 }
2060
2061 /**
2062 * Start all transport and network connections, proxies and bridges
2063 *
2064 * @throws Exception
2065 */
2066 public void startAllConnectors() throws Exception {
2067 if (!isSlave()) {
2068 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2069 List<TransportConnector> al = new ArrayList<TransportConnector>();
2070 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2071 TransportConnector connector = iter.next();
2072 connector.setBrokerService(this);
2073 al.add(startTransportConnector(connector));
2074 }
2075 if (al.size() > 0) {
2076 // let's clear the transportConnectors list and replace it with
2077 // the started transportConnector instances
2078 this.transportConnectors.clear();
2079 setTransportConnectors(al);
2080 }
2081 URI uri = getVmConnectorURI();
2082 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2083 map.put("network", "true");
2084 map.put("async", "false");
2085 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2086 if (isWaitForSlave()) {
2087 waitForSlave();
2088 }
2089 if (!stopped.get()) {
2090 ThreadPoolExecutor networkConnectorStartExecutor = null;
2091 if (isNetworkConnectorStartAsync()) {
2092 // spin up as many threads as needed
2093 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2094 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2095 new ThreadFactory() {
2096 int count=0;
2097 public Thread newThread(Runnable runnable) {
2098 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2099 thread.setDaemon(true);
2100 return thread;
2101 }
2102 });
2103 }
2104
2105 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2106 final NetworkConnector connector = iter.next();
2107 connector.setLocalUri(uri);
2108 connector.setBrokerName(getBrokerName());
2109 connector.setDurableDestinations(durableDestinations);
2110 if (getDefaultSocketURIString() != null) {
2111 connector.setBrokerURL(getDefaultSocketURIString());
2112 }
2113 if (networkConnectorStartExecutor != null) {
2114 final Map context = MDCHelper.getCopyOfContextMap();
2115 networkConnectorStartExecutor.execute(new Runnable() {
2116 public void run() {
2117 try {
2118 MDCHelper.setContextMap(context);
2119 LOG.info("Async start of " + connector);
2120 connector.start();
2121 } catch(Exception e) {
2122 LOG.error("Async start of network connector: " + connector + " failed", e);
2123 }
2124 }
2125 });
2126 } else {
2127 connector.start();
2128 }
2129 }
2130 if (networkConnectorStartExecutor != null) {
2131 // executor done when enqueued tasks are complete
2132 networkConnectorStartExecutor.shutdown();
2133 networkConnectorStartExecutor = null;
2134 }
2135
2136 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2137 ProxyConnector connector = iter.next();
2138 connector.start();
2139 }
2140 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2141 JmsConnector connector = iter.next();
2142 connector.start();
2143 }
2144 for (Service service : services) {
2145 configureService(service);
2146 service.start();
2147 }
2148 }
2149 }
2150 }
2151
2152 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2153 connector.setTaskRunnerFactory(getTaskRunnerFactory());
2154 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2155 if (policy != null) {
2156 connector.setMessageAuthorizationPolicy(policy);
2157 }
2158 if (isUseJmx()) {
2159 connector = registerConnectorMBean(connector);
2160 }
2161 connector.getStatistics().setEnabled(enableStatistics);
2162 connector.start();
2163 return connector;
2164 }
2165
2166 /**
2167 * Perform any custom dependency injection
2168 */
2169 protected void configureServices(Object[] services) {
2170 for (Object service : services) {
2171 configureService(service);
2172 }
2173 }
2174
2175 /**
2176 * Perform any custom dependency injection
2177 */
2178 protected void configureService(Object service) {
2179 if (service instanceof BrokerServiceAware) {
2180 BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2181 serviceAware.setBrokerService(this);
2182 }
2183 if (masterConnector == null) {
2184 if (service instanceof MasterConnector) {
2185 masterConnector = (MasterConnector) service;
2186 supportFailOver = true;
2187 }
2188 }
2189 }
2190
2191 public void handleIOException(IOException exception) {
2192 if (ioExceptionHandler != null) {
2193 ioExceptionHandler.handle(exception);
2194 } else {
2195 LOG.info("Ignoring IO exception, " + exception, exception);
2196 }
2197 }
2198
2199 /**
2200 * Starts all destiantions in persistence store. This includes all inactive
2201 * destinations
2202 */
2203 protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
2204 Set destinations = destinationFactory.getDestinations();
2205 if (destinations != null) {
2206 Iterator iter = destinations.iterator();
2207 ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
2208 if (adminConnectionContext == null) {
2209 ConnectionContext context = new ConnectionContext();
2210 context.setBroker(broker);
2211 adminConnectionContext = context;
2212 broker.setAdminConnectionContext(adminConnectionContext);
2213 }
2214 while (iter.hasNext()) {
2215 ActiveMQDestination destination = (ActiveMQDestination) iter.next();
2216 broker.addDestination(adminConnectionContext, destination,false);
2217 }
2218 }
2219 }
2220
2221 protected synchronized ThreadPoolExecutor getExecutor() {
2222 if (this.executor == null) {
2223 this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2224 public Thread newThread(Runnable runnable) {
2225 Thread thread = new Thread(runnable, "Usage Async Task");
2226 thread.setDaemon(true);
2227 return thread;
2228 }
2229 });
2230 }
2231 return this.executor;
2232 }
2233
2234 public synchronized Scheduler getScheduler() {
2235 if (this.scheduler==null) {
2236 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2237 try {
2238 this.scheduler.start();
2239 } catch (Exception e) {
2240 LOG.error("Failed to start Scheduler ",e);
2241 }
2242 }
2243 return this.scheduler;
2244 }
2245
2246 public Broker getRegionBroker() {
2247 return regionBroker;
2248 }
2249
2250 public void setRegionBroker(Broker regionBroker) {
2251 this.regionBroker = regionBroker;
2252 }
2253
2254 public void addShutdownHook(Runnable hook) {
2255 synchronized (shutdownHooks) {
2256 shutdownHooks.add(hook);
2257 }
2258 }
2259
2260 public void removeShutdownHook(Runnable hook) {
2261 synchronized (shutdownHooks) {
2262 shutdownHooks.remove(hook);
2263 }
2264 }
2265
2266 public boolean isSystemExitOnShutdown() {
2267 return systemExitOnShutdown;
2268 }
2269
2270 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2271 this.systemExitOnShutdown = systemExitOnShutdown;
2272 }
2273
2274 public int getSystemExitOnShutdownExitCode() {
2275 return systemExitOnShutdownExitCode;
2276 }
2277
2278 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2279 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2280 }
2281
2282 public SslContext getSslContext() {
2283 return sslContext;
2284 }
2285
2286 public void setSslContext(SslContext sslContext) {
2287 this.sslContext = sslContext;
2288 }
2289
2290 public boolean isShutdownOnSlaveFailure() {
2291 return shutdownOnSlaveFailure;
2292 }
2293
2294 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2295 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2296 }
2297
2298 public boolean isWaitForSlave() {
2299 return waitForSlave;
2300 }
2301
2302 public void setWaitForSlave(boolean waitForSlave) {
2303 this.waitForSlave = waitForSlave;
2304 }
2305
2306 public long getWaitForSlaveTimeout() {
2307 return this.waitForSlaveTimeout;
2308 }
2309
2310 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2311 this.waitForSlaveTimeout = waitForSlaveTimeout;
2312 }
2313
2314 public CountDownLatch getSlaveStartSignal() {
2315 return slaveStartSignal;
2316 }
2317
2318 /**
2319 * Get the passiveSlave
2320 * @return the passiveSlave
2321 */
2322 public boolean isPassiveSlave() {
2323 return this.passiveSlave;
2324 }
2325
2326 /**
2327 * Set the passiveSlave
2328 * @param passiveSlave the passiveSlave to set
2329 */
2330 public void setPassiveSlave(boolean passiveSlave) {
2331 this.passiveSlave = passiveSlave;
2332 }
2333
2334 /**
2335 * override the Default IOException handler, called when persistence adapter
2336 * has experiences File or JDBC I/O Exceptions
2337 *
2338 * @param ioExceptionHandler
2339 */
2340 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2341 configureService(ioExceptionHandler);
2342 this.ioExceptionHandler = ioExceptionHandler;
2343 }
2344
2345 public IOExceptionHandler getIoExceptionHandler() {
2346 return ioExceptionHandler;
2347 }
2348
2349 /**
2350 * @return the schedulerSupport
2351 */
2352 public boolean isSchedulerSupport() {
2353 return this.schedulerSupport;
2354 }
2355
2356 /**
2357 * @param schedulerSupport the schedulerSupport to set
2358 */
2359 public void setSchedulerSupport(boolean schedulerSupport) {
2360 this.schedulerSupport = schedulerSupport;
2361 }
2362
2363 /**
2364 * @return the schedulerDirectory
2365 */
2366 public File getSchedulerDirectoryFile() {
2367 if (this.schedulerDirectoryFile == null) {
2368 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2369 }
2370 return schedulerDirectoryFile;
2371 }
2372
2373 /**
2374 * @param schedulerDirectory the schedulerDirectory to set
2375 */
2376 public void setSchedulerDirectoryFile(File schedulerDirectory) {
2377 this.schedulerDirectoryFile = schedulerDirectory;
2378 }
2379
2380 public void setSchedulerDirectory(String schedulerDirectory) {
2381 setSchedulerDirectoryFile(new File(schedulerDirectory));
2382 }
2383
2384 public int getSchedulePeriodForDestinationPurge() {
2385 return this.schedulePeriodForDestinationPurge;
2386 }
2387
2388 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2389 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2390 }
2391
2392 public BrokerContext getBrokerContext() {
2393 return brokerContext;
2394 }
2395
2396 public void setBrokerContext(BrokerContext brokerContext) {
2397 this.brokerContext = brokerContext;
2398 }
2399
2400 public void setBrokerId(String brokerId) {
2401 this.brokerId = new BrokerId(brokerId);
2402 }
2403
2404 public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2405 return useAuthenticatedPrincipalForJMSXUserID;
2406 }
2407
2408 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2409 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2410 }
2411
2412 public boolean isNetworkConnectorStartAsync() {
2413 return networkConnectorStartAsync;
2414 }
2415
2416 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2417 this.networkConnectorStartAsync = networkConnectorStartAsync;
2418 }
2419 }