001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.net.URI;
020 import java.net.URISyntaxException;
021 import java.util.HashMap;
022 import java.util.Map;
023 import java.util.Properties;
024 import java.util.concurrent.Executor;
025 import java.util.concurrent.ScheduledThreadPoolExecutor;
026 import java.util.concurrent.ThreadFactory;
027 import javax.jms.Connection;
028 import javax.jms.ConnectionFactory;
029 import javax.jms.ExceptionListener;
030 import javax.jms.JMSException;
031 import javax.jms.QueueConnection;
032 import javax.jms.QueueConnectionFactory;
033 import javax.jms.TopicConnection;
034 import javax.jms.TopicConnectionFactory;
035 import javax.naming.Context;
036 import org.apache.activemq.blob.BlobTransferPolicy;
037 import org.apache.activemq.jndi.JNDIBaseStorable;
038 import org.apache.activemq.management.JMSStatsImpl;
039 import org.apache.activemq.management.StatsCapable;
040 import org.apache.activemq.management.StatsImpl;
041 import org.apache.activemq.transport.Transport;
042 import org.apache.activemq.transport.TransportFactory;
043 import org.apache.activemq.transport.TransportListener;
044 import org.apache.activemq.util.IdGenerator;
045 import org.apache.activemq.util.IntrospectionSupport;
046 import org.apache.activemq.util.JMSExceptionSupport;
047 import org.apache.activemq.util.URISupport;
048 import org.apache.activemq.util.URISupport.CompositeData;
049
050 /**
051 * A ConnectionFactory is an an Administered object, and is used for creating
052 * Connections. <p/> This class also implements QueueConnectionFactory and
053 * TopicConnectionFactory. You can use this connection to create both
054 * QueueConnections and TopicConnections.
055 *
056 *
057 * @see javax.jms.ConnectionFactory
058 */
059 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
060
061 public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
062 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
063 public static final String DEFAULT_USER = null;
064 public static final String DEFAULT_PASSWORD = null;
065 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
066
067 protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
068 public Thread newThread(Runnable run) {
069 Thread thread = new Thread(run);
070 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
071 return thread;
072 }
073 });
074
075 protected URI brokerURL;
076 protected String userName;
077 protected String password;
078 protected String clientID;
079 protected boolean dispatchAsync=true;
080 protected boolean alwaysSessionAsync=true;
081
082 JMSStatsImpl factoryStats = new JMSStatsImpl();
083
084 private IdGenerator clientIdGenerator;
085 private String clientIDPrefix;
086
087 // client policies
088 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
089 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
090 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
091 private MessageTransformer transformer;
092
093 private boolean disableTimeStampsByDefault;
094 private boolean optimizedMessageDispatch = true;
095 private boolean copyMessageOnSend = true;
096 private boolean useCompression;
097 private boolean objectMessageSerializationDefered;
098 private boolean useAsyncSend;
099 private boolean optimizeAcknowledge;
100 private int closeTimeout = 15000;
101 private boolean useRetroactiveConsumer;
102 private boolean exclusiveConsumer;
103 private boolean nestedMapAndListEnabled = true;
104 private boolean alwaysSyncSend;
105 private boolean watchTopicAdvisories = true;
106 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
107 private long warnAboutUnstartedConnectionTimeout = 500L;
108 private int sendTimeout = 0;
109 private boolean sendAcksAsync=true;
110 private TransportListener transportListener;
111 private ExceptionListener exceptionListener;
112 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
113 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
114 private boolean useDedicatedTaskRunner;
115 private long consumerFailoverRedeliveryWaitPeriod = 0;
116 private boolean checkForDuplicates = true;
117 private ClientInternalExceptionListener clientInternalExceptionListener;
118 private boolean messagePrioritySupported = true;
119
120 // /////////////////////////////////////////////
121 //
122 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
123 //
124 // /////////////////////////////////////////////
125
126 public ActiveMQConnectionFactory() {
127 this(DEFAULT_BROKER_URL);
128 }
129
130 public ActiveMQConnectionFactory(String brokerURL) {
131 this(createURI(brokerURL));
132 }
133
134 public ActiveMQConnectionFactory(URI brokerURL) {
135 setBrokerURL(brokerURL.toString());
136 }
137
138 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
139 setUserName(userName);
140 setPassword(password);
141 setBrokerURL(brokerURL.toString());
142 }
143
144 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
145 setUserName(userName);
146 setPassword(password);
147 setBrokerURL(brokerURL);
148 }
149
150 /**
151 * Returns a copy of the given connection factory
152 */
153 public ActiveMQConnectionFactory copy() {
154 try {
155 return (ActiveMQConnectionFactory)super.clone();
156 } catch (CloneNotSupportedException e) {
157 throw new RuntimeException("This should never happen: " + e, e);
158 }
159 }
160
161 /**
162 * @param brokerURL
163 * @return
164 * @throws URISyntaxException
165 */
166 private static URI createURI(String brokerURL) {
167 try {
168 return new URI(brokerURL);
169 } catch (URISyntaxException e) {
170 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
171 }
172 }
173
174 /**
175 * @return Returns the Connection.
176 */
177 public Connection createConnection() throws JMSException {
178 return createActiveMQConnection();
179 }
180
181 /**
182 * @return Returns the Connection.
183 */
184 public Connection createConnection(String userName, String password) throws JMSException {
185 return createActiveMQConnection(userName, password);
186 }
187
188 /**
189 * @return Returns the QueueConnection.
190 * @throws JMSException
191 */
192 public QueueConnection createQueueConnection() throws JMSException {
193 return createActiveMQConnection();
194 }
195
196 /**
197 * @return Returns the QueueConnection.
198 */
199 public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
200 return createActiveMQConnection(userName, password);
201 }
202
203 /**
204 * @return Returns the TopicConnection.
205 * @throws JMSException
206 */
207 public TopicConnection createTopicConnection() throws JMSException {
208 return createActiveMQConnection();
209 }
210
211 /**
212 * @return Returns the TopicConnection.
213 */
214 public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
215 return createActiveMQConnection(userName, password);
216 }
217
218 public StatsImpl getStats() {
219 // TODO
220 return null;
221 }
222
223 // /////////////////////////////////////////////
224 //
225 // Implementation methods.
226 //
227 // /////////////////////////////////////////////
228
229 protected ActiveMQConnection createActiveMQConnection() throws JMSException {
230 return createActiveMQConnection(userName, password);
231 }
232
233 /**
234 * Creates a Transport based on this object's connection settings. Separated
235 * from createActiveMQConnection to allow for subclasses to override.
236 *
237 * @return The newly created Transport.
238 * @throws JMSException If unable to create trasnport.
239 * @author sepandm@gmail.com
240 */
241 protected Transport createTransport() throws JMSException {
242 try {
243 return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
244 } catch (Exception e) {
245 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
246 }
247 }
248
249 /**
250 * @return Returns the Connection.
251 */
252 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
253 if (brokerURL == null) {
254 throw new ConfigurationException("brokerURL not set.");
255 }
256 ActiveMQConnection connection = null;
257 try {
258 Transport transport = createTransport();
259 connection = createActiveMQConnection(transport, factoryStats);
260
261 connection.setUserName(userName);
262 connection.setPassword(password);
263
264 configureConnection(connection);
265
266 transport.start();
267
268 if (clientID != null) {
269 connection.setDefaultClientID(clientID);
270 }
271
272 return connection;
273 } catch (JMSException e) {
274 // Clean up!
275 try {
276 connection.close();
277 } catch (Throwable ignore) {
278 }
279 throw e;
280 } catch (Exception e) {
281 // Clean up!
282 try {
283 connection.close();
284 } catch (Throwable ignore) {
285 }
286 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
287 }
288 }
289
290 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
291 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
292 return connection;
293 }
294
295 protected void configureConnection(ActiveMQConnection connection) throws JMSException {
296 connection.setPrefetchPolicy(getPrefetchPolicy());
297 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
298 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
299 connection.setCopyMessageOnSend(isCopyMessageOnSend());
300 connection.setUseCompression(isUseCompression());
301 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
302 connection.setDispatchAsync(isDispatchAsync());
303 connection.setUseAsyncSend(isUseAsyncSend());
304 connection.setAlwaysSyncSend(isAlwaysSyncSend());
305 connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
306 connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
307 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
308 connection.setExclusiveConsumer(isExclusiveConsumer());
309 connection.setRedeliveryPolicy(getRedeliveryPolicy());
310 connection.setTransformer(getTransformer());
311 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
312 connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
313 connection.setProducerWindowSize(getProducerWindowSize());
314 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
315 connection.setSendTimeout(getSendTimeout());
316 connection.setCloseTimeout(getCloseTimeout());
317 connection.setSendAcksAsync(isSendAcksAsync());
318 connection.setAuditDepth(getAuditDepth());
319 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
320 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
321 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
322 connection.setCheckForDuplicates(isCheckForDuplicates());
323 connection.setMessagePrioritySupported(isMessagePrioritySupported());
324 if (transportListener != null) {
325 connection.addTransportListener(transportListener);
326 }
327 if (exceptionListener != null) {
328 connection.setExceptionListener(exceptionListener);
329 }
330 if (clientInternalExceptionListener != null) {
331 connection.setClientInternalExceptionListener(clientInternalExceptionListener);
332 }
333 }
334
335 // /////////////////////////////////////////////
336 //
337 // Property Accessors
338 //
339 // /////////////////////////////////////////////
340
341 public String getBrokerURL() {
342 return brokerURL == null ? null : brokerURL.toString();
343 }
344
345 /**
346 * Sets the <a
347 * href="http://activemq.apache.org/configuring-transports.html">connection
348 * URL</a> used to connect to the ActiveMQ broker.
349 */
350 public void setBrokerURL(String brokerURL) {
351 this.brokerURL = createURI(brokerURL);
352
353 // Use all the properties prefixed with 'jms.' to set the connection
354 // factory
355 // options.
356 if (this.brokerURL.getQuery() != null) {
357 // It might be a standard URI or...
358 try {
359
360 Map map = URISupport.parseQuery(this.brokerURL.getQuery());
361 if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) {
362 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
363 }
364
365 } catch (URISyntaxException e) {
366 }
367
368 } else {
369
370 // It might be a composite URI.
371 try {
372 CompositeData data = URISupport.parseComposite(this.brokerURL);
373 if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) {
374 this.brokerURL = data.toURI();
375 }
376 } catch (URISyntaxException e) {
377 }
378 }
379 }
380
381 public String getClientID() {
382 return clientID;
383 }
384
385 /**
386 * Sets the JMS clientID to use for the created connection. Note that this
387 * can only be used by one connection at once so generally its a better idea
388 * to set the clientID on a Connection
389 */
390 public void setClientID(String clientID) {
391 this.clientID = clientID;
392 }
393
394 public boolean isCopyMessageOnSend() {
395 return copyMessageOnSend;
396 }
397
398 /**
399 * Should a JMS message be copied to a new JMS Message object as part of the
400 * send() method in JMS. This is enabled by default to be compliant with the
401 * JMS specification. You can disable it if you do not mutate JMS messages
402 * after they are sent for a performance boost
403 */
404 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
405 this.copyMessageOnSend = copyMessageOnSend;
406 }
407
408 public boolean isDisableTimeStampsByDefault() {
409 return disableTimeStampsByDefault;
410 }
411
412 /**
413 * Sets whether or not timestamps on messages should be disabled or not. If
414 * you disable them it adds a small performance boost.
415 */
416 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
417 this.disableTimeStampsByDefault = disableTimeStampsByDefault;
418 }
419
420 public boolean isOptimizedMessageDispatch() {
421 return optimizedMessageDispatch;
422 }
423
424 /**
425 * If this flag is set then an larger prefetch limit is used - only
426 * applicable for durable topic subscribers.
427 */
428 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
429 this.optimizedMessageDispatch = optimizedMessageDispatch;
430 }
431
432 public String getPassword() {
433 return password;
434 }
435
436 /**
437 * Sets the JMS password used for connections created from this factory
438 */
439 public void setPassword(String password) {
440 this.password = password;
441 }
442
443 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
444 return prefetchPolicy;
445 }
446
447 /**
448 * Sets the <a
449 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
450 * policy</a> for consumers created by this connection.
451 */
452 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
453 this.prefetchPolicy = prefetchPolicy;
454 }
455
456 public boolean isUseAsyncSend() {
457 return useAsyncSend;
458 }
459
460 public BlobTransferPolicy getBlobTransferPolicy() {
461 return blobTransferPolicy;
462 }
463
464 /**
465 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
466 * OBjects) are transferred from producers to brokers to consumers
467 */
468 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
469 this.blobTransferPolicy = blobTransferPolicy;
470 }
471
472 /**
473 * Forces the use of <a
474 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
475 * adds a massive performance boost; but means that the send() method will
476 * return immediately whether the message has been sent or not which could
477 * lead to message loss.
478 */
479 public void setUseAsyncSend(boolean useAsyncSend) {
480 this.useAsyncSend = useAsyncSend;
481 }
482
483 public synchronized boolean isWatchTopicAdvisories() {
484 return watchTopicAdvisories;
485 }
486
487 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
488 this.watchTopicAdvisories = watchTopicAdvisories;
489 }
490
491 /**
492 * @return true if always sync send messages
493 */
494 public boolean isAlwaysSyncSend() {
495 return this.alwaysSyncSend;
496 }
497
498 /**
499 * Set true if always require messages to be sync sent
500 *
501 * @param alwaysSyncSend
502 */
503 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
504 this.alwaysSyncSend = alwaysSyncSend;
505 }
506
507 public String getUserName() {
508 return userName;
509 }
510
511 /**
512 * Sets the JMS userName used by connections created by this factory
513 */
514 public void setUserName(String userName) {
515 this.userName = userName;
516 }
517
518 public boolean isUseRetroactiveConsumer() {
519 return useRetroactiveConsumer;
520 }
521
522 /**
523 * Sets whether or not retroactive consumers are enabled. Retroactive
524 * consumers allow non-durable topic subscribers to receive old messages
525 * that were published before the non-durable subscriber started.
526 */
527 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
528 this.useRetroactiveConsumer = useRetroactiveConsumer;
529 }
530
531 public boolean isExclusiveConsumer() {
532 return exclusiveConsumer;
533 }
534
535 /**
536 * Enables or disables whether or not queue consumers should be exclusive or
537 * not for example to preserve ordering when not using <a
538 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
539 *
540 * @param exclusiveConsumer
541 */
542 public void setExclusiveConsumer(boolean exclusiveConsumer) {
543 this.exclusiveConsumer = exclusiveConsumer;
544 }
545
546 public RedeliveryPolicy getRedeliveryPolicy() {
547 return redeliveryPolicy;
548 }
549
550 /**
551 * Sets the global redelivery policy to be used when a message is delivered
552 * but the session is rolled back
553 */
554 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
555 this.redeliveryPolicy = redeliveryPolicy;
556 }
557
558 public MessageTransformer getTransformer() {
559 return transformer;
560 }
561
562 /**
563 * @return the sendTimeout
564 */
565 public int getSendTimeout() {
566 return sendTimeout;
567 }
568
569 /**
570 * @param sendTimeout the sendTimeout to set
571 */
572 public void setSendTimeout(int sendTimeout) {
573 this.sendTimeout = sendTimeout;
574 }
575
576 /**
577 * @return the sendAcksAsync
578 */
579 public boolean isSendAcksAsync() {
580 return sendAcksAsync;
581 }
582
583 /**
584 * @param sendAcksAsync the sendAcksAsync to set
585 */
586 public void setSendAcksAsync(boolean sendAcksAsync) {
587 this.sendAcksAsync = sendAcksAsync;
588 }
589
590 /**
591 * @return the messagePrioritySupported
592 */
593 public boolean isMessagePrioritySupported() {
594 return this.messagePrioritySupported;
595 }
596
597 /**
598 * @param messagePrioritySupported the messagePrioritySupported to set
599 */
600 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
601 this.messagePrioritySupported = messagePrioritySupported;
602 }
603
604
605 /**
606 * Sets the transformer used to transform messages before they are sent on
607 * to the JMS bus or when they are received from the bus but before they are
608 * delivered to the JMS client
609 */
610 public void setTransformer(MessageTransformer transformer) {
611 this.transformer = transformer;
612 }
613
614 @Override
615 public void buildFromProperties(Properties properties) {
616
617 if (properties == null) {
618 properties = new Properties();
619 }
620
621 String temp = properties.getProperty(Context.PROVIDER_URL);
622 if (temp == null || temp.length() == 0) {
623 temp = properties.getProperty("brokerURL");
624 }
625 if (temp != null && temp.length() > 0) {
626 setBrokerURL(temp);
627 }
628
629 Map<String, Object> p = new HashMap(properties);
630 buildFromMap(p);
631 }
632
633 public boolean buildFromMap(Map<String, Object> properties) {
634 boolean rc = false;
635
636 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
637 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
638 setPrefetchPolicy(p);
639 rc = true;
640 }
641
642 RedeliveryPolicy rp = new RedeliveryPolicy();
643 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
644 setRedeliveryPolicy(rp);
645 rc = true;
646 }
647
648 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
649 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
650 setBlobTransferPolicy(blobTransferPolicy);
651 rc = true;
652 }
653
654 rc |= IntrospectionSupport.setProperties(this, properties);
655
656 return rc;
657 }
658
659 @Override
660 public void populateProperties(Properties props) {
661 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
662
663 if (getBrokerURL() != null) {
664 props.setProperty(Context.PROVIDER_URL, getBrokerURL());
665 props.setProperty("brokerURL", getBrokerURL());
666 }
667
668 if (getClientID() != null) {
669 props.setProperty("clientID", getClientID());
670 }
671
672 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
673 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
674 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
675
676 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
677 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
678 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
679 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
680
681 if (getPassword() != null) {
682 props.setProperty("password", getPassword());
683 }
684
685 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
686 props.setProperty("useCompression", Boolean.toString(isUseCompression()));
687 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
688 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
689
690 if (getUserName() != null) {
691 props.setProperty("userName", getUserName());
692 }
693
694 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
695 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
696 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
697 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
698 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
699 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
700 props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
701 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
702 props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
703 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
704 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
705 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
706 }
707
708 public boolean isUseCompression() {
709 return useCompression;
710 }
711
712 /**
713 * Enables the use of compression of the message bodies
714 */
715 public void setUseCompression(boolean useCompression) {
716 this.useCompression = useCompression;
717 }
718
719 public boolean isObjectMessageSerializationDefered() {
720 return objectMessageSerializationDefered;
721 }
722
723 /**
724 * When an object is set on an ObjectMessage, the JMS spec requires the
725 * object to be serialized by that set method. Enabling this flag causes the
726 * object to not get serialized. The object may subsequently get serialized
727 * if the message needs to be sent over a socket or stored to disk.
728 */
729 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
730 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
731 }
732
733 public boolean isDispatchAsync() {
734 return dispatchAsync;
735 }
736
737 /**
738 * Enables or disables the default setting of whether or not consumers have
739 * their messages <a
740 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
741 * synchronously or asynchronously by the broker</a>. For non-durable
742 * topics for example we typically dispatch synchronously by default to
743 * minimize context switches which boost performance. However sometimes its
744 * better to go slower to ensure that a single blocked consumer socket does
745 * not block delivery to other consumers.
746 *
747 * @param asyncDispatch If true then consumers created on this connection
748 * will default to having their messages dispatched
749 * asynchronously. The default value is false.
750 */
751 public void setDispatchAsync(boolean asyncDispatch) {
752 this.dispatchAsync = asyncDispatch;
753 }
754
755 /**
756 * @return Returns the closeTimeout.
757 */
758 public int getCloseTimeout() {
759 return closeTimeout;
760 }
761
762 /**
763 * Sets the timeout before a close is considered complete. Normally a
764 * close() on a connection waits for confirmation from the broker; this
765 * allows that operation to timeout to save the client hanging if there is
766 * no broker
767 */
768 public void setCloseTimeout(int closeTimeout) {
769 this.closeTimeout = closeTimeout;
770 }
771
772 /**
773 * @return Returns the alwaysSessionAsync.
774 */
775 public boolean isAlwaysSessionAsync() {
776 return alwaysSessionAsync;
777 }
778
779 /**
780 * If this flag is set then a separate thread is not used for dispatching
781 * messages for each Session in the Connection. However, a separate thread
782 * is always used if there is more than one session, or the session isn't in
783 * auto acknowledge or duplicates ok mode
784 */
785 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
786 this.alwaysSessionAsync = alwaysSessionAsync;
787 }
788
789 /**
790 * @return Returns the optimizeAcknowledge.
791 */
792 public boolean isOptimizeAcknowledge() {
793 return optimizeAcknowledge;
794 }
795
796 /**
797 * @param optimizeAcknowledge The optimizeAcknowledge to set.
798 */
799 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
800 this.optimizeAcknowledge = optimizeAcknowledge;
801 }
802
803 public boolean isNestedMapAndListEnabled() {
804 return nestedMapAndListEnabled;
805 }
806
807 /**
808 * Enables/disables whether or not Message properties and MapMessage entries
809 * support <a
810 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
811 * Structures</a> of Map and List objects
812 */
813 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
814 this.nestedMapAndListEnabled = structuredMapsEnabled;
815 }
816
817 public String getClientIDPrefix() {
818 return clientIDPrefix;
819 }
820
821 /**
822 * Sets the prefix used by autogenerated JMS Client ID values which are used
823 * if the JMS client does not explicitly specify on.
824 *
825 * @param clientIDPrefix
826 */
827 public void setClientIDPrefix(String clientIDPrefix) {
828 this.clientIDPrefix = clientIDPrefix;
829 }
830
831 protected synchronized IdGenerator getClientIdGenerator() {
832 if (clientIdGenerator == null) {
833 if (clientIDPrefix != null) {
834 clientIdGenerator = new IdGenerator(clientIDPrefix);
835 } else {
836 clientIdGenerator = new IdGenerator();
837 }
838 }
839 return clientIdGenerator;
840 }
841
842 protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
843 this.clientIdGenerator = clientIdGenerator;
844 }
845
846 /**
847 * @return the statsEnabled
848 */
849 public boolean isStatsEnabled() {
850 return this.factoryStats.isEnabled();
851 }
852
853 /**
854 * @param statsEnabled the statsEnabled to set
855 */
856 public void setStatsEnabled(boolean statsEnabled) {
857 this.factoryStats.setEnabled(statsEnabled);
858 }
859
860 public synchronized int getProducerWindowSize() {
861 return producerWindowSize;
862 }
863
864 public synchronized void setProducerWindowSize(int producerWindowSize) {
865 this.producerWindowSize = producerWindowSize;
866 }
867
868 public long getWarnAboutUnstartedConnectionTimeout() {
869 return warnAboutUnstartedConnectionTimeout;
870 }
871
872 /**
873 * Enables the timeout from a connection creation to when a warning is
874 * generated if the connection is not properly started via
875 * {@link Connection#start()} and a message is received by a consumer. It is
876 * a very common gotcha to forget to <a
877 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
878 * the connection</a> so this option makes the default case to create a
879 * warning if the user forgets. To disable the warning just set the value to <
880 * 0 (say -1).
881 */
882 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
883 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
884 }
885
886 public TransportListener getTransportListener() {
887 return transportListener;
888 }
889
890 /**
891 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
892 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
893 * a transport listener.
894 *
895 * @param transportListener sets the listener to be registered on all connections
896 * created by this factory
897 */
898 public void setTransportListener(TransportListener transportListener) {
899 this.transportListener = transportListener;
900 }
901
902
903 public ExceptionListener getExceptionListener() {
904 return exceptionListener;
905 }
906
907 /**
908 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
909 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
910 * an exception listener.
911 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
912 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
913 * @param exceptionListener sets the exception listener to be registered on all connections
914 * created by this factory
915 */
916 public void setExceptionListener(ExceptionListener exceptionListener) {
917 this.exceptionListener = exceptionListener;
918 }
919
920 public int getAuditDepth() {
921 return auditDepth;
922 }
923
924 public void setAuditDepth(int auditDepth) {
925 this.auditDepth = auditDepth;
926 }
927
928 public int getAuditMaximumProducerNumber() {
929 return auditMaximumProducerNumber;
930 }
931
932 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
933 this.auditMaximumProducerNumber = auditMaximumProducerNumber;
934 }
935
936 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
937 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
938 }
939
940 public boolean isUseDedicatedTaskRunner() {
941 return useDedicatedTaskRunner;
942 }
943
944 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
945 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
946 }
947
948 public long getConsumerFailoverRedeliveryWaitPeriod() {
949 return consumerFailoverRedeliveryWaitPeriod;
950 }
951
952 public ClientInternalExceptionListener getClientInternalExceptionListener() {
953 return clientInternalExceptionListener;
954 }
955
956 /**
957 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
958 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
959 * an exception listener.
960 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
961 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
962 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
963 * created by this factory
964 */
965 public void setClientInternalExceptionListener(
966 ClientInternalExceptionListener clientInternalExceptionListener) {
967 this.clientInternalExceptionListener = clientInternalExceptionListener;
968 }
969
970 /**
971 * @return the checkForDuplicates
972 */
973 public boolean isCheckForDuplicates() {
974 return this.checkForDuplicates;
975 }
976
977 /**
978 * @param checkForDuplicates the checkForDuplicates to set
979 */
980 public void setCheckForDuplicates(boolean checkForDuplicates) {
981 this.checkForDuplicates = checkForDuplicates;
982 }
983 }