001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.URI;
022 import java.util.concurrent.LinkedBlockingQueue;
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import java.util.concurrent.atomic.AtomicLong;
025 import org.apache.activemq.command.ShutdownInfo;
026 import org.apache.activemq.thread.DefaultThreadPools;
027 import org.apache.activemq.thread.Task;
028 import org.apache.activemq.thread.TaskRunner;
029 import org.apache.activemq.thread.TaskRunnerFactory;
030 import org.apache.activemq.thread.Valve;
031 import org.apache.activemq.transport.FutureResponse;
032 import org.apache.activemq.transport.ResponseCallback;
033 import org.apache.activemq.transport.Transport;
034 import org.apache.activemq.transport.TransportDisposedIOException;
035 import org.apache.activemq.transport.TransportListener;
036 import org.apache.activemq.util.IOExceptionSupport;
037
038
039 /**
040 * A Transport implementation that uses direct method invocations.
041 *
042 *
043 */
044 public class VMTransport implements Transport, Task {
045
046 private static final Object DISCONNECT = new Object();
047 private static final AtomicLong NEXT_ID = new AtomicLong(0);
048 protected VMTransport peer;
049 protected TransportListener transportListener;
050 protected boolean disposed;
051 protected boolean marshal;
052 protected boolean network;
053 protected boolean async = true;
054 protected int asyncQueueDepth = 2000;
055 protected LinkedBlockingQueue<Object> messageQueue;
056 protected boolean started;
057 protected final URI location;
058 protected final long id;
059 private TaskRunner taskRunner;
060 private final Object lazyInitMutext = new Object();
061 private final Valve enqueueValve = new Valve(true);
062 protected final AtomicBoolean stopping = new AtomicBoolean();
063 private volatile int receiveCounter;
064
065 public VMTransport(URI location) {
066 this.location = location;
067 this.id = NEXT_ID.getAndIncrement();
068 }
069
070 public void setPeer(VMTransport peer) {
071 this.peer = peer;
072 }
073
074 public void oneway(Object command) throws IOException {
075 if (disposed) {
076 throw new TransportDisposedIOException("Transport disposed.");
077 }
078 if (peer == null) {
079 throw new IOException("Peer not connected.");
080 }
081
082
083 TransportListener transportListener=null;
084 try {
085 // Disable the peer from changing his state while we try to enqueue onto him.
086 peer.enqueueValve.increment();
087
088 if (peer.disposed || peer.stopping.get()) {
089 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090 }
091
092 if (peer.started) {
093 if (peer.async) {
094 peer.getMessageQueue().put(command);
095 peer.wakeup();
096 } else {
097 transportListener = peer.transportListener;
098 }
099 } else {
100 peer.getMessageQueue().put(command);
101 }
102
103 } catch (InterruptedException e) {
104 InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
105 iioe.initCause(e);
106 throw iioe;
107 } finally {
108 // Allow the peer to change state again...
109 peer.enqueueValve.decrement();
110 }
111
112 dispatch(peer, transportListener, command);
113 }
114
115 public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
116 if( transportListener!=null ) {
117 if( command == DISCONNECT ) {
118 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
119 } else {
120 transport.receiveCounter++;
121 transportListener.onCommand(command);
122 }
123 }
124 }
125
126 public void start() throws Exception {
127 if (transportListener == null) {
128 throw new IOException("TransportListener not set.");
129 }
130 try {
131 enqueueValve.turnOff();
132 if (messageQueue != null && !async) {
133 Object command;
134 while ((command = messageQueue.poll()) != null && !stopping.get() ) {
135 receiveCounter++;
136 dispatch(this, transportListener, command);
137 }
138 }
139 started = true;
140 wakeup();
141 } finally {
142 enqueueValve.turnOn();
143 }
144 // If we get stopped while starting up, then do the actual stop now
145 // that the enqueueValve is back on.
146 if( stopping.get() ) {
147 stop();
148 }
149 }
150
151 public void stop() throws Exception {
152 stopping.set(true);
153
154 // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
155 if( enqueueValve.isOn() ) {
156
157 // let the peer know that we are disconnecting..
158 try {
159 peer.transportListener.onCommand(new ShutdownInfo());
160 } catch (Exception ignore) {
161 }
162
163
164 TaskRunner tr = null;
165 try {
166 enqueueValve.turnOff();
167 if (!disposed) {
168 started = false;
169 disposed = true;
170 if (taskRunner != null) {
171 tr = taskRunner;
172 taskRunner = null;
173 }
174 }
175 } finally {
176 stopping.set(false);
177 enqueueValve.turnOn();
178 }
179 if (tr != null) {
180 tr.shutdown(1000);
181 }
182
183
184 }
185
186 }
187
188 /**
189 * @see org.apache.activemq.thread.Task#iterate()
190 */
191 public boolean iterate() {
192
193 final TransportListener tl;
194 try {
195 // Disable changing the state variables while we are running...
196 enqueueValve.increment();
197 tl = transportListener;
198 if (!started || disposed || tl == null || stopping.get()) {
199 if( stopping.get() ) {
200 // drain the queue it since folks could be blocked putting on to
201 // it and that would not allow the stop() method for finishing up.
202 getMessageQueue().clear();
203 }
204 return false;
205 }
206 } catch (InterruptedException e) {
207 return false;
208 } finally {
209 enqueueValve.decrement();
210 }
211
212 LinkedBlockingQueue<Object> mq = getMessageQueue();
213 Object command = mq.poll();
214 if (command != null) {
215 if( command == DISCONNECT ) {
216 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
217 } else {
218 tl.onCommand(command);
219 }
220 return !mq.isEmpty();
221 } else {
222 return false;
223 }
224
225 }
226
227 public void setTransportListener(TransportListener commandListener) {
228 try {
229 try {
230 enqueueValve.turnOff();
231 this.transportListener = commandListener;
232 wakeup();
233 } finally {
234 enqueueValve.turnOn();
235 }
236 } catch (InterruptedException e) {
237 throw new RuntimeException(e);
238 }
239 }
240
241 private LinkedBlockingQueue<Object> getMessageQueue() {
242 synchronized (lazyInitMutext) {
243 if (messageQueue == null) {
244 messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
245 }
246 return messageQueue;
247 }
248 }
249
250 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
251 throw new AssertionError("Unsupported Method");
252 }
253
254 public Object request(Object command) throws IOException {
255 throw new AssertionError("Unsupported Method");
256 }
257
258 public Object request(Object command, int timeout) throws IOException {
259 throw new AssertionError("Unsupported Method");
260 }
261
262 public TransportListener getTransportListener() {
263 return transportListener;
264 }
265
266 public <T> T narrow(Class<T> target) {
267 if (target.isAssignableFrom(getClass())) {
268 return target.cast(this);
269 }
270 return null;
271 }
272
273 public boolean isMarshal() {
274 return marshal;
275 }
276
277 public void setMarshal(boolean marshal) {
278 this.marshal = marshal;
279 }
280
281 public boolean isNetwork() {
282 return network;
283 }
284
285 public void setNetwork(boolean network) {
286 this.network = network;
287 }
288
289 @Override
290 public String toString() {
291 return location + "#" + id;
292 }
293
294 public String getRemoteAddress() {
295 if (peer != null) {
296 return peer.toString();
297 }
298 return null;
299 }
300
301 /**
302 * @return the async
303 */
304 public boolean isAsync() {
305 return async;
306 }
307
308 /**
309 * @param async the async to set
310 */
311 public void setAsync(boolean async) {
312 this.async = async;
313 }
314
315 /**
316 * @return the asyncQueueDepth
317 */
318 public int getAsyncQueueDepth() {
319 return asyncQueueDepth;
320 }
321
322 /**
323 * @param asyncQueueDepth the asyncQueueDepth to set
324 */
325 public void setAsyncQueueDepth(int asyncQueueDepth) {
326 this.asyncQueueDepth = asyncQueueDepth;
327 }
328
329 protected void wakeup() {
330 if (async) {
331 synchronized (lazyInitMutext) {
332 if (taskRunner == null) {
333 taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
334 }
335 }
336 try {
337 taskRunner.wakeup();
338 } catch (InterruptedException e) {
339 Thread.currentThread().interrupt();
340 }
341 }
342 }
343
344 public boolean isFaultTolerant() {
345 return false;
346 }
347
348 public boolean isDisposed() {
349 return disposed;
350 }
351
352 public boolean isConnected() {
353 return started;
354 }
355
356 public void reconnect(URI uri) throws IOException {
357 throw new IOException("Not supported");
358 }
359
360 public boolean isReconnectSupported() {
361 return false;
362 }
363
364 public boolean isUpdateURIsSupported() {
365 return false;
366 }
367 public void updateURIs(boolean reblance,URI[] uris) throws IOException {
368 throw new IOException("Not supported");
369 }
370
371 public int getReceiveCounter() {
372 return receiveCounter;
373 }
374 }