001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport;
019
020 import java.io.IOException;
021 import java.net.Socket;
022 import java.util.Iterator;
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.atomic.AtomicInteger;
025 import java.util.concurrent.locks.Condition;
026 import java.util.concurrent.locks.ReentrantLock;
027
028 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
029 import org.apache.activemq.transport.tcp.TimeStampStream;
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 * This filter implements write timeouts for socket write operations.
035 * When using blocking IO, the Java implementation doesn't have an explicit flag
036 * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
037 * which is usually around 13-30 minutes).<br/>
038 * To enable this transport, in the transport URI, simpley add<br/>
039 * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
040 * For example (15 second timeout on write operations to the socket):</br>
041 * <pre><code>
042 * <transportConnector
043 * name="tcp1"
044 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
045 * />
046 * </code></pre><br/>
047 * For example (enable default timeout on the socket):</br>
048 * <pre><code>
049 * <transportConnector
050 * name="tcp1"
051 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
052 * />
053 * </code></pre>
054 * @author Filip Hanik
055 *
056 */
057 public class WriteTimeoutFilter extends TransportFilter {
058
059 private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
060 protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
061 protected static AtomicInteger messageCounter = new AtomicInteger(0);
062 protected static TimeoutThread timeoutThread = new TimeoutThread();
063
064 protected static long sleep = 5000l;
065
066 protected long writeTimeout = -1;
067
068 public WriteTimeoutFilter(Transport next) {
069 super(next);
070 }
071
072 @Override
073 public void oneway(Object command) throws IOException {
074 try {
075 registerWrite(this);
076 super.oneway(command);
077 } catch (IOException x) {
078 throw x;
079 } finally {
080 deRegisterWrite(this,false,null);
081 }
082 }
083
084 public long getWriteTimeout() {
085 return writeTimeout;
086 }
087
088 public void setWriteTimeout(long writeTimeout) {
089 this.writeTimeout = writeTimeout;
090 }
091
092 public static long getSleep() {
093 return sleep;
094 }
095
096 public static void setSleep(long sleep) {
097 WriteTimeoutFilter.sleep = sleep;
098 }
099
100
101 protected TimeStampStream getWriter() {
102 return next.narrow(TimeStampStream.class);
103 }
104
105 protected Socket getSocket() {
106 return next.narrow(Socket.class);
107 }
108
109 protected static void registerWrite(WriteTimeoutFilter filter) {
110 writers.add(filter);
111 }
112
113 protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
114 boolean result = writers.remove(filter);
115 if (result) {
116 if (fail) {
117 String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
118 LOG.warn(message);
119 Socket sock = filter.getSocket();
120 if (sock==null) {
121 LOG.error("Destination socket is null, unable to close socket.("+message+")");
122 } else {
123 try {
124 sock.close();
125 }catch (IOException ignore) {
126 }
127 }
128 }
129 }
130 return result;
131 }
132
133 @Override
134 public void start() throws Exception {
135 super.start();
136 }
137
138 @Override
139 public void stop() throws Exception {
140 super.stop();
141 }
142
143 protected static class TimeoutThread extends Thread {
144 static AtomicInteger instance = new AtomicInteger(0);
145 boolean run = true;
146 public TimeoutThread() {
147 setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
148 setDaemon(true);
149 setPriority(Thread.MIN_PRIORITY);
150 start();
151 }
152
153
154 public void run() {
155 while (run) {
156 boolean error = false;
157 try {
158 if (!interrupted()) {
159 Iterator<WriteTimeoutFilter> filters = writers.iterator();
160 while (run && filters.hasNext()) {
161 WriteTimeoutFilter filter = filters.next();
162 if (filter.getWriteTimeout()<=0) continue; //no timeout set
163 long writeStart = filter.getWriter().getWriteTimestamp();
164 long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
165 if (delta>filter.getWriteTimeout()) {
166 WriteTimeoutFilter.deRegisterWrite(filter, true,null);
167 }//if timeout
168 }//while
169 }//if interrupted
170 try {
171 Thread.sleep(getSleep());
172 error = false;
173 } catch (InterruptedException x) {
174 //do nothing
175 }
176 }catch (Throwable t) { //make sure this thread never dies
177 if (!error) { //use error flag to avoid filling up the logs
178 LOG.error("WriteTimeout thread unable validate existing sockets.",t);
179 error = true;
180 }
181 }
182 }
183 }
184 }
185
186 }