001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.util.Timer;
021 import java.util.concurrent.SynchronousQueue;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicInteger;
027
028 import org.apache.activemq.command.KeepAliveInfo;
029 import org.apache.activemq.command.WireFormatInfo;
030 import org.apache.activemq.thread.SchedulerTimerTask;
031 import org.apache.activemq.wireformat.WireFormat;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * Used to make sure that commands are arriving periodically from the peer of
037 * the transport.
038 *
039 *
040 */
041 public class InactivityMonitor extends TransportFilter {
042
043 private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
044 private static ThreadPoolExecutor ASYNC_TASKS;
045 private static int CHECKER_COUNTER;
046 private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047 private static Timer READ_CHECK_TIMER;
048 private static Timer WRITE_CHECK_TIMER;
049
050 private WireFormatInfo localWireFormatInfo;
051 private WireFormatInfo remoteWireFormatInfo;
052 private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053
054 private final AtomicBoolean commandSent = new AtomicBoolean(false);
055 private final AtomicBoolean inSend = new AtomicBoolean(false);
056 private final AtomicBoolean failed = new AtomicBoolean(false);
057
058 private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059 private final AtomicBoolean inReceive = new AtomicBoolean(false);
060 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061
062 private SchedulerTimerTask writeCheckerTask;
063 private SchedulerTimerTask readCheckerTask;
064
065 private boolean ignoreRemoteWireFormat = false;
066 private boolean ignoreAllWireFormatInfo = false;
067
068 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
069 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
070 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
071 private boolean useKeepAlive = true;
072 private boolean keepAliveResponseRequired;
073 private WireFormat wireFormat;
074
075 private final Runnable readChecker = new Runnable() {
076 long lastRunTime;
077 public void run() {
078 long now = System.currentTimeMillis();
079 long elapsed = (now-lastRunTime);
080
081 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
082 LOG.debug(""+elapsed+" ms elapsed since last read check.");
083 }
084
085 // Perhaps the timer executed a read check late.. and then executes
086 // the next read check on time which causes the time elapsed between
087 // read checks to be small..
088
089 // If less than 90% of the read check Time elapsed then abort this readcheck.
090 if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
091 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
092 return;
093 }
094
095 lastRunTime = now;
096 readCheck();
097 }
098 };
099
100 private boolean allowReadCheck(long elapsed) {
101 return elapsed > (readCheckTime * 9 / 10);
102 }
103
104 private final Runnable writeChecker = new Runnable() {
105 long lastRunTime;
106 public void run() {
107 long now = System.currentTimeMillis();
108 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
109 LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
110
111 }
112 lastRunTime = now;
113 writeCheck();
114 }
115 };
116
117 public InactivityMonitor(Transport next, WireFormat wireFormat) {
118 super(next);
119 this.wireFormat = wireFormat;
120 if (this.wireFormat == null) {
121 this.ignoreAllWireFormatInfo = true;
122 }
123 }
124
125 public void start() throws Exception {
126 next.start();
127 startMonitorThreads();
128 }
129
130 public void stop() throws Exception {
131 stopMonitorThreads();
132 next.stop();
133 }
134
135 final void writeCheck() {
136 if (inSend.get()) {
137 if (LOG.isTraceEnabled()) {
138 LOG.trace("A send is in progress");
139 }
140 return;
141 }
142
143 if (!commandSent.get() && useKeepAlive) {
144 if (LOG.isTraceEnabled()) {
145 LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
146 }
147 ASYNC_TASKS.execute(new Runnable() {
148 public void run() {
149 if (monitorStarted.get()) {
150 try {
151
152 KeepAliveInfo info = new KeepAliveInfo();
153 info.setResponseRequired(keepAliveResponseRequired);
154 oneway(info);
155 } catch (IOException e) {
156 onException(e);
157 }
158 }
159 };
160 });
161 } else {
162 if (LOG.isTraceEnabled()) {
163 LOG.trace(this + " message sent since last write check, resetting flag");
164 }
165 }
166
167 commandSent.set(false);
168 }
169
170 final void readCheck() {
171 int currentCounter = next.getReceiveCounter();
172 int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
173 if (inReceive.get() || currentCounter!=previousCounter ) {
174 if (LOG.isTraceEnabled()) {
175 LOG.trace("A receive is in progress");
176 }
177 return;
178 }
179 if (!commandReceived.get()) {
180 if (LOG.isDebugEnabled()) {
181 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
182 }
183 ASYNC_TASKS.execute(new Runnable() {
184 public void run() {
185 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
186 };
187
188 });
189 } else {
190 if (LOG.isTraceEnabled()) {
191 LOG.trace("Message received since last read check, resetting flag: ");
192 }
193 }
194 commandReceived.set(false);
195 }
196
197 public void onCommand(Object command) {
198 commandReceived.set(true);
199 inReceive.set(true);
200 try {
201 if (command.getClass() == KeepAliveInfo.class) {
202 KeepAliveInfo info = (KeepAliveInfo) command;
203 if (info.isResponseRequired()) {
204 try {
205 info.setResponseRequired(false);
206 oneway(info);
207 } catch (IOException e) {
208 onException(e);
209 }
210 }
211 } else {
212 if (command.getClass() == WireFormatInfo.class) {
213 synchronized (this) {
214 IOException error = null;
215 remoteWireFormatInfo = (WireFormatInfo) command;
216 try {
217 startMonitorThreads();
218 } catch (IOException e) {
219 error = e;
220 }
221 if (error != null) {
222 onException(error);
223 }
224 }
225 }
226 synchronized (readChecker) {
227 transportListener.onCommand(command);
228 }
229 }
230 } finally {
231
232 inReceive.set(false);
233 }
234 }
235
236 public void oneway(Object o) throws IOException {
237 // Disable inactivity monitoring while processing a command.
238 //synchronize this method - its not synchronized
239 //further down the transport stack and gets called by more
240 //than one thread by this class
241 synchronized(inSend) {
242 inSend.set(true);
243 try {
244
245 if( failed.get() ) {
246 throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
247 }
248 if (o.getClass() == WireFormatInfo.class) {
249 synchronized (this) {
250 localWireFormatInfo = (WireFormatInfo)o;
251 startMonitorThreads();
252 }
253 }
254 next.oneway(o);
255 } finally {
256 commandSent.set(true);
257 inSend.set(false);
258 }
259 }
260 }
261
262 public void onException(IOException error) {
263 if (failed.compareAndSet(false, true)) {
264 stopMonitorThreads();
265 transportListener.onException(error);
266 }
267 }
268
269 public void setKeepAliveResponseRequired(boolean val) {
270 keepAliveResponseRequired = val;
271 }
272
273 public void setUseKeepAlive(boolean val) {
274 useKeepAlive = val;
275 }
276
277 public void setIgnoreRemoteWireFormat(boolean val) {
278 ignoreRemoteWireFormat = val;
279 }
280
281 public long getReadCheckTime() {
282 return readCheckTime;
283 }
284
285 public void setReadCheckTime(long readCheckTime) {
286 this.readCheckTime = readCheckTime;
287 }
288
289 public long getInitialDelayTime() {
290 return initialDelayTime;
291 }
292
293 public void setInitialDelayTime(long initialDelayTime) {
294 this.initialDelayTime = initialDelayTime;
295 }
296
297 private synchronized void startMonitorThreads() throws IOException {
298 if (monitorStarted.get()) {
299 return;
300 }
301
302 if (!configuredOk()) {
303 return;
304 }
305
306 if (readCheckTime > 0) {
307 monitorStarted.set(true);
308 writeCheckerTask = new SchedulerTimerTask(writeChecker);
309 readCheckerTask = new SchedulerTimerTask(readChecker);
310 writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
311 synchronized( InactivityMonitor.class ) {
312 if( CHECKER_COUNTER == 0 ) {
313 ASYNC_TASKS = createExecutor();
314 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
315 WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
316 }
317 CHECKER_COUNTER++;
318 WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
319 READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
320 }
321 }
322 }
323
324 private boolean configuredOk() throws IOException {
325 boolean configured = false;
326 if (ignoreAllWireFormatInfo) {
327 configured = true;
328 } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
329 if (!ignoreRemoteWireFormat) {
330 if (LOG.isDebugEnabled()) {
331 LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
332 }
333 readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
334 initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
335 } else {
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("Using local: " + localWireFormatInfo);
338 }
339 readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
340 initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
341 }
342 configured = true;
343 }
344 return configured;
345 }
346
347 /**
348 *
349 */
350 private synchronized void stopMonitorThreads() {
351 if (monitorStarted.compareAndSet(true, false)) {
352 readCheckerTask.cancel();
353 writeCheckerTask.cancel();
354 synchronized( InactivityMonitor.class ) {
355 WRITE_CHECK_TIMER.purge();
356 READ_CHECK_TIMER.purge();
357 CHECKER_COUNTER--;
358 if(CHECKER_COUNTER==0) {
359 WRITE_CHECK_TIMER.cancel();
360 READ_CHECK_TIMER.cancel();
361 WRITE_CHECK_TIMER = null;
362 READ_CHECK_TIMER = null;
363 ASYNC_TASKS.shutdownNow();
364 ASYNC_TASKS = null;
365 }
366 }
367 }
368 }
369
370 private ThreadFactory factory = new ThreadFactory() {
371 public Thread newThread(Runnable runnable) {
372 Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
373 thread.setDaemon(true);
374 return thread;
375 }
376 };
377
378 private ThreadPoolExecutor createExecutor() {
379 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
380 }
381 }