001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.File;
020 import java.io.InputStream;
021 import java.io.Serializable;
022 import java.net.URL;
023 import java.util.Collections;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.ThreadPoolExecutor;
028 import java.util.concurrent.atomic.AtomicBoolean;
029 import javax.jms.BytesMessage;
030 import javax.jms.Destination;
031 import javax.jms.IllegalStateException;
032 import javax.jms.InvalidDestinationException;
033 import javax.jms.InvalidSelectorException;
034 import javax.jms.JMSException;
035 import javax.jms.MapMessage;
036 import javax.jms.Message;
037 import javax.jms.MessageConsumer;
038 import javax.jms.MessageListener;
039 import javax.jms.MessageProducer;
040 import javax.jms.ObjectMessage;
041 import javax.jms.Queue;
042 import javax.jms.QueueBrowser;
043 import javax.jms.QueueReceiver;
044 import javax.jms.QueueSender;
045 import javax.jms.QueueSession;
046 import javax.jms.Session;
047 import javax.jms.StreamMessage;
048 import javax.jms.TemporaryQueue;
049 import javax.jms.TemporaryTopic;
050 import javax.jms.TextMessage;
051 import javax.jms.Topic;
052 import javax.jms.TopicPublisher;
053 import javax.jms.TopicSession;
054 import javax.jms.TopicSubscriber;
055 import javax.jms.TransactionRolledBackException;
056 import org.apache.activemq.blob.BlobDownloader;
057 import org.apache.activemq.blob.BlobTransferPolicy;
058 import org.apache.activemq.blob.BlobUploader;
059 import org.apache.activemq.command.ActiveMQBlobMessage;
060 import org.apache.activemq.command.ActiveMQBytesMessage;
061 import org.apache.activemq.command.ActiveMQDestination;
062 import org.apache.activemq.command.ActiveMQMapMessage;
063 import org.apache.activemq.command.ActiveMQMessage;
064 import org.apache.activemq.command.ActiveMQObjectMessage;
065 import org.apache.activemq.command.ActiveMQQueue;
066 import org.apache.activemq.command.ActiveMQStreamMessage;
067 import org.apache.activemq.command.ActiveMQTempDestination;
068 import org.apache.activemq.command.ActiveMQTempQueue;
069 import org.apache.activemq.command.ActiveMQTempTopic;
070 import org.apache.activemq.command.ActiveMQTextMessage;
071 import org.apache.activemq.command.ActiveMQTopic;
072 import org.apache.activemq.command.Command;
073 import org.apache.activemq.command.ConsumerId;
074 import org.apache.activemq.command.MessageAck;
075 import org.apache.activemq.command.MessageDispatch;
076 import org.apache.activemq.command.MessageId;
077 import org.apache.activemq.command.ProducerId;
078 import org.apache.activemq.command.RemoveInfo;
079 import org.apache.activemq.command.Response;
080 import org.apache.activemq.command.SessionId;
081 import org.apache.activemq.command.SessionInfo;
082 import org.apache.activemq.command.TransactionId;
083 import org.apache.activemq.management.JMSSessionStatsImpl;
084 import org.apache.activemq.management.StatsCapable;
085 import org.apache.activemq.management.StatsImpl;
086 import org.apache.activemq.thread.Scheduler;
087 import org.apache.activemq.transaction.Synchronization;
088 import org.apache.activemq.usage.MemoryUsage;
089 import org.apache.activemq.util.Callback;
090 import org.apache.activemq.util.LongSequenceGenerator;
091 import org.slf4j.Logger;
092 import org.slf4j.LoggerFactory;
093
094 /**
095 * <P>
096 * A <CODE>Session</CODE> object is a single-threaded context for producing
097 * and consuming messages. Although it may allocate provider resources outside
098 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
099 * <P>
100 * A session serves several purposes:
101 * <UL>
102 * <LI>It is a factory for its message producers and consumers.
103 * <LI>It supplies provider-optimized message factories.
104 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
105 * <CODE>TemporaryQueues</CODE>.
106 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
107 * objects for those clients that need to dynamically manipulate
108 * provider-specific destination names.
109 * <LI>It supports a single series of transactions that combine work spanning
110 * its producers and consumers into atomic units.
111 * <LI>It defines a serial order for the messages it consumes and the messages
112 * it produces.
113 * <LI>It retains messages it consumes until they have been acknowledged.
114 * <LI>It serializes execution of message listeners registered with its message
115 * consumers.
116 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
117 * </UL>
118 * <P>
119 * A session can create and service multiple message producers and consumers.
120 * <P>
121 * One typical use is to have a thread block on a synchronous
122 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
123 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
124 * <P>
125 * If a client desires to have one thread produce messages while others consume
126 * them, the client should use a separate session for its producing thread.
127 * <P>
128 * Once a connection has been started, any session with one or more registered
129 * message listeners is dedicated to the thread of control that delivers
130 * messages to it. It is erroneous for client code to use this session or any of
131 * its constituent objects from another thread of control. The only exception to
132 * this rule is the use of the session or connection <CODE>close</CODE>
133 * method.
134 * <P>
135 * It should be easy for most clients to partition their work naturally into
136 * sessions. This model allows clients to start simply and incrementally add
137 * message processing complexity as their need for concurrency grows.
138 * <P>
139 * The <CODE>close</CODE> method is the only session method that can be called
140 * while some other session method is being executed in another thread.
141 * <P>
142 * A session may be specified as transacted. Each transacted session supports a
143 * single series of transactions. Each transaction groups a set of message sends
144 * and a set of message receives into an atomic unit of work. In effect,
145 * transactions organize a session's input message stream and output message
146 * stream into series of atomic units. When a transaction commits, its atomic
147 * unit of input is acknowledged and its associated atomic unit of output is
148 * sent. If a transaction rollback is done, the transaction's sent messages are
149 * destroyed and the session's input is automatically recovered.
150 * <P>
151 * The content of a transaction's input and output units is simply those
152 * messages that have been produced and consumed within the session's current
153 * transaction.
154 * <P>
155 * A transaction is completed using either its session's <CODE>commit</CODE>
156 * method or its session's <CODE>rollback </CODE> method. The completion of a
157 * session's current transaction automatically begins the next. The result is
158 * that a transacted session always has a current transaction within which its
159 * work is done.
160 * <P>
161 * The Java Transaction Service (JTS) or some other transaction monitor may be
162 * used to combine a session's transaction with transactions on other resources
163 * (databases, other JMS sessions, etc.). Since Java distributed transactions
164 * are controlled via the Java Transaction API (JTA), use of the session's
165 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
166 * prohibited.
167 * <P>
168 * The JMS API does not require support for JTA; however, it does define how a
169 * provider supplies this support.
170 * <P>
171 * Although it is also possible for a JMS client to handle distributed
172 * transactions directly, it is unlikely that many JMS clients will do this.
173 * Support for JTA in the JMS API is targeted at systems vendors who will be
174 * integrating the JMS API into their application server products.
175 *
176 *
177 * @see javax.jms.Session
178 * @see javax.jms.QueueSession
179 * @see javax.jms.TopicSession
180 * @see javax.jms.XASession
181 */
182 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
183
184 /**
185 * Only acknowledge an individual message - using message.acknowledge()
186 * as opposed to CLIENT_ACKNOWLEDGE which
187 * acknowledges all messages consumed by a session at when acknowledge()
188 * is called
189 */
190 public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
191 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
192
193 public static interface DeliveryListener {
194 void beforeDelivery(ActiveMQSession session, Message msg);
195
196 void afterDelivery(ActiveMQSession session, Message msg);
197 }
198
199 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
200 private final Scheduler scheduler;
201 private final ThreadPoolExecutor connectionExecutor;
202
203 protected int acknowledgementMode;
204 protected final ActiveMQConnection connection;
205 protected final SessionInfo info;
206 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
207 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
208 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
209 protected final ActiveMQSessionExecutor executor;
210 protected final AtomicBoolean started = new AtomicBoolean(false);
211
212 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
213 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
214
215 protected boolean closed;
216 private volatile boolean synchronizationRegistered;
217 protected boolean asyncDispatch;
218 protected boolean sessionAsyncDispatch;
219 protected final boolean debug;
220 protected Object sendMutex = new Object();
221
222 private MessageListener messageListener;
223 private final JMSSessionStatsImpl stats;
224 private TransactionContext transactionContext;
225 private DeliveryListener deliveryListener;
226 private MessageTransformer transformer;
227 private BlobTransferPolicy blobTransferPolicy;
228 private long lastDeliveredSequenceId;
229
230 /**
231 * Construct the Session
232 *
233 * @param connection
234 * @param sessionId
235 * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
236 * Session.SESSION_TRANSACTED
237 * @param asyncDispatch
238 * @param sessionAsyncDispatch
239 * @throws JMSException on internal error
240 */
241 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
242 this.debug = LOG.isDebugEnabled();
243 this.connection = connection;
244 this.acknowledgementMode = acknowledgeMode;
245 this.asyncDispatch = asyncDispatch;
246 this.sessionAsyncDispatch = sessionAsyncDispatch;
247 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
248 setTransactionContext(new TransactionContext(connection));
249 stats = new JMSSessionStatsImpl(producers, consumers);
250 this.connection.asyncSendPacket(info);
251 setTransformer(connection.getTransformer());
252 setBlobTransferPolicy(connection.getBlobTransferPolicy());
253 this.scheduler=connection.getScheduler();
254 this.connectionExecutor=connection.getExecutor();
255 this.executor = new ActiveMQSessionExecutor(this);
256 connection.addSession(this);
257 if (connection.isStarted()) {
258 start();
259 }
260
261 }
262
263 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
264 this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
265 }
266
267 /**
268 * Sets the transaction context of the session.
269 *
270 * @param transactionContext - provides the means to control a JMS
271 * transaction.
272 */
273 public void setTransactionContext(TransactionContext transactionContext) {
274 this.transactionContext = transactionContext;
275 }
276
277 /**
278 * Returns the transaction context of the session.
279 *
280 * @return transactionContext - session's transaction context.
281 */
282 public TransactionContext getTransactionContext() {
283 return transactionContext;
284 }
285
286 /*
287 * (non-Javadoc)
288 *
289 * @see org.apache.activemq.management.StatsCapable#getStats()
290 */
291 public StatsImpl getStats() {
292 return stats;
293 }
294
295 /**
296 * Returns the session's statistics.
297 *
298 * @return stats - session's statistics.
299 */
300 public JMSSessionStatsImpl getSessionStats() {
301 return stats;
302 }
303
304 /**
305 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
306 * object is used to send a message containing a stream of uninterpreted
307 * bytes.
308 *
309 * @return the an ActiveMQBytesMessage
310 * @throws JMSException if the JMS provider fails to create this message due
311 * to some internal error.
312 */
313 public BytesMessage createBytesMessage() throws JMSException {
314 ActiveMQBytesMessage message = new ActiveMQBytesMessage();
315 configureMessage(message);
316 return message;
317 }
318
319 /**
320 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
321 * object is used to send a self-defining set of name-value pairs, where
322 * names are <CODE>String</CODE> objects and values are primitive values
323 * in the Java programming language.
324 *
325 * @return an ActiveMQMapMessage
326 * @throws JMSException if the JMS provider fails to create this message due
327 * to some internal error.
328 */
329 public MapMessage createMapMessage() throws JMSException {
330 ActiveMQMapMessage message = new ActiveMQMapMessage();
331 configureMessage(message);
332 return message;
333 }
334
335 /**
336 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
337 * interface is the root interface of all JMS messages. A
338 * <CODE>Message</CODE> object holds all the standard message header
339 * information. It can be sent when a message containing only header
340 * information is sufficient.
341 *
342 * @return an ActiveMQMessage
343 * @throws JMSException if the JMS provider fails to create this message due
344 * to some internal error.
345 */
346 public Message createMessage() throws JMSException {
347 ActiveMQMessage message = new ActiveMQMessage();
348 configureMessage(message);
349 return message;
350 }
351
352 /**
353 * Creates an <CODE>ObjectMessage</CODE> object. An
354 * <CODE>ObjectMessage</CODE> object is used to send a message that
355 * contains a serializable Java object.
356 *
357 * @return an ActiveMQObjectMessage
358 * @throws JMSException if the JMS provider fails to create this message due
359 * to some internal error.
360 */
361 public ObjectMessage createObjectMessage() throws JMSException {
362 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
363 configureMessage(message);
364 return message;
365 }
366
367 /**
368 * Creates an initialized <CODE>ObjectMessage</CODE> object. An
369 * <CODE>ObjectMessage</CODE> object is used to send a message that
370 * contains a serializable Java object.
371 *
372 * @param object the object to use to initialize this message
373 * @return an ActiveMQObjectMessage
374 * @throws JMSException if the JMS provider fails to create this message due
375 * to some internal error.
376 */
377 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
378 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
379 configureMessage(message);
380 message.setObject(object);
381 return message;
382 }
383
384 /**
385 * Creates a <CODE>StreamMessage</CODE> object. A
386 * <CODE>StreamMessage</CODE> object is used to send a self-defining
387 * stream of primitive values in the Java programming language.
388 *
389 * @return an ActiveMQStreamMessage
390 * @throws JMSException if the JMS provider fails to create this message due
391 * to some internal error.
392 */
393 public StreamMessage createStreamMessage() throws JMSException {
394 ActiveMQStreamMessage message = new ActiveMQStreamMessage();
395 configureMessage(message);
396 return message;
397 }
398
399 /**
400 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
401 * object is used to send a message containing a <CODE>String</CODE>
402 * object.
403 *
404 * @return an ActiveMQTextMessage
405 * @throws JMSException if the JMS provider fails to create this message due
406 * to some internal error.
407 */
408 public TextMessage createTextMessage() throws JMSException {
409 ActiveMQTextMessage message = new ActiveMQTextMessage();
410 configureMessage(message);
411 return message;
412 }
413
414 /**
415 * Creates an initialized <CODE>TextMessage</CODE> object. A
416 * <CODE>TextMessage</CODE> object is used to send a message containing a
417 * <CODE>String</CODE>.
418 *
419 * @param text the string used to initialize this message
420 * @return an ActiveMQTextMessage
421 * @throws JMSException if the JMS provider fails to create this message due
422 * to some internal error.
423 */
424 public TextMessage createTextMessage(String text) throws JMSException {
425 ActiveMQTextMessage message = new ActiveMQTextMessage();
426 message.setText(text);
427 configureMessage(message);
428 return message;
429 }
430
431 /**
432 * Creates an initialized <CODE>BlobMessage</CODE> object. A
433 * <CODE>BlobMessage</CODE> object is used to send a message containing a
434 * <CODE>URL</CODE> which points to some network addressible BLOB.
435 *
436 * @param url the network addressable URL used to pass directly to the
437 * consumer
438 * @return a BlobMessage
439 * @throws JMSException if the JMS provider fails to create this message due
440 * to some internal error.
441 */
442 public BlobMessage createBlobMessage(URL url) throws JMSException {
443 return createBlobMessage(url, false);
444 }
445
446 /**
447 * Creates an initialized <CODE>BlobMessage</CODE> object. A
448 * <CODE>BlobMessage</CODE> object is used to send a message containing a
449 * <CODE>URL</CODE> which points to some network addressible BLOB.
450 *
451 * @param url the network addressable URL used to pass directly to the
452 * consumer
453 * @param deletedByBroker indicates whether or not the resource is deleted
454 * by the broker when the message is acknowledged
455 * @return a BlobMessage
456 * @throws JMSException if the JMS provider fails to create this message due
457 * to some internal error.
458 */
459 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
460 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
461 configureMessage(message);
462 message.setURL(url);
463 message.setDeletedByBroker(deletedByBroker);
464 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
465 return message;
466 }
467
468 /**
469 * Creates an initialized <CODE>BlobMessage</CODE> object. A
470 * <CODE>BlobMessage</CODE> object is used to send a message containing
471 * the <CODE>File</CODE> content. Before the message is sent the file
472 * conent will be uploaded to the broker or some other remote repository
473 * depending on the {@link #getBlobTransferPolicy()}.
474 *
475 * @param file the file to be uploaded to some remote repo (or the broker)
476 * depending on the strategy
477 * @return a BlobMessage
478 * @throws JMSException if the JMS provider fails to create this message due
479 * to some internal error.
480 */
481 public BlobMessage createBlobMessage(File file) throws JMSException {
482 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
483 configureMessage(message);
484 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
485 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
486 message.setDeletedByBroker(true);
487 message.setName(file.getName());
488 return message;
489 }
490
491 /**
492 * Creates an initialized <CODE>BlobMessage</CODE> object. A
493 * <CODE>BlobMessage</CODE> object is used to send a message containing
494 * the <CODE>File</CODE> content. Before the message is sent the file
495 * conent will be uploaded to the broker or some other remote repository
496 * depending on the {@link #getBlobTransferPolicy()}.
497 *
498 * @param in the stream to be uploaded to some remote repo (or the broker)
499 * depending on the strategy
500 * @return a BlobMessage
501 * @throws JMSException if the JMS provider fails to create this message due
502 * to some internal error.
503 */
504 public BlobMessage createBlobMessage(InputStream in) throws JMSException {
505 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
506 configureMessage(message);
507 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
508 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
509 message.setDeletedByBroker(true);
510 return message;
511 }
512
513 /**
514 * Indicates whether the session is in transacted mode.
515 *
516 * @return true if the session is in transacted mode
517 * @throws JMSException if there is some internal error.
518 */
519 public boolean getTransacted() throws JMSException {
520 checkClosed();
521 return isTransacted();
522 }
523
524 /**
525 * Returns the acknowledgement mode of the session. The acknowledgement mode
526 * is set at the time that the session is created. If the session is
527 * transacted, the acknowledgement mode is ignored.
528 *
529 * @return If the session is not transacted, returns the current
530 * acknowledgement mode for the session. If the session is
531 * transacted, returns SESSION_TRANSACTED.
532 * @throws JMSException
533 * @see javax.jms.Connection#createSession(boolean,int)
534 * @since 1.1 exception JMSException if there is some internal error.
535 */
536 public int getAcknowledgeMode() throws JMSException {
537 checkClosed();
538 return this.acknowledgementMode;
539 }
540
541 /**
542 * Commits all messages done in this transaction and releases any locks
543 * currently held.
544 *
545 * @throws JMSException if the JMS provider fails to commit the transaction
546 * due to some internal error.
547 * @throws TransactionRolledBackException if the transaction is rolled back
548 * due to some internal error during commit.
549 * @throws javax.jms.IllegalStateException if the method is not called by a
550 * transacted session.
551 */
552 public void commit() throws JMSException {
553 checkClosed();
554 if (!getTransacted()) {
555 throw new javax.jms.IllegalStateException("Not a transacted session");
556 }
557 if (LOG.isDebugEnabled()) {
558 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
559 }
560 transactionContext.commit();
561 }
562
563 /**
564 * Rolls back any messages done in this transaction and releases any locks
565 * currently held.
566 *
567 * @throws JMSException if the JMS provider fails to roll back the
568 * transaction due to some internal error.
569 * @throws javax.jms.IllegalStateException if the method is not called by a
570 * transacted session.
571 */
572 public void rollback() throws JMSException {
573 checkClosed();
574 if (!getTransacted()) {
575 throw new javax.jms.IllegalStateException("Not a transacted session");
576 }
577 if (LOG.isDebugEnabled()) {
578 LOG.debug(getSessionId() + " Transaction Rollback");
579 }
580 transactionContext.rollback();
581 }
582
583 /**
584 * Closes the session.
585 * <P>
586 * Since a provider may allocate some resources on behalf of a session
587 * outside the JVM, clients should close the resources when they are not
588 * needed. Relying on garbage collection to eventually reclaim these
589 * resources may not be timely enough.
590 * <P>
591 * There is no need to close the producers and consumers of a closed
592 * session.
593 * <P>
594 * This call will block until a <CODE>receive</CODE> call or message
595 * listener in progress has completed. A blocked message consumer
596 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
597 * is closed.
598 * <P>
599 * Closing a transacted session must roll back the transaction in progress.
600 * <P>
601 * This method is the only <CODE>Session</CODE> method that can be called
602 * concurrently.
603 * <P>
604 * Invoking any other <CODE>Session</CODE> method on a closed session must
605 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
606 * closed session must <I>not </I> throw an exception.
607 *
608 * @throws JMSException if the JMS provider fails to close the session due
609 * to some internal error.
610 */
611 public void close() throws JMSException {
612 if (!closed) {
613 if (getTransactionContext().isInXATransaction()) {
614 if (!synchronizationRegistered) {
615 synchronizationRegistered = true;
616 getTransactionContext().addSynchronization(new Synchronization() {
617
618 @Override
619 public void afterCommit() throws Exception {
620 doClose();
621 synchronizationRegistered = false;
622 }
623
624 @Override
625 public void afterRollback() throws Exception {
626 doClose();
627 synchronizationRegistered = false;
628 }
629 });
630 }
631
632 } else {
633 doClose();
634 }
635 }
636 }
637
638 private void doClose() throws JMSException {
639 dispose();
640 RemoveInfo removeCommand = info.createRemoveCommand();
641 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
642 connection.asyncSendPacket(removeCommand);
643 }
644
645 void clearMessagesInProgress() {
646 executor.clearMessagesInProgress();
647 // we are called from inside the transport reconnection logic
648 // which involves us clearing all the connections' consumers
649 // dispatch and delivered lists. So rather than trying to
650 // grab a mutex (which could be already owned by the message
651 // listener calling the send or an ack) we allow it to complete in
652 // a separate thread via the scheduler and notify us via
653 // connection.transportInterruptionProcessingComplete()
654 //
655 for (final ActiveMQMessageConsumer consumer : consumers) {
656 consumer.inProgressClearRequired();
657 scheduler.executeAfterDelay(new Runnable() {
658 public void run() {
659 consumer.clearMessagesInProgress();
660 }}, 0l);
661 }
662 }
663
664 void deliverAcks() {
665 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
666 ActiveMQMessageConsumer consumer = iter.next();
667 consumer.deliverAcks();
668 }
669 }
670
671 public synchronized void dispose() throws JMSException {
672 if (!closed) {
673
674 try {
675 executor.stop();
676
677 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
678 ActiveMQMessageConsumer consumer = iter.next();
679 consumer.setFailureError(connection.getFirstFailureError());
680 consumer.dispose();
681 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
682 }
683 consumers.clear();
684
685 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
686 ActiveMQMessageProducer producer = iter.next();
687 producer.dispose();
688 }
689 producers.clear();
690
691 try {
692 if (getTransactionContext().isInLocalTransaction()) {
693 rollback();
694 }
695 } catch (JMSException e) {
696 }
697
698 } finally {
699 connection.removeSession(this);
700 this.transactionContext = null;
701 closed = true;
702 }
703 }
704 }
705
706 /**
707 * Checks that the session is not closed then configures the message
708 */
709 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
710 checkClosed();
711 message.setConnection(connection);
712 }
713
714 /**
715 * Check if the session is closed. It is used for ensuring that the session
716 * is open before performing various operations.
717 *
718 * @throws IllegalStateException if the Session is closed
719 */
720 protected void checkClosed() throws IllegalStateException {
721 if (closed) {
722 throw new IllegalStateException("The Session is closed");
723 }
724 }
725
726 /**
727 * Stops message delivery in this session, and restarts message delivery
728 * with the oldest unacknowledged message.
729 * <P>
730 * All consumers deliver messages in a serial order. Acknowledging a
731 * received message automatically acknowledges all messages that have been
732 * delivered to the client.
733 * <P>
734 * Restarting a session causes it to take the following actions:
735 * <UL>
736 * <LI>Stop message delivery
737 * <LI>Mark all messages that might have been delivered but not
738 * acknowledged as "redelivered"
739 * <LI>Restart the delivery sequence including all unacknowledged messages
740 * that had been previously delivered. Redelivered messages do not have to
741 * be delivered in exactly their original delivery order.
742 * </UL>
743 *
744 * @throws JMSException if the JMS provider fails to stop and restart
745 * message delivery due to some internal error.
746 * @throws IllegalStateException if the method is called by a transacted
747 * session.
748 */
749 public void recover() throws JMSException {
750
751 checkClosed();
752 if (getTransacted()) {
753 throw new IllegalStateException("This session is transacted");
754 }
755
756 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
757 ActiveMQMessageConsumer c = iter.next();
758 c.rollback();
759 }
760
761 }
762
763 /**
764 * Returns the session's distinguished message listener (optional).
765 *
766 * @return the message listener associated with this session
767 * @throws JMSException if the JMS provider fails to get the message
768 * listener due to an internal error.
769 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
770 * @see javax.jms.ServerSessionPool
771 * @see javax.jms.ServerSession
772 */
773 public MessageListener getMessageListener() throws JMSException {
774 checkClosed();
775 return this.messageListener;
776 }
777
778 /**
779 * Sets the session's distinguished message listener (optional).
780 * <P>
781 * When the distinguished message listener is set, no other form of message
782 * receipt in the session can be used; however, all forms of sending
783 * messages are still supported.
784 * <P>
785 * This is an expert facility not used by regular JMS clients.
786 *
787 * @param listener the message listener to associate with this session
788 * @throws JMSException if the JMS provider fails to set the message
789 * listener due to an internal error.
790 * @see javax.jms.Session#getMessageListener()
791 * @see javax.jms.ServerSessionPool
792 * @see javax.jms.ServerSession
793 */
794 public void setMessageListener(MessageListener listener) throws JMSException {
795 checkClosed();
796 this.messageListener = listener;
797
798 if (listener != null) {
799 executor.setDispatchedBySessionPool(true);
800 }
801 }
802
803 /**
804 * Optional operation, intended to be used only by Application Servers, not
805 * by ordinary JMS clients.
806 *
807 * @see javax.jms.ServerSession
808 */
809 public void run() {
810 MessageDispatch messageDispatch;
811 while ((messageDispatch = executor.dequeueNoWait()) != null) {
812 final MessageDispatch md = messageDispatch;
813 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
814 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
815 // TODO: Ack it without delivery to client
816 continue;
817 }
818
819 if (isClientAcknowledge()||isIndividualAcknowledge()) {
820 message.setAcknowledgeCallback(new Callback() {
821 public void execute() throws Exception {
822 }
823 });
824 }
825
826 if (deliveryListener != null) {
827 deliveryListener.beforeDelivery(this, message);
828 }
829
830 md.setDeliverySequenceId(getNextDeliveryId());
831
832 try {
833 messageListener.onMessage(message);
834 } catch (RuntimeException e) {
835 LOG.error("error dispatching message: ", e);
836 // A problem while invoking the MessageListener does not
837 // in general indicate a problem with the connection to the broker, i.e.
838 // it will usually be sufficient to let the afterDelivery() method either
839 // commit or roll back in order to deal with the exception.
840 // However, we notify any registered client internal exception listener
841 // of the problem.
842 connection.onClientInternalException(e);
843 }
844
845 try {
846 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
847 ack.setFirstMessageId(md.getMessage().getMessageId());
848 doStartTransaction();
849 ack.setTransactionId(getTransactionContext().getTransactionId());
850 if (ack.getTransactionId() != null) {
851 getTransactionContext().addSynchronization(new Synchronization() {
852
853 @Override
854 public void afterRollback() throws Exception {
855 md.getMessage().onMessageRolledBack();
856 // ensure we don't filter this as a duplicate
857 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
858 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
859 int redeliveryCounter = md.getMessage().getRedeliveryCounter();
860 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
861 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
862 // We need to NACK the messages so that they get
863 // sent to the
864 // DLQ.
865 // Acknowledge the last message.
866 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
867 ack.setFirstMessageId(md.getMessage().getMessageId());
868 asyncSendPacket(ack);
869 } else {
870
871 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
872 ack.setFirstMessageId(md.getMessage().getMessageId());
873 asyncSendPacket(ack);
874
875 // Figure out how long we should wait to resend
876 // this message.
877 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
878 for (int i = 0; i < redeliveryCounter; i++) {
879 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
880 }
881 scheduler.executeAfterDelay(new Runnable() {
882
883 public void run() {
884 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
885 }
886 }, redeliveryDelay);
887 }
888 }
889 });
890 }
891 asyncSendPacket(ack);
892 } catch (Throwable e) {
893 connection.onClientInternalException(e);
894 }
895
896 if (deliveryListener != null) {
897 deliveryListener.afterDelivery(this, message);
898 }
899 }
900 }
901
902 /**
903 * Creates a <CODE>MessageProducer</CODE> to send messages to the
904 * specified destination.
905 * <P>
906 * A client uses a <CODE>MessageProducer</CODE> object to send messages to
907 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
908 * inherit from <CODE>Destination</CODE>, they can be used in the
909 * destination parameter to create a <CODE>MessageProducer</CODE> object.
910 *
911 * @param destination the <CODE>Destination</CODE> to send to, or null if
912 * this is a producer which does not have a specified
913 * destination.
914 * @return the MessageProducer
915 * @throws JMSException if the session fails to create a MessageProducer due
916 * to some internal error.
917 * @throws InvalidDestinationException if an invalid destination is
918 * specified.
919 * @since 1.1
920 */
921 public MessageProducer createProducer(Destination destination) throws JMSException {
922 checkClosed();
923 if (destination instanceof CustomDestination) {
924 CustomDestination customDestination = (CustomDestination)destination;
925 return customDestination.createProducer(this);
926 }
927 int timeSendOut = connection.getSendTimeout();
928 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
929 }
930
931 /**
932 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
933 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
934 * <CODE>Destination</CODE>, they can be used in the destination
935 * parameter to create a <CODE>MessageConsumer</CODE>.
936 *
937 * @param destination the <CODE>Destination</CODE> to access.
938 * @return the MessageConsumer
939 * @throws JMSException if the session fails to create a consumer due to
940 * some internal error.
941 * @throws InvalidDestinationException if an invalid destination is
942 * specified.
943 * @since 1.1
944 */
945 public MessageConsumer createConsumer(Destination destination) throws JMSException {
946 return createConsumer(destination, (String) null);
947 }
948
949 /**
950 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
951 * using a message selector. Since <CODE> Queue</CODE> and
952 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
953 * can be used in the destination parameter to create a
954 * <CODE>MessageConsumer</CODE>.
955 * <P>
956 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
957 * that have been sent to a destination.
958 *
959 * @param destination the <CODE>Destination</CODE> to access
960 * @param messageSelector only messages with properties matching the message
961 * selector expression are delivered. A value of null or an
962 * empty string indicates that there is no message selector
963 * for the message consumer.
964 * @return the MessageConsumer
965 * @throws JMSException if the session fails to create a MessageConsumer due
966 * to some internal error.
967 * @throws InvalidDestinationException if an invalid destination is
968 * specified.
969 * @throws InvalidSelectorException if the message selector is invalid.
970 * @since 1.1
971 */
972 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
973 return createConsumer(destination, messageSelector, false);
974 }
975
976 /**
977 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
978 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
979 * <CODE>Destination</CODE>, they can be used in the destination
980 * parameter to create a <CODE>MessageConsumer</CODE>.
981 *
982 * @param destination the <CODE>Destination</CODE> to access.
983 * @param messageListener the listener to use for async consumption of messages
984 * @return the MessageConsumer
985 * @throws JMSException if the session fails to create a consumer due to
986 * some internal error.
987 * @throws InvalidDestinationException if an invalid destination is
988 * specified.
989 * @since 1.1
990 */
991 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
992 return createConsumer(destination, null, messageListener);
993 }
994
995 /**
996 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
997 * using a message selector. Since <CODE> Queue</CODE> and
998 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
999 * can be used in the destination parameter to create a
1000 * <CODE>MessageConsumer</CODE>.
1001 * <P>
1002 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1003 * that have been sent to a destination.
1004 *
1005 * @param destination the <CODE>Destination</CODE> to access
1006 * @param messageSelector only messages with properties matching the message
1007 * selector expression are delivered. A value of null or an
1008 * empty string indicates that there is no message selector
1009 * for the message consumer.
1010 * @param messageListener the listener to use for async consumption of messages
1011 * @return the MessageConsumer
1012 * @throws JMSException if the session fails to create a MessageConsumer due
1013 * to some internal error.
1014 * @throws InvalidDestinationException if an invalid destination is
1015 * specified.
1016 * @throws InvalidSelectorException if the message selector is invalid.
1017 * @since 1.1
1018 */
1019 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1020 return createConsumer(destination, messageSelector, false, messageListener);
1021 }
1022
1023 /**
1024 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1025 * using a message selector. This method can specify whether messages
1026 * published by its own connection should be delivered to it, if the
1027 * destination is a topic.
1028 * <P>
1029 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1030 * <CODE>Destination</CODE>, they can be used in the destination
1031 * parameter to create a <CODE>MessageConsumer</CODE>.
1032 * <P>
1033 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1034 * that have been published to a destination.
1035 * <P>
1036 * In some cases, a connection may both publish and subscribe to a topic.
1037 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1038 * inhibit the delivery of messages published by its own connection. The
1039 * default value for this attribute is False. The <CODE>noLocal</CODE>
1040 * value must be supported by destinations that are topics.
1041 *
1042 * @param destination the <CODE>Destination</CODE> to access
1043 * @param messageSelector only messages with properties matching the message
1044 * selector expression are delivered. A value of null or an
1045 * empty string indicates that there is no message selector
1046 * for the message consumer.
1047 * @param noLocal - if true, and the destination is a topic, inhibits the
1048 * delivery of messages published by its own connection. The
1049 * behavior for <CODE>NoLocal</CODE> is not specified if
1050 * the destination is a queue.
1051 * @return the MessageConsumer
1052 * @throws JMSException if the session fails to create a MessageConsumer due
1053 * to some internal error.
1054 * @throws InvalidDestinationException if an invalid destination is
1055 * specified.
1056 * @throws InvalidSelectorException if the message selector is invalid.
1057 * @since 1.1
1058 */
1059 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1060 return createConsumer(destination, messageSelector, noLocal, null);
1061 }
1062
1063 /**
1064 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1065 * using a message selector. This method can specify whether messages
1066 * published by its own connection should be delivered to it, if the
1067 * destination is a topic.
1068 * <P>
1069 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1070 * <CODE>Destination</CODE>, they can be used in the destination
1071 * parameter to create a <CODE>MessageConsumer</CODE>.
1072 * <P>
1073 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1074 * that have been published to a destination.
1075 * <P>
1076 * In some cases, a connection may both publish and subscribe to a topic.
1077 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1078 * inhibit the delivery of messages published by its own connection. The
1079 * default value for this attribute is False. The <CODE>noLocal</CODE>
1080 * value must be supported by destinations that are topics.
1081 *
1082 * @param destination the <CODE>Destination</CODE> to access
1083 * @param messageSelector only messages with properties matching the message
1084 * selector expression are delivered. A value of null or an
1085 * empty string indicates that there is no message selector
1086 * for the message consumer.
1087 * @param noLocal - if true, and the destination is a topic, inhibits the
1088 * delivery of messages published by its own connection. The
1089 * behavior for <CODE>NoLocal</CODE> is not specified if
1090 * the destination is a queue.
1091 * @param messageListener the listener to use for async consumption of messages
1092 * @return the MessageConsumer
1093 * @throws JMSException if the session fails to create a MessageConsumer due
1094 * to some internal error.
1095 * @throws InvalidDestinationException if an invalid destination is
1096 * specified.
1097 * @throws InvalidSelectorException if the message selector is invalid.
1098 * @since 1.1
1099 */
1100 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1101 checkClosed();
1102
1103 if (destination instanceof CustomDestination) {
1104 CustomDestination customDestination = (CustomDestination)destination;
1105 return customDestination.createConsumer(this, messageSelector, noLocal);
1106 }
1107
1108 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1109 int prefetch = 0;
1110 if (destination instanceof Topic) {
1111 prefetch = prefetchPolicy.getTopicPrefetch();
1112 } else {
1113 prefetch = prefetchPolicy.getQueuePrefetch();
1114 }
1115 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1116 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1117 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1118 }
1119
1120 /**
1121 * Creates a queue identity given a <CODE>Queue</CODE> name.
1122 * <P>
1123 * This facility is provided for the rare cases where clients need to
1124 * dynamically manipulate queue identity. It allows the creation of a queue
1125 * identity with a provider-specific name. Clients that depend on this
1126 * ability are not portable.
1127 * <P>
1128 * Note that this method is not for creating the physical queue. The
1129 * physical creation of queues is an administrative task and is not to be
1130 * initiated by the JMS API. The one exception is the creation of temporary
1131 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1132 * method.
1133 *
1134 * @param queueName the name of this <CODE>Queue</CODE>
1135 * @return a <CODE>Queue</CODE> with the given name
1136 * @throws JMSException if the session fails to create a queue due to some
1137 * internal error.
1138 * @since 1.1
1139 */
1140 public Queue createQueue(String queueName) throws JMSException {
1141 checkClosed();
1142 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1143 return new ActiveMQTempQueue(queueName);
1144 }
1145 return new ActiveMQQueue(queueName);
1146 }
1147
1148 /**
1149 * Creates a topic identity given a <CODE>Topic</CODE> name.
1150 * <P>
1151 * This facility is provided for the rare cases where clients need to
1152 * dynamically manipulate topic identity. This allows the creation of a
1153 * topic identity with a provider-specific name. Clients that depend on this
1154 * ability are not portable.
1155 * <P>
1156 * Note that this method is not for creating the physical topic. The
1157 * physical creation of topics is an administrative task and is not to be
1158 * initiated by the JMS API. The one exception is the creation of temporary
1159 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1160 * method.
1161 *
1162 * @param topicName the name of this <CODE>Topic</CODE>
1163 * @return a <CODE>Topic</CODE> with the given name
1164 * @throws JMSException if the session fails to create a topic due to some
1165 * internal error.
1166 * @since 1.1
1167 */
1168 public Topic createTopic(String topicName) throws JMSException {
1169 checkClosed();
1170 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171 return new ActiveMQTempTopic(topicName);
1172 }
1173 return new ActiveMQTopic(topicName);
1174 }
1175
1176 /**
1177 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1178 * the specified queue.
1179 *
1180 * @param queue the <CODE>queue</CODE> to access
1181 * @exception InvalidDestinationException if an invalid destination is
1182 * specified
1183 * @since 1.1
1184 */
1185 /**
1186 * Creates a durable subscriber to the specified topic.
1187 * <P>
1188 * If a client needs to receive all the messages published on a topic,
1189 * including the ones published while the subscriber is inactive, it uses a
1190 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1191 * record of this durable subscription and insures that all messages from
1192 * the topic's publishers are retained until they are acknowledged by this
1193 * durable subscriber or they have expired.
1194 * <P>
1195 * Sessions with durable subscribers must always provide the same client
1196 * identifier. In addition, each client must specify a name that uniquely
1197 * identifies (within client identifier) each durable subscription it
1198 * creates. Only one session at a time can have a
1199 * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1200 * <P>
1201 * A client can change an existing durable subscription by creating a
1202 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1203 * and/or message selector. Changing a durable subscriber is equivalent to
1204 * unsubscribing (deleting) the old one and creating a new one.
1205 * <P>
1206 * In some cases, a connection may both publish and subscribe to a topic.
1207 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1208 * inhibit the delivery of messages published by its own connection. The
1209 * default value for this attribute is false.
1210 *
1211 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1212 * @param name the name used to identify this subscription
1213 * @return the TopicSubscriber
1214 * @throws JMSException if the session fails to create a subscriber due to
1215 * some internal error.
1216 * @throws InvalidDestinationException if an invalid topic is specified.
1217 * @since 1.1
1218 */
1219 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1220 checkClosed();
1221 return createDurableSubscriber(topic, name, null, false);
1222 }
1223
1224 /**
1225 * Creates a durable subscriber to the specified topic, using a message
1226 * selector and specifying whether messages published by its own connection
1227 * should be delivered to it.
1228 * <P>
1229 * If a client needs to receive all the messages published on a topic,
1230 * including the ones published while the subscriber is inactive, it uses a
1231 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1232 * record of this durable subscription and insures that all messages from
1233 * the topic's publishers are retained until they are acknowledged by this
1234 * durable subscriber or they have expired.
1235 * <P>
1236 * Sessions with durable subscribers must always provide the same client
1237 * identifier. In addition, each client must specify a name which uniquely
1238 * identifies (within client identifier) each durable subscription it
1239 * creates. Only one session at a time can have a
1240 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1241 * inactive durable subscriber is one that exists but does not currently
1242 * have a message consumer associated with it.
1243 * <P>
1244 * A client can change an existing durable subscription by creating a
1245 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1246 * and/or message selector. Changing a durable subscriber is equivalent to
1247 * unsubscribing (deleting) the old one and creating a new one.
1248 *
1249 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1250 * @param name the name used to identify this subscription
1251 * @param messageSelector only messages with properties matching the message
1252 * selector expression are delivered. A value of null or an
1253 * empty string indicates that there is no message selector
1254 * for the message consumer.
1255 * @param noLocal if set, inhibits the delivery of messages published by its
1256 * own connection
1257 * @return the Queue Browser
1258 * @throws JMSException if the session fails to create a subscriber due to
1259 * some internal error.
1260 * @throws InvalidDestinationException if an invalid topic is specified.
1261 * @throws InvalidSelectorException if the message selector is invalid.
1262 * @since 1.1
1263 */
1264 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1265 checkClosed();
1266
1267 if (topic instanceof CustomDestination) {
1268 CustomDestination customDestination = (CustomDestination)topic;
1269 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1270 }
1271
1272 connection.checkClientIDWasManuallySpecified();
1273 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1274 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1275 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1276 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1277 noLocal, false, asyncDispatch);
1278 }
1279
1280 /**
1281 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1282 * the specified queue.
1283 *
1284 * @param queue the <CODE>queue</CODE> to access
1285 * @return the Queue Browser
1286 * @throws JMSException if the session fails to create a browser due to some
1287 * internal error.
1288 * @throws InvalidDestinationException if an invalid destination is
1289 * specified
1290 * @since 1.1
1291 */
1292 public QueueBrowser createBrowser(Queue queue) throws JMSException {
1293 checkClosed();
1294 return createBrowser(queue, null);
1295 }
1296
1297 /**
1298 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1299 * the specified queue using a message selector.
1300 *
1301 * @param queue the <CODE>queue</CODE> to access
1302 * @param messageSelector only messages with properties matching the message
1303 * selector expression are delivered. A value of null or an
1304 * empty string indicates that there is no message selector
1305 * for the message consumer.
1306 * @return the Queue Browser
1307 * @throws JMSException if the session fails to create a browser due to some
1308 * internal error.
1309 * @throws InvalidDestinationException if an invalid destination is
1310 * specified
1311 * @throws InvalidSelectorException if the message selector is invalid.
1312 * @since 1.1
1313 */
1314 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1315 checkClosed();
1316 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1317 }
1318
1319 /**
1320 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1321 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1322 *
1323 * @return a temporary queue identity
1324 * @throws JMSException if the session fails to create a temporary queue due
1325 * to some internal error.
1326 * @since 1.1
1327 */
1328 public TemporaryQueue createTemporaryQueue() throws JMSException {
1329 checkClosed();
1330 return (TemporaryQueue)connection.createTempDestination(false);
1331 }
1332
1333 /**
1334 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1335 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1336 *
1337 * @return a temporary topic identity
1338 * @throws JMSException if the session fails to create a temporary topic due
1339 * to some internal error.
1340 * @since 1.1
1341 */
1342 public TemporaryTopic createTemporaryTopic() throws JMSException {
1343 checkClosed();
1344 return (TemporaryTopic)connection.createTempDestination(true);
1345 }
1346
1347 /**
1348 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1349 * the specified queue.
1350 *
1351 * @param queue the <CODE>Queue</CODE> to access
1352 * @return
1353 * @throws JMSException if the session fails to create a receiver due to
1354 * some internal error.
1355 * @throws JMSException
1356 * @throws InvalidDestinationException if an invalid queue is specified.
1357 */
1358 public QueueReceiver createReceiver(Queue queue) throws JMSException {
1359 checkClosed();
1360 return createReceiver(queue, null);
1361 }
1362
1363 /**
1364 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1365 * the specified queue using a message selector.
1366 *
1367 * @param queue the <CODE>Queue</CODE> to access
1368 * @param messageSelector only messages with properties matching the message
1369 * selector expression are delivered. A value of null or an
1370 * empty string indicates that there is no message selector
1371 * for the message consumer.
1372 * @return QueueReceiver
1373 * @throws JMSException if the session fails to create a receiver due to
1374 * some internal error.
1375 * @throws InvalidDestinationException if an invalid queue is specified.
1376 * @throws InvalidSelectorException if the message selector is invalid.
1377 */
1378 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1379 checkClosed();
1380
1381 if (queue instanceof CustomDestination) {
1382 CustomDestination customDestination = (CustomDestination)queue;
1383 return customDestination.createReceiver(this, messageSelector);
1384 }
1385
1386 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1387 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1388 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1389 }
1390
1391 /**
1392 * Creates a <CODE>QueueSender</CODE> object to send messages to the
1393 * specified queue.
1394 *
1395 * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1396 * unidentified producer
1397 * @return QueueSender
1398 * @throws JMSException if the session fails to create a sender due to some
1399 * internal error.
1400 * @throws InvalidDestinationException if an invalid queue is specified.
1401 */
1402 public QueueSender createSender(Queue queue) throws JMSException {
1403 checkClosed();
1404 if (queue instanceof CustomDestination) {
1405 CustomDestination customDestination = (CustomDestination)queue;
1406 return customDestination.createSender(this);
1407 }
1408 int timeSendOut = connection.getSendTimeout();
1409 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1410 }
1411
1412 /**
1413 * Creates a nondurable subscriber to the specified topic. <p/>
1414 * <P>
1415 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1416 * that have been published to a topic. <p/>
1417 * <P>
1418 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1419 * receive only messages that are published while they are active. <p/>
1420 * <P>
1421 * In some cases, a connection may both publish and subscribe to a topic.
1422 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1423 * inhibit the delivery of messages published by its own connection. The
1424 * default value for this attribute is false.
1425 *
1426 * @param topic the <CODE>Topic</CODE> to subscribe to
1427 * @return TopicSubscriber
1428 * @throws JMSException if the session fails to create a subscriber due to
1429 * some internal error.
1430 * @throws InvalidDestinationException if an invalid topic is specified.
1431 */
1432 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1433 checkClosed();
1434 return createSubscriber(topic, null, false);
1435 }
1436
1437 /**
1438 * Creates a nondurable subscriber to the specified topic, using a message
1439 * selector or specifying whether messages published by its own connection
1440 * should be delivered to it. <p/>
1441 * <P>
1442 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1443 * that have been published to a topic. <p/>
1444 * <P>
1445 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1446 * receive only messages that are published while they are active. <p/>
1447 * <P>
1448 * Messages filtered out by a subscriber's message selector will never be
1449 * delivered to the subscriber. From the subscriber's perspective, they do
1450 * not exist. <p/>
1451 * <P>
1452 * In some cases, a connection may both publish and subscribe to a topic.
1453 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1454 * inhibit the delivery of messages published by its own connection. The
1455 * default value for this attribute is false.
1456 *
1457 * @param topic the <CODE>Topic</CODE> to subscribe to
1458 * @param messageSelector only messages with properties matching the message
1459 * selector expression are delivered. A value of null or an
1460 * empty string indicates that there is no message selector
1461 * for the message consumer.
1462 * @param noLocal if set, inhibits the delivery of messages published by its
1463 * own connection
1464 * @return TopicSubscriber
1465 * @throws JMSException if the session fails to create a subscriber due to
1466 * some internal error.
1467 * @throws InvalidDestinationException if an invalid topic is specified.
1468 * @throws InvalidSelectorException if the message selector is invalid.
1469 */
1470 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1471 checkClosed();
1472
1473 if (topic instanceof CustomDestination) {
1474 CustomDestination customDestination = (CustomDestination)topic;
1475 return customDestination.createSubscriber(this, messageSelector, noLocal);
1476 }
1477
1478 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1479 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1480 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1481 }
1482
1483 /**
1484 * Creates a publisher for the specified topic. <p/>
1485 * <P>
1486 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1487 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1488 * a topic, it defines a new sequence of messages that have no ordering
1489 * relationship with the messages it has previously sent.
1490 *
1491 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1492 * an unidentified producer
1493 * @return TopicPublisher
1494 * @throws JMSException if the session fails to create a publisher due to
1495 * some internal error.
1496 * @throws InvalidDestinationException if an invalid topic is specified.
1497 */
1498 public TopicPublisher createPublisher(Topic topic) throws JMSException {
1499 checkClosed();
1500
1501 if (topic instanceof CustomDestination) {
1502 CustomDestination customDestination = (CustomDestination)topic;
1503 return customDestination.createPublisher(this);
1504 }
1505 int timeSendOut = connection.getSendTimeout();
1506 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1507 }
1508
1509 /**
1510 * Unsubscribes a durable subscription that has been created by a client.
1511 * <P>
1512 * This method deletes the state being maintained on behalf of the
1513 * subscriber by its provider.
1514 * <P>
1515 * It is erroneous for a client to delete a durable subscription while there
1516 * is an active <CODE>MessageConsumer </CODE> or
1517 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1518 * message is part of a pending transaction or has not been acknowledged in
1519 * the session.
1520 *
1521 * @param name the name used to identify this subscription
1522 * @throws JMSException if the session fails to unsubscribe to the durable
1523 * subscription due to some internal error.
1524 * @throws InvalidDestinationException if an invalid subscription name is
1525 * specified.
1526 * @since 1.1
1527 */
1528 public void unsubscribe(String name) throws JMSException {
1529 checkClosed();
1530 connection.unsubscribe(name);
1531 }
1532
1533 public void dispatch(MessageDispatch messageDispatch) {
1534 try {
1535 executor.execute(messageDispatch);
1536 } catch (InterruptedException e) {
1537 Thread.currentThread().interrupt();
1538 connection.onClientInternalException(e);
1539 }
1540 }
1541
1542 /**
1543 * Acknowledges all consumed messages of the session of this consumed
1544 * message.
1545 * <P>
1546 * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1547 * for use when a client has specified that its JMS session's consumed
1548 * messages are to be explicitly acknowledged. By invoking
1549 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1550 * all messages consumed by the session that the message was delivered to.
1551 * <P>
1552 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1553 * sessions and sessions specified to use implicit acknowledgement modes.
1554 * <P>
1555 * A client may individually acknowledge each message as it is consumed, or
1556 * it may choose to acknowledge messages as an application-defined group
1557 * (which is done by calling acknowledge on the last received message of the
1558 * group, thereby acknowledging all messages consumed by the session.)
1559 * <P>
1560 * Messages that have been received but not acknowledged may be redelivered.
1561 *
1562 * @throws JMSException if the JMS provider fails to acknowledge the
1563 * messages due to some internal error.
1564 * @throws javax.jms.IllegalStateException if this method is called on a
1565 * closed session.
1566 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1567 */
1568 public void acknowledge() throws JMSException {
1569 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1570 ActiveMQMessageConsumer c = iter.next();
1571 c.acknowledge();
1572 }
1573 }
1574
1575 /**
1576 * Add a message consumer.
1577 *
1578 * @param consumer - message consumer.
1579 * @throws JMSException
1580 */
1581 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1582 this.consumers.add(consumer);
1583 if (consumer.isDurableSubscriber()) {
1584 stats.onCreateDurableSubscriber();
1585 }
1586 this.connection.addDispatcher(consumer.getConsumerId(), this);
1587 }
1588
1589 /**
1590 * Remove the message consumer.
1591 *
1592 * @param consumer - consumer to be removed.
1593 * @throws JMSException
1594 */
1595 protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1596 this.connection.removeDispatcher(consumer.getConsumerId());
1597 if (consumer.isDurableSubscriber()) {
1598 stats.onRemoveDurableSubscriber();
1599 }
1600 this.consumers.remove(consumer);
1601 this.connection.removeDispatcher(consumer);
1602 }
1603
1604 /**
1605 * Adds a message producer.
1606 *
1607 * @param producer - message producer to be added.
1608 * @throws JMSException
1609 */
1610 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1611 this.producers.add(producer);
1612 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1613 }
1614
1615 /**
1616 * Removes a message producer.
1617 *
1618 * @param producer - message producer to be removed.
1619 * @throws JMSException
1620 */
1621 protected void removeProducer(ActiveMQMessageProducer producer) {
1622 this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1623 this.producers.remove(producer);
1624 }
1625
1626 /**
1627 * Start this Session.
1628 *
1629 * @throws JMSException
1630 */
1631 protected void start() throws JMSException {
1632 started.set(true);
1633 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1634 ActiveMQMessageConsumer c = iter.next();
1635 c.start();
1636 }
1637 executor.start();
1638 }
1639
1640 /**
1641 * Stops this session.
1642 *
1643 * @throws JMSException
1644 */
1645 protected void stop() throws JMSException {
1646
1647 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1648 ActiveMQMessageConsumer c = iter.next();
1649 c.stop();
1650 }
1651
1652 started.set(false);
1653 executor.stop();
1654 }
1655
1656 /**
1657 * Returns the session id.
1658 *
1659 * @return value - session id.
1660 */
1661 protected SessionId getSessionId() {
1662 return info.getSessionId();
1663 }
1664
1665 /**
1666 * @return
1667 */
1668 protected ConsumerId getNextConsumerId() {
1669 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1670 }
1671
1672 /**
1673 * @return
1674 */
1675 protected ProducerId getNextProducerId() {
1676 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1677 }
1678
1679 /**
1680 * Sends the message for dispatch by the broker.
1681 *
1682 * @param producer - message producer.
1683 * @param destination - message destination.
1684 * @param message - message to be sent.
1685 * @param deliveryMode - JMS messsage delivery mode.
1686 * @param priority - message priority.
1687 * @param timeToLive - message expiration.
1688 * @param producerWindow
1689 * @throws JMSException
1690 */
1691 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1692 MemoryUsage producerWindow, int sendTimeout) throws JMSException {
1693
1694 checkClosed();
1695 if (destination.isTemporary() && connection.isDeleted(destination)) {
1696 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1697 }
1698 synchronized (sendMutex) {
1699 // tell the Broker we are about to start a new transaction
1700 doStartTransaction();
1701 TransactionId txid = transactionContext.getTransactionId();
1702 long sequenceNumber = producer.getMessageSequence();
1703
1704 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1705 message.setJMSDeliveryMode(deliveryMode);
1706 long expiration = 0L;
1707 if (!producer.getDisableMessageTimestamp()) {
1708 long timeStamp = System.currentTimeMillis();
1709 message.setJMSTimestamp(timeStamp);
1710 if (timeToLive > 0) {
1711 expiration = timeToLive + timeStamp;
1712 }
1713 }
1714 message.setJMSExpiration(expiration);
1715 message.setJMSPriority(priority);
1716 message.setJMSRedelivered(false);
1717
1718 // transform to our own message format here
1719 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1720
1721 // Set the message id.
1722 if (msg == message) {
1723 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1724 } else {
1725 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1726 message.setJMSMessageID(msg.getMessageId().toString());
1727 }
1728 //clear the brokerPath in case we are re-sending this message
1729 msg.setBrokerPath(null);
1730 // destination format is provider specific so only set on transformed message
1731 msg.setJMSDestination(destination);
1732
1733 msg.setTransactionId(txid);
1734 if (connection.isCopyMessageOnSend()) {
1735 msg = (ActiveMQMessage)msg.copy();
1736 }
1737 msg.setConnection(connection);
1738 msg.onSend();
1739 msg.setProducerId(msg.getMessageId().getProducerId());
1740 if (LOG.isTraceEnabled()) {
1741 LOG.trace(getSessionId() + " sending message: " + msg);
1742 }
1743 if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1744 this.connection.asyncSendPacket(msg);
1745 if (producerWindow != null) {
1746 // Since we defer lots of the marshaling till we hit the
1747 // wire, this might not
1748 // provide and accurate size. We may change over to doing
1749 // more aggressive marshaling,
1750 // to get more accurate sizes.. this is more important once
1751 // users start using producer window
1752 // flow control.
1753 int size = msg.getSize();
1754 producerWindow.increaseUsage(size);
1755 }
1756 } else {
1757 if (sendTimeout > 0) {
1758 this.connection.syncSendPacket(msg,sendTimeout);
1759 }else {
1760 this.connection.syncSendPacket(msg);
1761 }
1762 }
1763
1764 }
1765 }
1766
1767 /**
1768 * Send TransactionInfo to indicate transaction has started
1769 *
1770 * @throws JMSException if some internal error occurs
1771 */
1772 protected void doStartTransaction() throws JMSException {
1773 if (getTransacted() && !transactionContext.isInXATransaction()) {
1774 transactionContext.begin();
1775 }
1776 }
1777
1778 /**
1779 * Checks whether the session has unconsumed messages.
1780 *
1781 * @return true - if there are unconsumed messages.
1782 */
1783 public boolean hasUncomsumedMessages() {
1784 return executor.hasUncomsumedMessages();
1785 }
1786
1787 /**
1788 * Checks whether the session uses transactions.
1789 *
1790 * @return true - if the session uses transactions.
1791 */
1792 public boolean isTransacted() {
1793 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1794 }
1795
1796 /**
1797 * Checks whether the session used client acknowledgment.
1798 *
1799 * @return true - if the session uses client acknowledgment.
1800 */
1801 protected boolean isClientAcknowledge() {
1802 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1803 }
1804
1805 /**
1806 * Checks whether the session used auto acknowledgment.
1807 *
1808 * @return true - if the session uses client acknowledgment.
1809 */
1810 public boolean isAutoAcknowledge() {
1811 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1812 }
1813
1814 /**
1815 * Checks whether the session used dup ok acknowledgment.
1816 *
1817 * @return true - if the session uses client acknowledgment.
1818 */
1819 public boolean isDupsOkAcknowledge() {
1820 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1821 }
1822
1823 public boolean isIndividualAcknowledge(){
1824 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1825 }
1826
1827 /**
1828 * Returns the message delivery listener.
1829 *
1830 * @return deliveryListener - message delivery listener.
1831 */
1832 public DeliveryListener getDeliveryListener() {
1833 return deliveryListener;
1834 }
1835
1836 /**
1837 * Sets the message delivery listener.
1838 *
1839 * @param deliveryListener - message delivery listener.
1840 */
1841 public void setDeliveryListener(DeliveryListener deliveryListener) {
1842 this.deliveryListener = deliveryListener;
1843 }
1844
1845 /**
1846 * Returns the SessionInfo bean.
1847 *
1848 * @return info - SessionInfo bean.
1849 * @throws JMSException
1850 */
1851 protected SessionInfo getSessionInfo() throws JMSException {
1852 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1853 return info;
1854 }
1855
1856 /**
1857 * Send the asynchronus command.
1858 *
1859 * @param command - command to be executed.
1860 * @throws JMSException
1861 */
1862 public void asyncSendPacket(Command command) throws JMSException {
1863 connection.asyncSendPacket(command);
1864 }
1865
1866 /**
1867 * Send the synchronus command.
1868 *
1869 * @param command - command to be executed.
1870 * @return Response
1871 * @throws JMSException
1872 */
1873 public Response syncSendPacket(Command command) throws JMSException {
1874 return connection.syncSendPacket(command);
1875 }
1876
1877 public long getNextDeliveryId() {
1878 return deliveryIdGenerator.getNextSequenceId();
1879 }
1880
1881 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1882
1883 List<MessageDispatch> c = unconsumedMessages.removeAll();
1884 for (MessageDispatch md : c) {
1885 this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1886 }
1887 Collections.reverse(c);
1888
1889 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1890 MessageDispatch md = iter.next();
1891 executor.executeFirst(md);
1892 }
1893
1894 }
1895
1896 public boolean isRunning() {
1897 return started.get();
1898 }
1899
1900 public boolean isAsyncDispatch() {
1901 return asyncDispatch;
1902 }
1903
1904 public void setAsyncDispatch(boolean asyncDispatch) {
1905 this.asyncDispatch = asyncDispatch;
1906 }
1907
1908 /**
1909 * @return Returns the sessionAsyncDispatch.
1910 */
1911 public boolean isSessionAsyncDispatch() {
1912 return sessionAsyncDispatch;
1913 }
1914
1915 /**
1916 * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1917 */
1918 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1919 this.sessionAsyncDispatch = sessionAsyncDispatch;
1920 }
1921
1922 public MessageTransformer getTransformer() {
1923 return transformer;
1924 }
1925
1926 public ActiveMQConnection getConnection() {
1927 return connection;
1928 }
1929
1930 /**
1931 * Sets the transformer used to transform messages before they are sent on
1932 * to the JMS bus or when they are received from the bus but before they are
1933 * delivered to the JMS client
1934 */
1935 public void setTransformer(MessageTransformer transformer) {
1936 this.transformer = transformer;
1937 }
1938
1939 public BlobTransferPolicy getBlobTransferPolicy() {
1940 return blobTransferPolicy;
1941 }
1942
1943 /**
1944 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1945 * OBjects) are transferred from producers to brokers to consumers
1946 */
1947 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1948 this.blobTransferPolicy = blobTransferPolicy;
1949 }
1950
1951 public List getUnconsumedMessages() {
1952 return executor.getUnconsumedMessages();
1953 }
1954
1955 @Override
1956 public String toString() {
1957 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1958 }
1959
1960 public void checkMessageListener() throws JMSException {
1961 if (messageListener != null) {
1962 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1963 }
1964 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1965 ActiveMQMessageConsumer consumer = i.next();
1966 if (consumer.getMessageListener() != null) {
1967 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1968 }
1969 }
1970 }
1971
1972 protected void setOptimizeAcknowledge(boolean value) {
1973 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1974 ActiveMQMessageConsumer c = iter.next();
1975 c.setOptimizeAcknowledge(value);
1976 }
1977 }
1978
1979 protected void setPrefetchSize(ConsumerId id, int prefetch) {
1980 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1981 ActiveMQMessageConsumer c = iter.next();
1982 if (c.getConsumerId().equals(id)) {
1983 c.setPrefetchSize(prefetch);
1984 break;
1985 }
1986 }
1987 }
1988
1989 protected void close(ConsumerId id) {
1990 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1991 ActiveMQMessageConsumer c = iter.next();
1992 if (c.getConsumerId().equals(id)) {
1993 try {
1994 c.close();
1995 } catch (JMSException e) {
1996 LOG.warn("Exception closing consumer", e);
1997 }
1998 LOG.warn("Closed consumer on Command");
1999 break;
2000 }
2001 }
2002 }
2003
2004 public boolean isInUse(ActiveMQTempDestination destination) {
2005 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006 ActiveMQMessageConsumer c = iter.next();
2007 if (c.isInUse(destination)) {
2008 return true;
2009 }
2010 }
2011 return false;
2012 }
2013
2014 /**
2015 * highest sequence id of the last message delivered by this session.
2016 * Passed to the broker in the close command, maintained by dispose()
2017 * @return lastDeliveredSequenceId
2018 */
2019 public long getLastDeliveredSequenceId() {
2020 return lastDeliveredSequenceId;
2021 }
2022
2023 protected void sendAck(MessageAck ack) throws JMSException {
2024 sendAck(ack,false);
2025 }
2026
2027 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2028 if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2029 asyncSendPacket(ack);
2030 } else {
2031 syncSendPacket(ack);
2032 }
2033 }
2034
2035 protected Scheduler getScheduler() {
2036 return this.scheduler;
2037 }
2038
2039 protected ThreadPoolExecutor getConnectionExecutor() {
2040 return this.connectionExecutor;
2041 }
2042 }