001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029 import java.util.concurrent.CountDownLatch;
030 import java.util.concurrent.LinkedBlockingQueue;
031 import java.util.concurrent.ThreadFactory;
032 import java.util.concurrent.ThreadPoolExecutor;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicBoolean;
035 import java.util.concurrent.atomic.AtomicInteger;
036 import javax.jms.Connection;
037 import javax.jms.ConnectionConsumer;
038 import javax.jms.ConnectionMetaData;
039 import javax.jms.DeliveryMode;
040 import javax.jms.Destination;
041 import javax.jms.ExceptionListener;
042 import javax.jms.IllegalStateException;
043 import javax.jms.InvalidDestinationException;
044 import javax.jms.JMSException;
045 import javax.jms.Queue;
046 import javax.jms.QueueConnection;
047 import javax.jms.QueueSession;
048 import javax.jms.ServerSessionPool;
049 import javax.jms.Session;
050 import javax.jms.Topic;
051 import javax.jms.TopicConnection;
052 import javax.jms.TopicSession;
053 import javax.jms.XAConnection;
054 import org.apache.activemq.advisory.DestinationSource;
055 import org.apache.activemq.blob.BlobTransferPolicy;
056 import org.apache.activemq.command.ActiveMQDestination;
057 import org.apache.activemq.command.ActiveMQMessage;
058 import org.apache.activemq.command.ActiveMQTempDestination;
059 import org.apache.activemq.command.ActiveMQTempQueue;
060 import org.apache.activemq.command.ActiveMQTempTopic;
061 import org.apache.activemq.command.BrokerInfo;
062 import org.apache.activemq.command.Command;
063 import org.apache.activemq.command.CommandTypes;
064 import org.apache.activemq.command.ConnectionControl;
065 import org.apache.activemq.command.ConnectionError;
066 import org.apache.activemq.command.ConnectionId;
067 import org.apache.activemq.command.ConnectionInfo;
068 import org.apache.activemq.command.ConsumerControl;
069 import org.apache.activemq.command.ConsumerId;
070 import org.apache.activemq.command.ConsumerInfo;
071 import org.apache.activemq.command.ControlCommand;
072 import org.apache.activemq.command.DestinationInfo;
073 import org.apache.activemq.command.ExceptionResponse;
074 import org.apache.activemq.command.Message;
075 import org.apache.activemq.command.MessageDispatch;
076 import org.apache.activemq.command.MessageId;
077 import org.apache.activemq.command.ProducerAck;
078 import org.apache.activemq.command.ProducerId;
079 import org.apache.activemq.command.RemoveInfo;
080 import org.apache.activemq.command.RemoveSubscriptionInfo;
081 import org.apache.activemq.command.Response;
082 import org.apache.activemq.command.SessionId;
083 import org.apache.activemq.command.ShutdownInfo;
084 import org.apache.activemq.command.WireFormatInfo;
085 import org.apache.activemq.management.JMSConnectionStatsImpl;
086 import org.apache.activemq.management.JMSStatsImpl;
087 import org.apache.activemq.management.StatsCapable;
088 import org.apache.activemq.management.StatsImpl;
089 import org.apache.activemq.state.CommandVisitorAdapter;
090 import org.apache.activemq.thread.Scheduler;
091 import org.apache.activemq.thread.TaskRunnerFactory;
092 import org.apache.activemq.transport.Transport;
093 import org.apache.activemq.transport.TransportListener;
094 import org.apache.activemq.transport.failover.FailoverTransport;
095 import org.apache.activemq.util.IdGenerator;
096 import org.apache.activemq.util.IntrospectionSupport;
097 import org.apache.activemq.util.JMSExceptionSupport;
098 import org.apache.activemq.util.LongSequenceGenerator;
099 import org.apache.activemq.util.ServiceSupport;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102
103 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104
105 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108
109 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
110 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111
112 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113
114 protected boolean dispatchAsync=true;
115 protected boolean alwaysSessionAsync = true;
116
117 private TaskRunnerFactory sessionTaskRunner;
118 private final ThreadPoolExecutor executor;
119
120 // Connection state variables
121 private final ConnectionInfo info;
122 private ExceptionListener exceptionListener;
123 private ClientInternalExceptionListener clientInternalExceptionListener;
124 private boolean clientIDSet;
125 private boolean isConnectionInfoSentToBroker;
126 private boolean userSpecifiedClientID;
127
128 // Configuration options variables
129 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130 private BlobTransferPolicy blobTransferPolicy;
131 private RedeliveryPolicy redeliveryPolicy;
132 private MessageTransformer transformer;
133
134 private boolean disableTimeStampsByDefault;
135 private boolean optimizedMessageDispatch = true;
136 private boolean copyMessageOnSend = true;
137 private boolean useCompression;
138 private boolean objectMessageSerializationDefered;
139 private boolean useAsyncSend;
140 private boolean optimizeAcknowledge;
141 private boolean nestedMapAndListEnabled = true;
142 private boolean useRetroactiveConsumer;
143 private boolean exclusiveConsumer;
144 private boolean alwaysSyncSend;
145 private int closeTimeout = 15000;
146 private boolean watchTopicAdvisories = true;
147 private long warnAboutUnstartedConnectionTimeout = 500L;
148 private int sendTimeout =0;
149 private boolean sendAcksAsync=true;
150 private boolean checkForDuplicates = true;
151
152 private final Transport transport;
153 private final IdGenerator clientIdGenerator;
154 private final JMSStatsImpl factoryStats;
155 private final JMSConnectionStatsImpl stats;
156
157 private final AtomicBoolean started = new AtomicBoolean(false);
158 private final AtomicBoolean closing = new AtomicBoolean(false);
159 private final AtomicBoolean closed = new AtomicBoolean(false);
160 private final AtomicBoolean transportFailed = new AtomicBoolean(false);
161 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
162 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
163 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
164 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
165 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
166
167 // Maps ConsumerIds to ActiveMQDispatcher objects
168 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
169 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
170 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
171 private final SessionId connectionSessionId;
172 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
173 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
174 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
175 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
176
177 private AdvisoryConsumer advisoryConsumer;
178 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
179 private BrokerInfo brokerInfo;
180 private IOException firstFailureError;
181 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
182
183 // Assume that protocol is the latest. Change to the actual protocol
184 // version when a WireFormatInfo is received.
185 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
186 private final long timeCreated;
187 private final ConnectionAudit connectionAudit = new ConnectionAudit();
188 private DestinationSource destinationSource;
189 private final Object ensureConnectionInfoSentMutex = new Object();
190 private boolean useDedicatedTaskRunner;
191 protected volatile CountDownLatch transportInterruptionProcessingComplete;
192 private long consumerFailoverRedeliveryWaitPeriod;
193 private final Scheduler scheduler;
194 private boolean messagePrioritySupported=true;
195
196 /**
197 * Construct an <code>ActiveMQConnection</code>
198 *
199 * @param transport
200 * @param factoryStats
201 * @throws Exception
202 */
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {

    this.transport = transport;
    this.clientIdGenerator = clientIdGenerator;
    this.factoryStats = factoryStats;

    // Worker threads for the connection executor are daemon threads named
    // after the transport they serve.
    final ThreadFactory executorThreads = new ThreadFactory() {
        public Thread newThread(Runnable task) {
            Thread worker = new Thread(task, "ActiveMQ Connection Executor: " + transport);
            worker.setDaemon(true);
            return worker;
        }
    };

    // Single-threaded executor (core = max = 1) fed by an unbounded queue.
    // NOTE: the 5 second keep-alive only affects the core thread if
    // allowCoreThreadTimeOut(true) is enabled, which is not done here.
    this.executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), executorThreads);

    // Identity of this connection; the same generated id also names the scheduler below.
    final String connectionUid = CONNECTION_ID_GENERATOR.generateId();
    this.info = new ConnectionInfo(new ConnectionId(connectionUid));
    info.setManageable(true);
    info.setFaultTolerant(transport.isFaultTolerant());
    this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

    // From here on the transport delivers its events to this connection.
    transport.setTransportListener(this);

    this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
    factoryStats.addConnection(this);
    this.timeCreated = System.currentTimeMillis();
    // Duplicate checking in the audit follows the transport's fault-tolerant flag.
    connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    this.scheduler = new Scheduler("ActiveMQConnection["+connectionUid+"] Scheduler");
    scheduler.start();
}
234
235 protected void setUserName(String userName) {
236 this.info.setUserName(userName);
237 }
238
239 protected void setPassword(String password) {
240 this.info.setPassword(password);
241 }
242
243 /**
244 * A static helper method to create a new connection
245 *
246 * @return an ActiveMQConnection
247 * @throws JMSException
248 */
249 public static ActiveMQConnection makeConnection() throws JMSException {
250 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
251 return (ActiveMQConnection)factory.createConnection();
252 }
253
254 /**
255 * A static helper method to create a new connection
256 *
257 * @param uri
258 * @return an ActiveMQConnection
259 * @throws JMSException
260 */
261 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
262 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
263 return (ActiveMQConnection)factory.createConnection();
264 }
265
266 /**
267 * A static helper method to create a new connection
268 *
269 * @param user
270 * @param password
271 * @param uri
272 * @return an ActiveMQConnection
273 * @throws JMSException
274 */
275 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
276 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
277 return (ActiveMQConnection)factory.createConnection();
278 }
279
280 /**
281 * @return the statistics for this connection
282 */
283 public JMSConnectionStatsImpl getConnectionStats() {
284 return stats;
285 }
286
287 /**
288 * Creates a <CODE>Session</CODE> object.
289 *
290 * @param transacted indicates whether the session is transacted
291 * @param acknowledgeMode indicates whether the consumer or the client will
292 * acknowledge any messages it receives; ignored if the
293 * session is transacted. Legal values are
294 * <code>Session.AUTO_ACKNOWLEDGE</code>,
295 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
296 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
297 * @return a newly created session
298 * @throws JMSException if the <CODE>Connection</CODE> object fails to
299 * create a session due to some internal error or lack of
300 * support for the specific transaction and acknowledgement
301 * mode.
302 * @see Session#AUTO_ACKNOWLEDGE
303 * @see Session#CLIENT_ACKNOWLEDGE
304 * @see Session#DUPS_OK_ACKNOWLEDGE
305 * @since 1.1
306 */
307 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
308 checkClosedOrFailed();
309 ensureConnectionInfoSent();
310 if(!transacted) {
311 if (acknowledgeMode==Session.SESSION_TRANSACTED) {
312 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
313 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
314 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
315 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
316 }
317 }
318 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
319 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
320 }
321
322 /**
323 * @return sessionId
324 */
325 protected SessionId getNextSessionId() {
326 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
327 }
328
329 /**
330 * Gets the client identifier for this connection.
331 * <P>
332 * This value is specific to the JMS provider. It is either preconfigured by
333 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
334 * dynamically by the application by calling the <code>setClientID</code>
335 * method.
336 *
337 * @return the unique client identifier
338 * @throws JMSException if the JMS provider fails to return the client ID
339 * for this connection due to some internal error.
340 */
341 public String getClientID() throws JMSException {
342 checkClosedOrFailed();
343 return this.info.getClientId();
344 }
345
346 /**
347 * Sets the client identifier for this connection.
348 * <P>
349 * The preferred way to assign a JMS client's client identifier is for it to
350 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
351 * object and transparently assigned to the <CODE>Connection</CODE> object
352 * it creates.
353 * <P>
354 * Alternatively, a client can set a connection's client identifier using a
355 * provider-specific value. The facility to set a connection's client
356 * identifier explicitly is not a mechanism for overriding the identifier
357 * that has been administratively configured. It is provided for the case
358 * where no administratively specified identifier exists. If one does exist,
359 * an attempt to change it by setting it must throw an
360 * <CODE>IllegalStateException</CODE>. If a client sets the client
361 * identifier explicitly, it must do so immediately after it creates the
362 * connection and before any other action on the connection is taken. After
363 * this point, setting the client identifier is a programming error that
364 * should throw an <CODE>IllegalStateException</CODE>.
365 * <P>
366 * The purpose of the client identifier is to associate a connection and its
367 * objects with a state maintained on behalf of the client by a provider.
368 * The only such state identified by the JMS API is that required to support
369 * durable subscriptions.
370 * <P>
371 * If another connection with the same <code>clientID</code> is already
372 * running when this method is called, the JMS provider should detect the
373 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
374 *
375 * @param newClientID the unique client identifier
376 * @throws JMSException if the JMS provider fails to set the client ID for
377 * this connection due to some internal error.
378 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
379 * invalid or duplicate client ID.
380 * @throws javax.jms.IllegalStateException if the JMS client attempts to set
381 * a connection's client ID at the wrong time or when it has
382 * been administratively configured.
383 */
384 public void setClientID(String newClientID) throws JMSException {
385 checkClosedOrFailed();
386
387 if (this.clientIDSet) {
388 throw new IllegalStateException("The clientID has already been set");
389 }
390
391 if (this.isConnectionInfoSentToBroker) {
392 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
393 }
394
395 this.info.setClientId(newClientID);
396 this.userSpecifiedClientID = true;
397 ensureConnectionInfoSent();
398 }
399
400 /**
401 * Sets the default client id that the connection will use if one is not
402 * explicitly set with the setClientID() call.
403 */
404 public void setDefaultClientID(String clientID) throws JMSException {
405 this.info.setClientId(clientID);
406 this.userSpecifiedClientID = true;
407 }
408
409 /**
410 * Gets the metadata for this connection.
411 *
412 * @return the connection metadata
413 * @throws JMSException if the JMS provider fails to get the connection
414 * metadata for this connection.
415 * @see javax.jms.ConnectionMetaData
416 */
417 public ConnectionMetaData getMetaData() throws JMSException {
418 checkClosedOrFailed();
419 return ActiveMQConnectionMetaData.INSTANCE;
420 }
421
422 /**
423 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
424 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
425 * associated with it.
426 *
427 * @return the <CODE>ExceptionListener</CODE> for this connection, or
428 * null, if no <CODE>ExceptionListener</CODE> is associated with
429 * this connection.
430 * @throws JMSException if the JMS provider fails to get the
431 * <CODE>ExceptionListener</CODE> for this connection.
432 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
433 */
434 public ExceptionListener getExceptionListener() throws JMSException {
435 checkClosedOrFailed();
436 return this.exceptionListener;
437 }
438
439 /**
440 * Sets an exception listener for this connection.
441 * <P>
442 * If a JMS provider detects a serious problem with a connection, it informs
443 * the connection's <CODE> ExceptionListener</CODE>, if one has been
444 * registered. It does this by calling the listener's <CODE>onException
445 * </CODE>
446 * method, passing it a <CODE>JMSException</CODE> object describing the
447 * problem.
448 * <P>
449 * An exception listener allows a client to be notified of a problem
450 * asynchronously. Some connections only consume messages, so they would
451 * have no other way to learn their connection has failed.
452 * <P>
453 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
454 * <P>
455 * A JMS provider should attempt to resolve connection problems itself
456 * before it notifies the client of them.
457 *
458 * @param listener the exception listener
459 * @throws JMSException if the JMS provider fails to set the exception
460 * listener for this connection.
461 */
462 public void setExceptionListener(ExceptionListener listener) throws JMSException {
463 checkClosedOrFailed();
464 this.exceptionListener = listener;
465 }
466
467 /**
468 * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
469 * Not every <CODE>ActiveMQConnection</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
470 * associated with it.
471 *
472 * @return the listener or <code>null</code> if no listener is registered with the connection.
473 */
474 public ClientInternalExceptionListener getClientInternalExceptionListener()
475 {
476 return clientInternalExceptionListener;
477 }
478
479 /**
480 * Sets a client internal exception listener for this connection.
481 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
482 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
483 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
484 * describing the problem.
485 *
486 * @param listener the exception listener
487 */
488 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
489 {
490 this.clientInternalExceptionListener = listener;
491 }
492
493 /**
494 * Starts (or restarts) a connection's delivery of incoming messages. A call
495 * to <CODE>start</CODE> on a connection that has already been started is
496 * ignored.
497 *
498 * @throws JMSException if the JMS provider fails to start message delivery
499 * due to some internal error.
500 * @see javax.jms.Connection#stop()
501 */
502 public void start() throws JMSException {
503 checkClosedOrFailed();
504 ensureConnectionInfoSent();
505 if (started.compareAndSet(false, true)) {
506 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
507 ActiveMQSession session = i.next();
508 session.start();
509 }
510 }
511 }
512
513 /**
514 * Temporarily stops a connection's delivery of incoming messages. Delivery
515 * can be restarted using the connection's <CODE>start</CODE> method. When
516 * the connection is stopped, delivery to all the connection's message
517 * consumers is inhibited: synchronous receives block, and messages are not
518 * delivered to message listeners.
519 * <P>
520 * This call blocks until receives and/or message listeners in progress have
521 * completed.
522 * <P>
523 * Stopping a connection has no effect on its ability to send messages. A
524 * call to <CODE>stop</CODE> on a connection that has already been stopped
525 * is ignored.
526 * <P>
527 * A call to <CODE>stop</CODE> must not return until delivery of messages
528 * has paused. This means that a client can rely on the fact that none of
529 * its message listeners will be called and that all threads of control
530 * waiting for <CODE>receive</CODE> calls to return will not return with a
531 * message until the connection is restarted. The receive timers for a
532 * stopped connection continue to advance, so receives may time out while
533 * the connection is stopped.
534 * <P>
535 * If message listeners are running when <CODE>stop</CODE> is invoked, the
536 * <CODE>stop</CODE> call must wait until all of them have returned before
537 * it may return. While these message listeners are completing, they must
538 * have the full services of the connection available to them.
539 *
540 * @throws JMSException if the JMS provider fails to stop message delivery
541 * due to some internal error.
542 * @see javax.jms.Connection#start()
543 */
544 public void stop() throws JMSException {
545 checkClosedOrFailed();
546 if (started.compareAndSet(true, false)) {
547 synchronized(sessions) {
548 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
549 ActiveMQSession s = i.next();
550 s.stop();
551 }
552 }
553 }
554 }
555
556 /**
557 * Closes the connection.
558 * <P>
559 * Since a provider typically allocates significant resources outside the
560 * JVM on behalf of a connection, clients should close these resources when
561 * they are not needed. Relying on garbage collection to eventually reclaim
562 * these resources may not be timely enough.
563 * <P>
564 * There is no need to close the sessions, producers, and consumers of a
565 * closed connection.
566 * <P>
567 * Closing a connection causes all temporary destinations to be deleted.
568 * <P>
569 * When this method is invoked, it should not return until message
570 * processing has been shut down in an orderly fashion. This means that all
571 * message listeners that may have been running have returned, and that all
572 * pending receives have returned. A close terminates all pending message
573 * receives on the connection's sessions' consumers. The receives may return
574 * with a message or with null, depending on whether there was a message
575 * available at the time of the close. If one or more of the connection's
576 * sessions' message listeners is processing a message at the time when
577 * connection <CODE>close</CODE> is invoked, all the facilities of the
578 * connection and its sessions must remain available to those listeners
579 * until they return control to the JMS provider.
580 * <P>
581 * Closing a connection causes any of its sessions' transactions in progress
582 * to be rolled back. In the case where a session's work is coordinated by
583 * an external transaction manager, a session's <CODE>commit</CODE> and
584 * <CODE> rollback</CODE> methods are not used and the result of a closed
585 * session's work is determined later by the transaction manager. Closing a
586 * connection does NOT force an acknowledgment of client-acknowledged
587 * sessions.
588 * <P>
589 * Invoking the <CODE>acknowledge</CODE> method of a received message from
590 * a closed connection's session must throw an
591 * <CODE>IllegalStateException</CODE>. Closing a closed connection must
592 * NOT throw an exception.
593 *
594 * @throws JMSException if the JMS provider fails to close the connection
595 * due to some internal error. For example, a failure to
596 * release resources or to close a socket connection can
597 * cause this exception to be thrown.
598 */
599 public void close() throws JMSException {
600 try {
601 // If we were running, lets stop first.
602 if (!closed.get() && !transportFailed.get()) {
603 stop();
604 }
605
606 synchronized (this) {
607 if (!closed.get()) {
608 closing.set(true);
609
610 if (destinationSource != null) {
611 destinationSource.stop();
612 destinationSource = null;
613 }
614 if (advisoryConsumer != null) {
615 advisoryConsumer.dispose();
616 advisoryConsumer = null;
617 }
618 if (this.scheduler != null) {
619 try {
620 this.scheduler.stop();
621 } catch (Exception e) {
622 JMSException ex = JMSExceptionSupport.create(e);
623 throw ex;
624 }
625 }
626
627 long lastDeliveredSequenceId = 0;
628 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
629 ActiveMQSession s = i.next();
630 s.dispose();
631 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
632 }
633 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
634 ActiveMQConnectionConsumer c = i.next();
635 c.dispose();
636 }
637 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
638 ActiveMQInputStream c = i.next();
639 c.dispose();
640 }
641 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
642 ActiveMQOutputStream c = i.next();
643 c.dispose();
644 }
645
646 // As TemporaryQueue and TemporaryTopic instances are bound
647 // to a connection we should just delete them after the connection
648 // is closed to free up memory
649 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
650 ActiveMQTempDestination c = i.next();
651 c.delete();
652 }
653
654 if (isConnectionInfoSentToBroker) {
655 // If we announced ourselfs to the broker.. Try to let
656 // the broker
657 // know that the connection is being shutdown.
658 RemoveInfo removeCommand = info.createRemoveCommand();
659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660 doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
661 doAsyncSendPacket(new ShutdownInfo());
662 }
663
664 ServiceSupport.dispose(this.transport);
665
666 started.set(false);
667
668 // TODO if we move the TaskRunnerFactory to the connection
669 // factory
670 // then we may need to call
671 // factory.onConnectionClose(this);
672 if (sessionTaskRunner != null) {
673 sessionTaskRunner.shutdown();
674 }
675 closed.set(true);
676 closing.set(false);
677 }
678 }
679 } finally {
680 try {
681 if (executor != null){
682 executor.shutdown();
683 }
684 }catch(Throwable e) {
685 LOG.error("Error shutting down thread pool " + e,e);
686 }
687 factoryStats.removeConnection(this);
688 }
689 }
690
691 /**
692 * Tells the broker to terminate its VM. This can be used to cleanly
693 * terminate a broker running in a standalone java process. Server must have
694 * property enable.vm.shutdown=true defined to allow this to work.
695 */
696 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
697 // implemented.
698 /*
699 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
700 * command = new BrokerAdminCommand();
701 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
702 * asyncSendPacket(command); }
703 */
704
705 /**
706 * Create a durable connection consumer for this connection (optional
707 * operation). This is an expert facility not used by regular JMS clients.
708 *
709 * @param topic topic to access
710 * @param subscriptionName durable subscription name
711 * @param messageSelector only messages with properties matching the message
712 * selector expression are delivered. A value of null or an
713 * empty string indicates that there is no message selector
714 * for the message consumer.
715 * @param sessionPool the server session pool to associate with this durable
716 * connection consumer
717 * @param maxMessages the maximum number of messages that can be assigned to
718 * a server session at one time
719 * @return the durable connection consumer
720 * @throws JMSException if the <CODE>Connection</CODE> object fails to
721 * create a connection consumer due to some internal error
722 * or invalid arguments for <CODE>sessionPool</CODE> and
723 * <CODE>messageSelector</CODE>.
724 * @throws javax.jms.InvalidDestinationException if an invalid destination
725 * is specified.
726 * @throws javax.jms.InvalidSelectorException if the message selector is
727 * invalid.
728 * @see javax.jms.ConnectionConsumer
729 * @since 1.1
730 */
731 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
732 throws JMSException {
733 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
734 }
735
736 /**
737 * Create a durable connection consumer for this connection (optional
738 * operation). This is an expert facility not used by regular JMS clients.
739 *
740 * @param topic topic to access
741 * @param subscriptionName durable subscription name
742 * @param messageSelector only messages with properties matching the message
743 * selector expression are delivered. A value of null or an
744 * empty string indicates that there is no message selector
745 * for the message consumer.
746 * @param sessionPool the server session pool to associate with this durable
747 * connection consumer
748 * @param maxMessages the maximum number of messages that can be assigned to
749 * a server session at one time
750 * @param noLocal set true if you want to filter out messages published
751 * locally
752 * @return the durable connection consumer
753 * @throws JMSException if the <CODE>Connection</CODE> object fails to
754 * create a connection consumer due to some internal error
755 * or invalid arguments for <CODE>sessionPool</CODE> and
756 * <CODE>messageSelector</CODE>.
757 * @throws javax.jms.InvalidDestinationException if an invalid destination
758 * is specified.
759 * @throws javax.jms.InvalidSelectorException if the message selector is
760 * invalid.
761 * @see javax.jms.ConnectionConsumer
762 * @since 1.1
763 */
764 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
765 boolean noLocal) throws JMSException {
766 checkClosedOrFailed();
767 ensureConnectionInfoSent();
768 SessionId sessionId = new SessionId(info.getConnectionId(), -1);
769 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
770 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
771 info.setSubscriptionName(subscriptionName);
772 info.setSelector(messageSelector);
773 info.setPrefetchSize(maxMessages);
774 info.setDispatchAsync(isDispatchAsync());
775
776 // Allows the options on the destination to configure the consumerInfo
777 if (info.getDestination().getOptions() != null) {
778 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
779 IntrospectionSupport.setProperties(this.info, options, "consumer.");
780 }
781
782 return new ActiveMQConnectionConsumer(this, sessionPool, info);
783 }
784
785 // Properties
786 // -------------------------------------------------------------------------
787
788 /**
789 * Returns true if this connection has been started
790 *
791 * @return true if this Connection is started
792 */
793 public boolean isStarted() {
794 return started.get();
795 }
796
797 /**
798 * Returns true if the connection is closed
799 */
800 public boolean isClosed() {
801 return closed.get();
802 }
803
804 /**
805 * Returns true if the connection is in the process of being closed
806 */
807 public boolean isClosing() {
808 return closing.get();
809 }
810
811 /**
812 * Returns true if the underlying transport has failed
813 */
814 public boolean isTransportFailed() {
815 return transportFailed.get();
816 }
817
818 /**
819 * @return Returns the prefetchPolicy.
820 */
821 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
822 return prefetchPolicy;
823 }
824
825 /**
826 * Sets the <a
827 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
828 * policy</a> for consumers created by this connection.
829 */
830 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
831 this.prefetchPolicy = prefetchPolicy;
832 }
833
834 /**
835 */
836 public Transport getTransportChannel() {
837 return transport;
838 }
839
840 /**
841 * @return Returns the clientID of the connection, forcing one to be
842 * generated if one has not yet been configured.
843 */
844 public String getInitializedClientID() throws JMSException {
845 ensureConnectionInfoSent();
846 return info.getClientId();
847 }
848
849 /**
850 * @return Returns the timeStampsDisableByDefault.
851 */
852 public boolean isDisableTimeStampsByDefault() {
853 return disableTimeStampsByDefault;
854 }
855
856 /**
857 * Sets whether or not timestamps on messages should be disabled or not. If
858 * you disable them it adds a small performance boost.
859 */
860 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
861 this.disableTimeStampsByDefault = timeStampsDisableByDefault;
862 }
863
864 /**
865 * @return Returns the dispatchOptimizedMessage.
866 */
867 public boolean isOptimizedMessageDispatch() {
868 return optimizedMessageDispatch;
869 }
870
871 /**
872 * If this flag is set then an larger prefetch limit is used - only
873 * applicable for durable topic subscribers.
874 */
875 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
876 this.optimizedMessageDispatch = dispatchOptimizedMessage;
877 }
878
879 /**
880 * @return Returns the closeTimeout.
881 */
882 public int getCloseTimeout() {
883 return closeTimeout;
884 }
885
886 /**
887 * Sets the timeout before a close is considered complete. Normally a
888 * close() on a connection waits for confirmation from the broker; this
889 * allows that operation to timeout to save the client hanging if there is
890 * no broker
891 */
892 public void setCloseTimeout(int closeTimeout) {
893 this.closeTimeout = closeTimeout;
894 }
895
896 /**
897 * @return ConnectionInfo
898 */
899 public ConnectionInfo getConnectionInfo() {
900 return this.info;
901 }
902
903 public boolean isUseRetroactiveConsumer() {
904 return useRetroactiveConsumer;
905 }
906
907 /**
908 * Sets whether or not retroactive consumers are enabled. Retroactive
909 * consumers allow non-durable topic subscribers to receive old messages
910 * that were published before the non-durable subscriber started.
911 */
912 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
913 this.useRetroactiveConsumer = useRetroactiveConsumer;
914 }
915
916 public boolean isNestedMapAndListEnabled() {
917 return nestedMapAndListEnabled;
918 }
919
920 /**
921 * Enables/disables whether or not Message properties and MapMessage entries
922 * support <a
923 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
924 * Structures</a> of Map and List objects
925 */
926 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
927 this.nestedMapAndListEnabled = structuredMapsEnabled;
928 }
929
930 public boolean isExclusiveConsumer() {
931 return exclusiveConsumer;
932 }
933
934 /**
935 * Enables or disables whether or not queue consumers should be exclusive or
936 * not for example to preserve ordering when not using <a
937 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
938 *
939 * @param exclusiveConsumer
940 */
941 public void setExclusiveConsumer(boolean exclusiveConsumer) {
942 this.exclusiveConsumer = exclusiveConsumer;
943 }
944
945 /**
946 * Adds a transport listener so that a client can be notified of events in
947 * the underlying transport
948 */
949 public void addTransportListener(TransportListener transportListener) {
950 transportListeners.add(transportListener);
951 }
952
953 public void removeTransportListener(TransportListener transportListener) {
954 transportListeners.remove(transportListener);
955 }
956
957 public boolean isUseDedicatedTaskRunner() {
958 return useDedicatedTaskRunner;
959 }
960
961 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
962 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
963 }
964
965 public TaskRunnerFactory getSessionTaskRunner() {
966 synchronized (this) {
967 if (sessionTaskRunner == null) {
968 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
969 }
970 }
971 return sessionTaskRunner;
972 }
973
974 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
975 this.sessionTaskRunner = sessionTaskRunner;
976 }
977
978 public MessageTransformer getTransformer() {
979 return transformer;
980 }
981
982 /**
983 * Sets the transformer used to transform messages before they are sent on
984 * to the JMS bus or when they are received from the bus but before they are
985 * delivered to the JMS client
986 */
987 public void setTransformer(MessageTransformer transformer) {
988 this.transformer = transformer;
989 }
990
991 /**
992 * @return the statsEnabled
993 */
994 public boolean isStatsEnabled() {
995 return this.stats.isEnabled();
996 }
997
998 /**
999 * @param statsEnabled the statsEnabled to set
1000 */
1001 public void setStatsEnabled(boolean statsEnabled) {
1002 this.stats.setEnabled(statsEnabled);
1003 }
1004
1005 /**
1006 * Returns the {@link DestinationSource} object which can be used to listen to destinations
1007 * being created or destroyed or to enquire about the current destinations available on the broker
1008 *
1009 * @return a lazily created destination source
1010 * @throws JMSException
1011 */
1012 public DestinationSource getDestinationSource() throws JMSException {
1013 if (destinationSource == null) {
1014 destinationSource = new DestinationSource(this);
1015 destinationSource.start();
1016 }
1017 return destinationSource;
1018 }
1019
1020 // Implementation methods
1021 // -------------------------------------------------------------------------
1022
1023 /**
1024 * Used internally for adding Sessions to the Connection
1025 *
1026 * @param session
1027 * @throws JMSException
1028 * @throws JMSException
1029 */
1030 protected void addSession(ActiveMQSession session) throws JMSException {
1031 this.sessions.add(session);
1032 if (sessions.size() > 1 || session.isTransacted()) {
1033 optimizedMessageDispatch = false;
1034 }
1035 }
1036
1037 /**
1038 * Used interanlly for removing Sessions from a Connection
1039 *
1040 * @param session
1041 */
1042 protected void removeSession(ActiveMQSession session) {
1043 this.sessions.remove(session);
1044 this.removeDispatcher(session);
1045 }
1046
1047 /**
1048 * Add a ConnectionConsumer
1049 *
1050 * @param connectionConsumer
1051 * @throws JMSException
1052 */
1053 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1054 this.connectionConsumers.add(connectionConsumer);
1055 }
1056
1057 /**
1058 * Remove a ConnectionConsumer
1059 *
1060 * @param connectionConsumer
1061 */
1062 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1063 this.connectionConsumers.remove(connectionConsumer);
1064 this.removeDispatcher(connectionConsumer);
1065 }
1066
1067 /**
1068 * Creates a <CODE>TopicSession</CODE> object.
1069 *
1070 * @param transacted indicates whether the session is transacted
1071 * @param acknowledgeMode indicates whether the consumer or the client will
1072 * acknowledge any messages it receives; ignored if the
1073 * session is transacted. Legal values are
1074 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1075 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1076 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1077 * @return a newly created topic session
1078 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1079 * to create a session due to some internal error or lack of
1080 * support for the specific transaction and acknowledgement
1081 * mode.
1082 * @see Session#AUTO_ACKNOWLEDGE
1083 * @see Session#CLIENT_ACKNOWLEDGE
1084 * @see Session#DUPS_OK_ACKNOWLEDGE
1085 */
1086 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1087 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1088 }
1089
1090 /**
1091 * Creates a connection consumer for this connection (optional operation).
1092 * This is an expert facility not used by regular JMS clients.
1093 *
1094 * @param topic the topic to access
1095 * @param messageSelector only messages with properties matching the message
1096 * selector expression are delivered. A value of null or an
1097 * empty string indicates that there is no message selector
1098 * for the message consumer.
1099 * @param sessionPool the server session pool to associate with this
1100 * connection consumer
1101 * @param maxMessages the maximum number of messages that can be assigned to
1102 * a server session at one time
1103 * @return the connection consumer
1104 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1105 * to create a connection consumer due to some internal
1106 * error or invalid arguments for <CODE>sessionPool</CODE>
1107 * and <CODE>messageSelector</CODE>.
1108 * @throws javax.jms.InvalidDestinationException if an invalid topic is
1109 * specified.
1110 * @throws javax.jms.InvalidSelectorException if the message selector is
1111 * invalid.
1112 * @see javax.jms.ConnectionConsumer
1113 */
1114 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1115 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1116 }
1117
1118 /**
1119 * Creates a connection consumer for this connection (optional operation).
1120 * This is an expert facility not used by regular JMS clients.
1121 *
1122 * @param queue the queue to access
1123 * @param messageSelector only messages with properties matching the message
1124 * selector expression are delivered. A value of null or an
1125 * empty string indicates that there is no message selector
1126 * for the message consumer.
1127 * @param sessionPool the server session pool to associate with this
1128 * connection consumer
1129 * @param maxMessages the maximum number of messages that can be assigned to
1130 * a server session at one time
1131 * @return the connection consumer
1132 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1133 * to create a connection consumer due to some internal
1134 * error or invalid arguments for <CODE>sessionPool</CODE>
1135 * and <CODE>messageSelector</CODE>.
1136 * @throws javax.jms.InvalidDestinationException if an invalid queue is
1137 * specified.
1138 * @throws javax.jms.InvalidSelectorException if the message selector is
1139 * invalid.
1140 * @see javax.jms.ConnectionConsumer
1141 */
1142 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1143 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1144 }
1145
1146 /**
1147 * Creates a connection consumer for this connection (optional operation).
1148 * This is an expert facility not used by regular JMS clients.
1149 *
1150 * @param destination the destination to access
1151 * @param messageSelector only messages with properties matching the message
1152 * selector expression are delivered. A value of null or an
1153 * empty string indicates that there is no message selector
1154 * for the message consumer.
1155 * @param sessionPool the server session pool to associate with this
1156 * connection consumer
1157 * @param maxMessages the maximum number of messages that can be assigned to
1158 * a server session at one time
1159 * @return the connection consumer
1160 * @throws JMSException if the <CODE>Connection</CODE> object fails to
1161 * create a connection consumer due to some internal error
1162 * or invalid arguments for <CODE>sessionPool</CODE> and
1163 * <CODE>messageSelector</CODE>.
1164 * @throws javax.jms.InvalidDestinationException if an invalid destination
1165 * is specified.
1166 * @throws javax.jms.InvalidSelectorException if the message selector is
1167 * invalid.
1168 * @see javax.jms.ConnectionConsumer
1169 * @since 1.1
1170 */
1171 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1172 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1173 }
1174
1175 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1176 throws JMSException {
1177
1178 checkClosedOrFailed();
1179 ensureConnectionInfoSent();
1180
1181 ConsumerId consumerId = createConsumerId();
1182 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1183 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1184 consumerInfo.setSelector(messageSelector);
1185 consumerInfo.setPrefetchSize(maxMessages);
1186 consumerInfo.setNoLocal(noLocal);
1187 consumerInfo.setDispatchAsync(isDispatchAsync());
1188
1189 // Allows the options on the destination to configure the consumerInfo
1190 if (consumerInfo.getDestination().getOptions() != null) {
1191 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1192 IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1193 }
1194
1195 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1196 }
1197
1198 /**
1199 * @return
1200 */
1201 private ConsumerId createConsumerId() {
1202 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1203 }
1204
1205 /**
1206 * @return
1207 */
1208 private ProducerId createProducerId() {
1209 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1210 }
1211
1212 /**
1213 * Creates a <CODE>QueueSession</CODE> object.
1214 *
1215 * @param transacted indicates whether the session is transacted
1216 * @param acknowledgeMode indicates whether the consumer or the client will
1217 * acknowledge any messages it receives; ignored if the
1218 * session is transacted. Legal values are
1219 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1220 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1221 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1222 * @return a newly created queue session
1223 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1224 * to create a session due to some internal error or lack of
1225 * support for the specific transaction and acknowledgement
1226 * mode.
1227 * @see Session#AUTO_ACKNOWLEDGE
1228 * @see Session#CLIENT_ACKNOWLEDGE
1229 * @see Session#DUPS_OK_ACKNOWLEDGE
1230 */
1231 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1232 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1233 }
1234
1235 /**
1236 * Ensures that the clientID was manually specified and not auto-generated.
1237 * If the clientID was not specified this method will throw an exception.
1238 * This method is used to ensure that the clientID + durableSubscriber name
1239 * are used correctly.
1240 *
1241 * @throws JMSException
1242 */
1243 public void checkClientIDWasManuallySpecified() throws JMSException {
1244 if (!userSpecifiedClientID) {
1245 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1246 }
1247 }
1248
1249 /**
1250 * send a Packet through the Connection - for internal use only
1251 *
1252 * @param command
1253 * @throws JMSException
1254 */
1255 public void asyncSendPacket(Command command) throws JMSException {
1256 if (isClosed()) {
1257 throw new ConnectionClosedException();
1258 } else {
1259 doAsyncSendPacket(command);
1260 }
1261 }
1262
1263 private void doAsyncSendPacket(Command command) throws JMSException {
1264 try {
1265 this.transport.oneway(command);
1266 } catch (IOException e) {
1267 throw JMSExceptionSupport.create(e);
1268 }
1269 }
1270
1271 /**
1272 * Send a packet through a Connection - for internal use only
1273 *
1274 * @param command
1275 * @return
1276 * @throws JMSException
1277 */
1278 public Response syncSendPacket(Command command) throws JMSException {
1279 if (isClosed()) {
1280 throw new ConnectionClosedException();
1281 } else {
1282
1283 try {
1284 Response response = (Response)this.transport.request(command);
1285 if (response.isException()) {
1286 ExceptionResponse er = (ExceptionResponse)response;
1287 if (er.getException() instanceof JMSException) {
1288 throw (JMSException)er.getException();
1289 } else {
1290 if (isClosed()||closing.get()) {
1291 LOG.debug("Received an exception but connection is closing");
1292 }
1293 JMSException jmsEx = null;
1294 try {
1295 jmsEx = JMSExceptionSupport.create(er.getException());
1296 }catch(Throwable e) {
1297 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1298 }
1299 //dispose of transport for security exceptions
1300 if (er.getException() instanceof SecurityException){
1301 Transport t = this.transport;
1302 if (null != t){
1303 ServiceSupport.dispose(t);
1304 }
1305 }
1306 if(jmsEx !=null) {
1307 throw jmsEx;
1308 }
1309 }
1310 }
1311 return response;
1312 } catch (IOException e) {
1313 throw JMSExceptionSupport.create(e);
1314 }
1315 }
1316 }
1317
1318 /**
1319 * Send a packet through a Connection - for internal use only
1320 *
1321 * @param command
1322 * @return
1323 * @throws JMSException
1324 */
1325 public Response syncSendPacket(Command command, int timeout) throws JMSException {
1326 if (isClosed() || closing.get()) {
1327 throw new ConnectionClosedException();
1328 } else {
1329 return doSyncSendPacket(command, timeout);
1330 }
1331 }
1332
1333 private Response doSyncSendPacket(Command command, int timeout)
1334 throws JMSException {
1335 try {
1336 Response response = (Response) (timeout > 0
1337 ? this.transport.request(command, timeout)
1338 : this.transport.request(command));
1339 if (response != null && response.isException()) {
1340 ExceptionResponse er = (ExceptionResponse)response;
1341 if (er.getException() instanceof JMSException) {
1342 throw (JMSException)er.getException();
1343 } else {
1344 throw JMSExceptionSupport.create(er.getException());
1345 }
1346 }
1347 return response;
1348 } catch (IOException e) {
1349 throw JMSExceptionSupport.create(e);
1350 }
1351 }
1352
1353 /**
1354 * @return statistics for this Connection
1355 */
1356 public StatsImpl getStats() {
1357 return stats;
1358 }
1359
1360 /**
1361 * simply throws an exception if the Connection is already closed or the
1362 * Transport has failed
1363 *
1364 * @throws JMSException
1365 */
1366 protected synchronized void checkClosedOrFailed() throws JMSException {
1367 checkClosed();
1368 if (transportFailed.get()) {
1369 throw new ConnectionFailedException(firstFailureError);
1370 }
1371 }
1372
1373 /**
1374 * simply throws an exception if the Connection is already closed
1375 *
1376 * @throws JMSException
1377 */
1378 protected synchronized void checkClosed() throws JMSException {
1379 if (closed.get()) {
1380 throw new ConnectionClosedException();
1381 }
1382 }
1383
1384 /**
1385 * Send the ConnectionInfo to the Broker
1386 *
1387 * @throws JMSException
1388 */
1389 protected void ensureConnectionInfoSent() throws JMSException {
1390 synchronized(this.ensureConnectionInfoSentMutex) {
1391 // Can we skip sending the ConnectionInfo packet??
1392 if (isConnectionInfoSentToBroker || closed.get()) {
1393 return;
1394 }
1395 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1396 if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1397 info.setClientId(clientIdGenerator.generateId());
1398 }
1399 syncSendPacket(info.copy());
1400
1401 this.isConnectionInfoSentToBroker = true;
1402 // Add a temp destination advisory consumer so that
1403 // We know what the valid temporary destinations are on the
1404 // broker without having to do an RPC to the broker.
1405
1406 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1407 if (watchTopicAdvisories) {
1408 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1409 }
1410 }
1411 }
1412
1413 public synchronized boolean isWatchTopicAdvisories() {
1414 return watchTopicAdvisories;
1415 }
1416
1417 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1418 this.watchTopicAdvisories = watchTopicAdvisories;
1419 }
1420
1421 /**
1422 * @return Returns the useAsyncSend.
1423 */
1424 public boolean isUseAsyncSend() {
1425 return useAsyncSend;
1426 }
1427
1428 /**
1429 * Forces the use of <a
1430 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1431 * adds a massive performance boost; but means that the send() method will
1432 * return immediately whether the message has been sent or not which could
1433 * lead to message loss.
1434 */
1435 public void setUseAsyncSend(boolean useAsyncSend) {
1436 this.useAsyncSend = useAsyncSend;
1437 }
1438
1439 /**
1440 * @return true if always sync send messages
1441 */
1442 public boolean isAlwaysSyncSend() {
1443 return this.alwaysSyncSend;
1444 }
1445
1446 /**
1447 * Set true if always require messages to be sync sent
1448 *
1449 * @param alwaysSyncSend
1450 */
1451 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1452 this.alwaysSyncSend = alwaysSyncSend;
1453 }
1454
1455 /**
1456 * @return the messagePrioritySupported
1457 */
1458 public boolean isMessagePrioritySupported() {
1459 return this.messagePrioritySupported;
1460 }
1461
1462 /**
1463 * @param messagePrioritySupported the messagePrioritySupported to set
1464 */
1465 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1466 this.messagePrioritySupported = messagePrioritySupported;
1467 }
1468
1469 /**
1470 * Cleans up this connection so that it's state is as if the connection was
1471 * just created. This allows the Resource Adapter to clean up a connection
1472 * so that it can be reused without having to close and recreate the
1473 * connection.
1474 */
1475 public void cleanup() throws JMSException {
1476
1477 if (advisoryConsumer != null && !isTransportFailed()) {
1478 advisoryConsumer.dispose();
1479 advisoryConsumer = null;
1480 }
1481
1482 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1483 ActiveMQSession s = i.next();
1484 s.dispose();
1485 }
1486 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1487 ActiveMQConnectionConsumer c = i.next();
1488 c.dispose();
1489 }
1490 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1491 ActiveMQInputStream c = i.next();
1492 c.dispose();
1493 }
1494 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1495 ActiveMQOutputStream c = i.next();
1496 c.dispose();
1497 }
1498
1499 if (isConnectionInfoSentToBroker) {
1500 if (!transportFailed.get() && !closing.get()) {
1501 syncSendPacket(info.createRemoveCommand());
1502 }
1503 isConnectionInfoSentToBroker = false;
1504 }
1505 if (userSpecifiedClientID) {
1506 info.setClientId(null);
1507 userSpecifiedClientID = false;
1508 }
1509 clientIDSet = false;
1510
1511 started.set(false);
1512 }
1513
1514 public void finalize() throws Throwable{
1515 if (scheduler != null){
1516 scheduler.stop();
1517 }
1518 }
1519
1520 /**
1521 * Changes the associated username/password that is associated with this
1522 * connection. If the connection has been used, you must called cleanup()
1523 * before calling this method.
1524 *
1525 * @throws IllegalStateException if the connection is in used.
1526 */
1527 public void changeUserInfo(String userName, String password) throws JMSException {
1528 if (isConnectionInfoSentToBroker) {
1529 throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1530 }
1531 this.info.setUserName(userName);
1532 this.info.setPassword(password);
1533 }
1534
1535 /**
1536 * @return Returns the resourceManagerId.
1537 * @throws JMSException
1538 */
1539 public String getResourceManagerId() throws JMSException {
1540 waitForBrokerInfo();
1541 if (brokerInfo == null) {
1542 throw new JMSException("Connection failed before Broker info was received.");
1543 }
1544 return brokerInfo.getBrokerId().getValue();
1545 }
1546
1547 /**
1548 * Returns the broker name if one is available or null if one is not
1549 * available yet.
1550 */
1551 public String getBrokerName() {
1552 try {
1553 brokerInfoReceived.await(5, TimeUnit.SECONDS);
1554 if (brokerInfo == null) {
1555 return null;
1556 }
1557 return brokerInfo.getBrokerName();
1558 } catch (InterruptedException e) {
1559 Thread.currentThread().interrupt();
1560 return null;
1561 }
1562 }
1563
1564 /**
1565 * Returns the broker information if it is available or null if it is not
1566 * available yet.
1567 */
1568 public BrokerInfo getBrokerInfo() {
1569 return brokerInfo;
1570 }
1571
1572 /**
1573 * @return Returns the RedeliveryPolicy.
1574 * @throws JMSException
1575 */
1576 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1577 return redeliveryPolicy;
1578 }
1579
1580 /**
1581 * Sets the redelivery policy to be used when messages are rolled back
1582 */
1583 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1584 this.redeliveryPolicy = redeliveryPolicy;
1585 }
1586
1587 public BlobTransferPolicy getBlobTransferPolicy() {
1588 if (blobTransferPolicy == null) {
1589 blobTransferPolicy = createBlobTransferPolicy();
1590 }
1591 return blobTransferPolicy;
1592 }
1593
1594 /**
1595 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1596 * OBjects) are transferred from producers to brokers to consumers
1597 */
1598 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1599 this.blobTransferPolicy = blobTransferPolicy;
1600 }
1601
1602 /**
1603 * @return Returns the alwaysSessionAsync.
1604 */
1605 public boolean isAlwaysSessionAsync() {
1606 return alwaysSessionAsync;
1607 }
1608
1609 /**
1610 * If this flag is set then a separate thread is not used for dispatching
1611 * messages for each Session in the Connection. However, a separate thread
1612 * is always used if there is more than one session, or the session isn't in
1613 * auto acknowledge or duplicates ok mode
1614 */
1615 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1616 this.alwaysSessionAsync = alwaysSessionAsync;
1617 }
1618
1619 /**
1620 * @return Returns the optimizeAcknowledge.
1621 */
1622 public boolean isOptimizeAcknowledge() {
1623 return optimizeAcknowledge;
1624 }
1625
1626 /**
1627 * Enables an optimised acknowledgement mode where messages are acknowledged
1628 * in batches rather than individually
1629 *
1630 * @param optimizeAcknowledge The optimizeAcknowledge to set.
1631 */
1632 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1633 this.optimizeAcknowledge = optimizeAcknowledge;
1634 }
1635
1636 public long getWarnAboutUnstartedConnectionTimeout() {
1637 return warnAboutUnstartedConnectionTimeout;
1638 }
1639
1640 /**
1641 * Enables the timeout from a connection creation to when a warning is
1642 * generated if the connection is not properly started via {@link #start()}
1643 * and a message is received by a consumer. It is a very common gotcha to
1644 * forget to <a
1645 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1646 * the connection</a> so this option makes the default case to create a
1647 * warning if the user forgets. To disable the warning just set the value to <
1648 * 0 (say -1).
1649 */
1650 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1651 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1652 }
1653
1654 /**
1655 * @return the sendTimeout
1656 */
1657 public int getSendTimeout() {
1658 return sendTimeout;
1659 }
1660
1661 /**
1662 * @param sendTimeout the sendTimeout to set
1663 */
1664 public void setSendTimeout(int sendTimeout) {
1665 this.sendTimeout = sendTimeout;
1666 }
1667
1668 /**
1669 * @return the sendAcksAsync
1670 */
1671 public boolean isSendAcksAsync() {
1672 return sendAcksAsync;
1673 }
1674
1675 /**
1676 * @param sendAcksAsync the sendAcksAsync to set
1677 */
1678 public void setSendAcksAsync(boolean sendAcksAsync) {
1679 this.sendAcksAsync = sendAcksAsync;
1680 }
1681
1682
1683 /**
1684 * Returns the time this connection was created
1685 */
1686 public long getTimeCreated() {
1687 return timeCreated;
1688 }
1689
1690 private void waitForBrokerInfo() throws JMSException {
1691 try {
1692 brokerInfoReceived.await();
1693 } catch (InterruptedException e) {
1694 Thread.currentThread().interrupt();
1695 throw JMSExceptionSupport.create(e);
1696 }
1697 }
1698
// Exposed publicly so that it can be used in unit tests
1700 public Transport getTransport() {
1701 return transport;
1702 }
1703
1704 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1705 producers.put(producerId, producer);
1706 }
1707
1708 public void removeProducer(ProducerId producerId) {
1709 producers.remove(producerId);
1710 }
1711
1712 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1713 dispatchers.put(consumerId, dispatcher);
1714 }
1715
1716 public void removeDispatcher(ConsumerId consumerId) {
1717 dispatchers.remove(consumerId);
1718 }
1719
1720 /**
1721 * @param o - the command to consume
1722 */
1723 public void onCommand(final Object o) {
1724 final Command command = (Command)o;
1725 if (!closed.get() && command != null) {
1726 try {
1727 command.visit(new CommandVisitorAdapter() {
1728 @Override
1729 public Response processMessageDispatch(MessageDispatch md) throws Exception {
1730 waitForTransportInterruptionProcessingToComplete();
1731 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1732 if (dispatcher != null) {
1733 // Copy in case a embedded broker is dispatching via
1734 // vm://
1735 // md.getMessage() == null to signal end of queue
1736 // browse.
1737 Message msg = md.getMessage();
1738 if (msg != null) {
1739 msg = msg.copy();
1740 msg.setReadOnlyBody(true);
1741 msg.setReadOnlyProperties(true);
1742 msg.setRedeliveryCounter(md.getRedeliveryCounter());
1743 msg.setConnection(ActiveMQConnection.this);
1744 md.setMessage(msg);
1745 }
1746 dispatcher.dispatch(md);
1747 }
1748 return null;
1749 }
1750
1751 @Override
1752 public Response processProducerAck(ProducerAck pa) throws Exception {
1753 if (pa != null && pa.getProducerId() != null) {
1754 ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1755 if (producer != null) {
1756 producer.onProducerAck(pa);
1757 }
1758 }
1759 return null;
1760 }
1761
1762 @Override
1763 public Response processBrokerInfo(BrokerInfo info) throws Exception {
1764 brokerInfo = info;
1765 brokerInfoReceived.countDown();
1766 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1767 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1768 return null;
1769 }
1770
1771 @Override
1772 public Response processConnectionError(final ConnectionError error) throws Exception {
1773 executor.execute(new Runnable() {
1774 public void run() {
1775 onAsyncException(error.getException());
1776 }
1777 });
1778 return null;
1779 }
1780
1781 @Override
1782 public Response processControlCommand(ControlCommand command) throws Exception {
1783 onControlCommand(command);
1784 return null;
1785 }
1786
1787 @Override
1788 public Response processConnectionControl(ConnectionControl control) throws Exception {
1789 onConnectionControl((ConnectionControl)command);
1790 return null;
1791 }
1792
1793 @Override
1794 public Response processConsumerControl(ConsumerControl control) throws Exception {
1795 onConsumerControl((ConsumerControl)command);
1796 return null;
1797 }
1798
1799 @Override
1800 public Response processWireFormat(WireFormatInfo info) throws Exception {
1801 onWireFormatInfo((WireFormatInfo)command);
1802 return null;
1803 }
1804 });
1805 } catch (Exception e) {
1806 onClientInternalException(e);
1807 }
1808
1809 }
1810 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1811 TransportListener listener = iter.next();
1812 listener.onCommand(command);
1813 }
1814 }
1815
1816 protected void onWireFormatInfo(WireFormatInfo info) {
1817 protocolVersion.set(info.getVersion());
1818 }
1819
1820 /**
1821 * Handles async client internal exceptions.
1822 * A client internal exception is usually one that has been thrown
1823 * by a container runtime component during asynchronous processing of a
1824 * message that does not affect the connection itself.
1825 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1826 * its <code>onException</code> method, if one has been registered with this connection.
1827 *
 * @param error the exception describing the problem
1829 */
1830 public void onClientInternalException(final Throwable error) {
1831 if ( !closed.get() && !closing.get() ) {
1832 if ( this.clientInternalExceptionListener != null ) {
1833 executor.execute(new Runnable() {
1834 public void run() {
1835 ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1836 }
1837 });
1838 } else {
1839 LOG.debug("Async client internal exception occurred with no exception listener registered: "
1840 + error, error);
1841 }
1842 }
1843 }
1844 /**
1845 * Used for handling async exceptions
1846 *
1847 * @param error
1848 */
1849 public void onAsyncException(Throwable error) {
1850 if (!closed.get() && !closing.get()) {
1851 if (this.exceptionListener != null) {
1852
1853 if (!(error instanceof JMSException)) {
1854 error = JMSExceptionSupport.create(error);
1855 }
1856 final JMSException e = (JMSException)error;
1857
1858 executor.execute(new Runnable() {
1859 public void run() {
1860 ActiveMQConnection.this.exceptionListener.onException(e);
1861 }
1862 });
1863
1864 } else {
1865 LOG.debug("Async exception with no exception listener: " + error, error);
1866 }
1867 }
1868 }
1869
1870 public void onException(final IOException error) {
1871 onAsyncException(error);
1872 if (!closing.get() && !closed.get()) {
1873 executor.execute(new Runnable() {
1874 public void run() {
1875 transportFailed(error);
1876 ServiceSupport.dispose(ActiveMQConnection.this.transport);
1877 brokerInfoReceived.countDown();
1878 try {
1879 cleanup();
1880 } catch (JMSException e) {
1881 LOG.warn("Exception during connection cleanup, " + e, e);
1882 }
1883 for (Iterator<TransportListener> iter = transportListeners
1884 .iterator(); iter.hasNext();) {
1885 TransportListener listener = iter.next();
1886 listener.onException(error);
1887 }
1888 }
1889 });
1890 }
1891 }
1892
1893 public void transportInterupted() {
1894 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1895 if (LOG.isDebugEnabled()) {
1896 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1897 }
1898 signalInterruptionProcessingNeeded();
1899
1900 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1901 ActiveMQSession s = i.next();
1902 s.clearMessagesInProgress();
1903 }
1904
1905 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1906 connectionConsumer.clearMessagesInProgress();
1907 }
1908
1909 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1910 TransportListener listener = iter.next();
1911 listener.transportInterupted();
1912 }
1913 }
1914
1915 public void transportResumed() {
1916 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1917 TransportListener listener = iter.next();
1918 listener.transportResumed();
1919 }
1920 }
1921
1922 /**
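 * Creates a temporary destination, announces it with a DestinationInfo add
 * operation and tracks it as an active temporary destination of this
 * connection.
 *
 * @param topic - if true a temporary topic is created, otherwise a temporary queue.
 * @return the newly created temporary destination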
1927 * @throws JMSException
1928 */
1929 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1930
1931 // Check if Destination info is of temporary type.
1932 ActiveMQTempDestination dest;
1933 if (topic) {
1934 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1935 } else {
1936 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1937 }
1938
1939 DestinationInfo info = new DestinationInfo();
1940 info.setConnectionId(this.info.getConnectionId());
1941 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1942 info.setDestination(dest);
1943 syncSendPacket(info);
1944
1945 dest.setConnection(this);
1946 activeTempDestinations.put(dest, dest);
1947 return dest;
1948 }
1949
1950 /**
1951 * @param destination
1952 * @throws JMSException
1953 */
1954 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1955
1956 checkClosedOrFailed();
1957
1958 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1959 ActiveMQSession s = i.next();
1960 if (s.isInUse(destination)) {
1961 throw new JMSException("A consumer is consuming from the temporary destination");
1962 }
1963 }
1964
1965 activeTempDestinations.remove(destination);
1966
1967 DestinationInfo destInfo = new DestinationInfo();
1968 destInfo.setConnectionId(this.info.getConnectionId());
1969 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1970 destInfo.setDestination(destination);
1971 destInfo.setTimeout(0);
1972 syncSendPacket(destInfo);
1973 }
1974
1975 public boolean isDeleted(ActiveMQDestination dest) {
1976
1977 // If we are not watching the advisories.. then
1978 // we will assume that the temp destination does exist.
1979 if (advisoryConsumer == null) {
1980 return false;
1981 }
1982
1983 return !activeTempDestinations.contains(dest);
1984 }
1985
1986 public boolean isCopyMessageOnSend() {
1987 return copyMessageOnSend;
1988 }
1989
1990 public LongSequenceGenerator getLocalTransactionIdGenerator() {
1991 return localTransactionIdGenerator;
1992 }
1993
1994 public boolean isUseCompression() {
1995 return useCompression;
1996 }
1997
1998 /**
1999 * Enables the use of compression of the message bodies
2000 */
2001 public void setUseCompression(boolean useCompression) {
2002 this.useCompression = useCompression;
2003 }
2004
2005 public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2006
2007 checkClosedOrFailed();
2008 ensureConnectionInfoSent();
2009
2010 DestinationInfo info = new DestinationInfo();
2011 info.setConnectionId(this.info.getConnectionId());
2012 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2013 info.setDestination(destination);
2014 info.setTimeout(0);
2015 syncSendPacket(info);
2016
2017 }
2018
2019 public boolean isDispatchAsync() {
2020 return dispatchAsync;
2021 }
2022
2023 /**
2024 * Enables or disables the default setting of whether or not consumers have
2025 * their messages <a
2026 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2027 * synchronously or asynchronously by the broker</a>. For non-durable
2028 * topics for example we typically dispatch synchronously by default to
 * minimize context switches which boost performance. However sometimes it's
2030 * better to go slower to ensure that a single blocked consumer socket does
2031 * not block delivery to other consumers.
2032 *
2033 * @param asyncDispatch If true then consumers created on this connection
2034 * will default to having their messages dispatched
2035 * asynchronously. The default value is false.
2036 */
2037 public void setDispatchAsync(boolean asyncDispatch) {
2038 this.dispatchAsync = asyncDispatch;
2039 }
2040
2041 public boolean isObjectMessageSerializationDefered() {
2042 return objectMessageSerializationDefered;
2043 }
2044
2045 /**
2046 * When an object is set on an ObjectMessage, the JMS spec requires the
2047 * object to be serialized by that set method. Enabling this flag causes the
2048 * object to not get serialized. The object may subsequently get serialized
2049 * if the message needs to be sent over a socket or stored to disk.
2050 */
2051 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2052 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2053 }
2054
2055 public InputStream createInputStream(Destination dest) throws JMSException {
2056 return createInputStream(dest, null);
2057 }
2058
2059 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2060 return createInputStream(dest, messageSelector, false);
2061 }
2062
2063 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2064 return createInputStream(dest, messageSelector, noLocal, -1);
2065 }
2066
2067
2068
2069 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2070 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2071 }
2072
2073 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2074 return createInputStream(dest, null, false);
2075 }
2076
2077 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2078 return createDurableInputStream(dest, name, messageSelector, false);
2079 }
2080
2081 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2082 return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2083 }
2084
2085 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2086 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2087 }
2088
2089 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2090 checkClosedOrFailed();
2091 ensureConnectionInfoSent();
2092 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2093 }
2094
2095 /**
2096 * Creates a persistent output stream; individual messages will be written
2097 * to disk/database by the broker
2098 */
2099 public OutputStream createOutputStream(Destination dest) throws JMSException {
2100 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2101 }
2102
2103 /**
2104 * Creates a non persistent output stream; messages will not be written to
2105 * disk
2106 */
2107 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2108 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2109 }
2110
2111 /**
2112 * Creates an output stream allowing full control over the delivery mode,
2113 * the priority and time to live of the messages and the properties added to
2114 * messages on the stream.
2115 *
2116 * @param streamProperties defines a map of key-value pairs where the keys
2117 * are strings and the values are primitive values (numbers
2118 * and strings) which are appended to the messages similarly
2119 * to using the
2120 * {@link javax.jms.Message#setObjectProperty(String, Object)}
2121 * method
2122 */
2123 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2124 checkClosedOrFailed();
2125 ensureConnectionInfoSent();
2126 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2127 }
2128
2129 /**
2130 * Unsubscribes a durable subscription that has been created by a client.
2131 * <P>
2132 * This method deletes the state being maintained on behalf of the
2133 * subscriber by its provider.
2134 * <P>
2135 * It is erroneous for a client to delete a durable subscription while there
2136 * is an active <CODE>MessageConsumer </CODE> or
2137 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2138 * message is part of a pending transaction or has not been acknowledged in
2139 * the session.
2140 *
2141 * @param name the name used to identify this subscription
2142 * @throws JMSException if the session fails to unsubscribe to the durable
2143 * subscription due to some internal error.
2144 * @throws InvalidDestinationException if an invalid subscription name is
2145 * specified.
2146 * @since 1.1
2147 */
2148 public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2149 checkClosedOrFailed();
2150 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2151 rsi.setConnectionId(getConnectionInfo().getConnectionId());
2152 rsi.setSubscriptionName(name);
2153 rsi.setClientId(getConnectionInfo().getClientId());
2154 syncSendPacket(rsi);
2155 }
2156
2157 /**
2158 * Internal send method optimized: - It does not copy the message - It can
2159 * only handle ActiveMQ messages. - You can specify if the send is async or
2160 * sync - Does not allow you to send /w a transaction.
2161 */
2162 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2163 checkClosedOrFailed();
2164
2165 if (destination.isTemporary() && isDeleted(destination)) {
2166 throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2167 }
2168
2169 msg.setJMSDestination(destination);
2170 msg.setJMSDeliveryMode(deliveryMode);
2171 long expiration = 0L;
2172
2173 if (!isDisableTimeStampsByDefault()) {
2174 long timeStamp = System.currentTimeMillis();
2175 msg.setJMSTimestamp(timeStamp);
2176 if (timeToLive > 0) {
2177 expiration = timeToLive + timeStamp;
2178 }
2179 }
2180
2181 msg.setJMSExpiration(expiration);
2182 msg.setJMSPriority(priority);
2183
2184 msg.setJMSRedelivered(false);
2185 msg.setMessageId(messageId);
2186
2187 msg.onSend();
2188
2189 msg.setProducerId(msg.getMessageId().getProducerId());
2190
2191 if (LOG.isDebugEnabled()) {
2192 LOG.debug("Sending message: " + msg);
2193 }
2194
2195 if (async) {
2196 asyncSendPacket(msg);
2197 } else {
2198 syncSendPacket(msg);
2199 }
2200
2201 }
2202
2203 public void addOutputStream(ActiveMQOutputStream stream) {
2204 outputStreams.add(stream);
2205 }
2206
2207 public void removeOutputStream(ActiveMQOutputStream stream) {
2208 outputStreams.remove(stream);
2209 }
2210
2211 public void addInputStream(ActiveMQInputStream stream) {
2212 inputStreams.add(stream);
2213 }
2214
2215 public void removeInputStream(ActiveMQInputStream stream) {
2216 inputStreams.remove(stream);
2217 }
2218
2219 protected void onControlCommand(ControlCommand command) {
2220 String text = command.getCommand();
2221 if (text != null) {
2222 if ("shutdown".equals(text)) {
2223 LOG.info("JVM told to shutdown");
2224 System.exit(0);
2225 }
2226 if (false && "close".equals(text)){
2227 LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2228 try {
2229 close();
2230 } catch (JMSException e) {
2231 }
2232 }
2233 }
2234 }
2235
2236 protected void onConnectionControl(ConnectionControl command) {
2237 if (command.isFaultTolerant()) {
2238 this.optimizeAcknowledge = false;
2239 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2240 ActiveMQSession s = i.next();
2241 s.setOptimizeAcknowledge(false);
2242 }
2243 }
2244 }
2245
2246 protected void onConsumerControl(ConsumerControl command) {
2247 if (command.isClose()) {
2248 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2249 ActiveMQSession s = i.next();
2250 s.close(command.getConsumerId());
2251 }
2252 } else {
2253 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2254 ActiveMQSession s = i.next();
2255 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2256 }
2257 }
2258 }
2259
2260 protected void transportFailed(IOException error) {
2261 transportFailed.set(true);
2262 if (firstFailureError == null) {
2263 firstFailureError = error;
2264 }
2265 }
2266
2267 /**
2268 * Should a JMS message be copied to a new JMS Message object as part of the
2269 * send() method in JMS. This is enabled by default to be compliant with the
2270 * JMS specification. You can disable it if you do not mutate JMS messages
2271 * after they are sent for a performance boost
2272 */
2273 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2274 this.copyMessageOnSend = copyMessageOnSend;
2275 }
2276
2277 @Override
2278 public String toString() {
2279 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2280 }
2281
2282 protected BlobTransferPolicy createBlobTransferPolicy() {
2283 return new BlobTransferPolicy();
2284 }
2285
2286 public int getProtocolVersion() {
2287 return protocolVersion.get();
2288 }
2289
2290 public int getProducerWindowSize() {
2291 return producerWindowSize;
2292 }
2293
2294 public void setProducerWindowSize(int producerWindowSize) {
2295 this.producerWindowSize = producerWindowSize;
2296 }
2297
2298 public void setAuditDepth(int auditDepth) {
2299 connectionAudit.setAuditDepth(auditDepth);
2300 }
2301
2302 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2303 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2304 }
2305
2306 protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2307 connectionAudit.removeDispatcher(dispatcher);
2308 }
2309
2310 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2311 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2312 }
2313
2314 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2315 connectionAudit.rollbackDuplicate(dispatcher, message);
2316 }
2317
2318 public IOException getFirstFailureError() {
2319 return firstFailureError;
2320 }
2321
2322 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2323 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2324 if (cdl != null) {
2325 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2326 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2327 cdl.await(10, TimeUnit.SECONDS);
2328 }
2329 signalInterruptionProcessingComplete();
2330 }
2331 }
2332
2333 protected void transportInterruptionProcessingComplete() {
2334 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2335 if (cdl != null) {
2336 cdl.countDown();
2337 try {
2338 signalInterruptionProcessingComplete();
2339 } catch (InterruptedException ignored) {}
2340 }
2341 }
2342
2343 private void signalInterruptionProcessingComplete() throws InterruptedException {
2344 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2345 if (cdl.getCount()==0) {
2346 if (LOG.isDebugEnabled()) {
2347 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2348 }
2349 this.transportInterruptionProcessingComplete = null;
2350
2351 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2352 if (failoverTransport != null) {
2353 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2354 if (LOG.isDebugEnabled()) {
2355 LOG.debug("notified failover transport (" + failoverTransport
2356 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2357 }
2358 }
2359
2360 }
2361 }
2362
2363 private void signalInterruptionProcessingNeeded() {
2364 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2365 if (failoverTransport != null) {
2366 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2367 if (LOG.isDebugEnabled()) {
2368 LOG.debug("notified failover transport (" + failoverTransport
2369 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2370 }
2371 }
2372 }
2373
/**
 * Specifies the amount of time, in milliseconds, that a consumer with a transaction pending recovery
 * will wait to receive redispatched messages.
 * The default value is 0, so there is no wait by default.
 */
2379 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2380 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2381 }
2382
2383 public long getConsumerFailoverRedeliveryWaitPeriod() {
2384 return consumerFailoverRedeliveryWaitPeriod;
2385 }
2386
2387 protected Scheduler getScheduler() {
2388 return this.scheduler;
2389 }
2390
2391 protected ThreadPoolExecutor getExecutor() {
2392 return this.executor;
2393 }
2394
2395 /**
2396 * @return the checkForDuplicates
2397 */
2398 public boolean isCheckForDuplicates() {
2399 return this.checkForDuplicates;
2400 }
2401
2402 /**
2403 * @param checkForDuplicates the checkForDuplicates to set
2404 */
2405 public void setCheckForDuplicates(boolean checkForDuplicates) {
2406 this.checkForDuplicates = checkForDuplicates;
2407 }
2408
2409 }