001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq;
019
020 import java.util.List;
021 import javax.jms.JMSException;
022 import org.apache.activemq.command.ConsumerId;
023 import org.apache.activemq.command.MessageDispatch;
024 import org.apache.activemq.thread.Task;
025 import org.apache.activemq.thread.TaskRunner;
026 import org.apache.activemq.util.JMSExceptionSupport;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * A utility class used by the Session for dispatching messages asynchronously
032 * to consumers
033 *
034 *
035 * @see javax.jms.Session
036 */
037 public class ActiveMQSessionExecutor implements Task {
038 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
039
040 private final ActiveMQSession session;
041 private final MessageDispatchChannel messageQueue;
042 private boolean dispatchedBySessionPool;
043 private volatile TaskRunner taskRunner;
044 private boolean startedOrWarnedThatNotStarted;
045
046 ActiveMQSessionExecutor(ActiveMQSession session) {
047 this.session = session;
048 if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
049 this.messageQueue = new SimplePriorityMessageDispatchChannel();
050 }else {
051 this.messageQueue = new FifoMessageDispatchChannel();
052 }
053 }
054
055 void setDispatchedBySessionPool(boolean value) {
056 dispatchedBySessionPool = value;
057 wakeup();
058 }
059
060 void execute(MessageDispatch message) throws InterruptedException {
061
062 if (!startedOrWarnedThatNotStarted) {
063
064 ActiveMQConnection connection = session.connection;
065 long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
066 if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
067 startedOrWarnedThatNotStarted = true;
068 } else {
069 long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
070
071 // lets only warn when a significant amount of time has passed
072 // just in case its normal operation
073 if (elapsedTime > aboutUnstartedConnectionTimeout) {
074 LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
075 + " Received: " + message);
076 startedOrWarnedThatNotStarted = true;
077 }
078 }
079 }
080
081 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
082 dispatch(message);
083 } else {
084 messageQueue.enqueue(message);
085 wakeup();
086 }
087 }
088
089 public void wakeup() {
090 if (!dispatchedBySessionPool) {
091 if (session.isSessionAsyncDispatch()) {
092 try {
093 TaskRunner taskRunner = this.taskRunner;
094 if (taskRunner == null) {
095 synchronized (this) {
096 if (this.taskRunner == null) {
097 if (!isRunning()) {
098 // stop has been called
099 return;
100 }
101 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
102 "ActiveMQ Session: " + session.getSessionId());
103 }
104 taskRunner = this.taskRunner;
105 }
106 }
107 taskRunner.wakeup();
108 } catch (InterruptedException e) {
109 Thread.currentThread().interrupt();
110 }
111 } else {
112 while (iterate()) {
113 }
114 }
115 }
116 }
117
118 void executeFirst(MessageDispatch message) {
119 messageQueue.enqueueFirst(message);
120 wakeup();
121 }
122
123 public boolean hasUncomsumedMessages() {
124 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
125 }
126
127 void dispatch(MessageDispatch message) {
128
129 // TODO - we should use a Map for this indexed by consumerId
130
131 for (ActiveMQMessageConsumer consumer : this.session.consumers) {
132 ConsumerId consumerId = message.getConsumerId();
133 if (consumerId.equals(consumer.getConsumerId())) {
134 consumer.dispatch(message);
135 break;
136 }
137 }
138 }
139
140 synchronized void start() {
141 if (!messageQueue.isRunning()) {
142 messageQueue.start();
143 if (hasUncomsumedMessages()) {
144 wakeup();
145 }
146 }
147 }
148
149 void stop() throws JMSException {
150 try {
151 if (messageQueue.isRunning()) {
152 synchronized(this) {
153 messageQueue.stop();
154 if (this.taskRunner != null) {
155 this.taskRunner.shutdown();
156 this.taskRunner = null;
157 }
158 }
159 }
160 } catch (InterruptedException e) {
161 Thread.currentThread().interrupt();
162 throw JMSExceptionSupport.create(e);
163 }
164 }
165
166 boolean isRunning() {
167 return messageQueue.isRunning();
168 }
169
170 void close() {
171 messageQueue.close();
172 }
173
174 void clear() {
175 messageQueue.clear();
176 }
177
178 MessageDispatch dequeueNoWait() {
179 return messageQueue.dequeueNoWait();
180 }
181
182 protected void clearMessagesInProgress() {
183 messageQueue.clear();
184 }
185
186 public boolean isEmpty() {
187 return messageQueue.isEmpty();
188 }
189
190 public boolean iterate() {
191
192 // Deliver any messages queued on the consumer to their listeners.
193 for (ActiveMQMessageConsumer consumer : this.session.consumers) {
194 if (consumer.iterate()) {
195 return true;
196 }
197 }
198
199 // No messages left queued on the listeners.. so now dispatch messages
200 // queued on the session
201 MessageDispatch message = messageQueue.dequeueNoWait();
202 if (message == null) {
203 return false;
204 } else {
205 dispatch(message);
206 return !messageQueue.isEmpty();
207 }
208 }
209
210 List getUnconsumedMessages() {
211 return messageQueue.removeAll();
212 }
213
214 }