001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InterruptedIOException;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.Socket;
026 import java.net.SocketException;
027 import java.net.SocketTimeoutException;
028 import java.net.URI;
029 import java.net.UnknownHostException;
030 import java.util.HashMap;
031 import java.util.Map;
032 import java.util.concurrent.CountDownLatch;
033 import java.util.concurrent.SynchronousQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.ThreadPoolExecutor;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicReference;
038 import javax.net.SocketFactory;
039 import org.apache.activemq.Service;
040 import org.apache.activemq.thread.DefaultThreadPools;
041 import org.apache.activemq.transport.Transport;
042 import org.apache.activemq.transport.TransportLoggerFactory;
043 import org.apache.activemq.transport.TransportThreadSupport;
044 import org.apache.activemq.util.InetAddressUtil;
045 import org.apache.activemq.util.IntrospectionSupport;
046 import org.apache.activemq.util.ServiceStopper;
047 import org.apache.activemq.wireformat.WireFormat;
048 import org.slf4j.Logger;
049 import org.slf4j.LoggerFactory;
050
051 /**
052 * An implementation of the {@link Transport} interface using raw tcp/ip
053 *
054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055 *
056 */
057 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
/** Logger used by this transport. */
private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
/** Remote endpoint to connect to; null when the transport was built around an existing socket. */
protected final URI remoteLocation;
/** Optional local address/port to bind to before connecting; may be null. */
protected final URI localLocation;
/** Wire format used to marshal outgoing and unmarshal incoming commands. */
protected final WireFormat wireFormat;

/** Timeout passed to {@code Socket.connect}; a negative value connects without a timeout. */
protected int connectionTimeout = 30000;
/** Value passed to {@code Socket.setSoTimeout} when the socket is initialised. */
protected int soTimeout;
/** Size requested for both the socket receive and send buffers. */
protected int socketBufferSize = 64 * 1024;
/** Size of the buffered input and output streams wrapped around the socket streams. */
protected int ioBufferSize = 8 * 1024;
/** When true, doStop() closes the socket on a task runner and waits at most one second for it. */
protected boolean closeAsync=true;
/** The underlying socket; may be null until connect() creates it through the socket factory. */
protected Socket socket;
/** Stream that commands are marshalled to; created by initializeStreams(). */
protected DataOutputStream dataOut;
/** Stream that commands are unmarshalled from; created by initializeStreams(). */
protected DataInputStream dataIn;
/** The buffered output stream, exposed through {@code narrow(TimeStampStream.class)}. */
protected TimeStampStream buffOut = null;
/**
 * The Traffic Class to be set on the socket.
 */
protected int trafficClass = 0;
/**
 * Keeps track of attempts to set the Traffic Class on the socket.
 */
private boolean trafficClassSet = false;
/**
 * Prevents setting both the Differentiated Services and Type of Service
 * transport options at the same time, since they share the same spot in
 * the TCP/IP packet headers.
 */
protected boolean diffServChosen = false;
/** Set when the Type of Service option has been requested; see {@link #diffServChosen}. */
protected boolean typeOfServiceChosen = false;
/**
 * trace=true -> the Transport stack where this TcpTransport
 * object will be, will have a TransportLogger layer
 * trace=false -> the Transport stack where this TcpTransport
 * object will be, will NOT have a TransportLogger layer, and therefore
 * will never be able to print logging messages.
 * This parameter is most probably set in Connection or TransportConnector URIs.
 */
protected boolean trace = false;
/**
 * Name of the LogWriter implementation to use.
 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
 * This parameter is most probably set in Connection or TransportConnector URIs.
 */
protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
/**
 * Specifies if the TransportLogger will be manageable by JMX or not.
 * Also, as long as there is at least 1 TransportLogger which is manageable,
 * a TransportLoggerControl MBean will be created.
 */
protected boolean dynamicManagement = false;
/**
 * startLogging=true -> the TransportLogger object of the Transport stack
 * will initially write messages to the log.
 * startLogging=false -> the TransportLogger object of the Transport stack
 * will initially NOT write messages to the log.
 * This parameter only has an effect if trace == true.
 * This parameter is most probably set in Connection or TransportConnector URIs.
 */
protected boolean startLogging = true;
/**
 * Specifies the port that will be used by the JMX server to manage
 * the TransportLoggers.
 * This should only be set in an URI by a client (producer or consumer) since
 * a broker will already create a JMX server.
 * It is useful for people who test a broker and clients in the same machine
 * and want to control both via JMX; a different port will be needed.
 */
protected int jmxPort = 1099;
/** When true, resolveHostName() maps the local host name to "localhost". */
protected boolean useLocalHost = false;
/** Only stored and returned by its accessors in this class; presumably consumed elsewhere in the transport stack. */
protected int minmumWireFormatVersion;
/** Factory used to create the socket when one was not supplied. */
protected SocketFactory socketFactory;
/** Latch installed by doStart() and counted down when run() exits; stop() waits on it. */
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();

/** Extra socket properties applied through IntrospectionSupport in initialiseSocket(); may be null. */
private Map<String, Object> socketOptions;
/** TCP keep-alive setting; null leaves the socket's own setting untouched. */
private Boolean keepAlive;
/** TCP_NODELAY setting; null leaves the socket's own setting untouched. */
private Boolean tcpNoDelay;
/** The thread currently executing run(); lets stop() avoid waiting on itself. */
private Thread runnerThread;
/** Incremented on every read/skip/fill of the input stream; see getReceiveCounter(). */
private volatile int receiveCounter;
136
137 /**
138 * Connect to a remote Node - e.g. a Broker
139 *
140 * @param wireFormat
141 * @param socketFactory
142 * @param remoteLocation
143 * @param localLocation - e.g. local InetAddress and local port
144 * @throws IOException
145 * @throws UnknownHostException
146 */
147 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
148 URI localLocation) throws UnknownHostException, IOException {
149 this.wireFormat = wireFormat;
150 this.socketFactory = socketFactory;
151 try {
152 this.socket = socketFactory.createSocket();
153 } catch (SocketException e) {
154 this.socket = null;
155 }
156 this.remoteLocation = remoteLocation;
157 this.localLocation = localLocation;
158 setDaemon(false);
159 }
160
/**
 * Initialize from a server Socket.
 * <p>
 * No remote or local location is recorded, so {@link #connect()} neither
 * binds nor connects the supplied socket. The transport is marked as
 * daemon via {@code setDaemon(true)}.
 *
 * @param wireFormat the wire format used to marshal and unmarshal commands
 * @param socket the socket this transport reads from and writes to
 * @throws IOException declared for callers; nothing in this constructor throws it directly
 */
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
    this.wireFormat = wireFormat;
    this.socket = socket;
    this.remoteLocation = null;
    this.localLocation = null;
    setDaemon(true);
}
175
/**
 * A one way asynchronous send.
 * <p>
 * Marshals the command to the output stream with the wire format and
 * flushes it immediately.
 *
 * @param command the command to send
 * @throws IOException if marshalling or flushing fails; checkStarted()
 *         presumably also raises it when the transport is not started
 *         (defined in a superclass, not visible here)
 */
public void oneway(Object command) throws IOException {
    checkStarted();
    wireFormat.marshal(command, dataOut);
    dataOut.flush();
}
184
185 /**
186 * @return pretty print of 'this'
187 */
188 @Override
189 public String toString() {
190 return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
191 : (localLocation != null ? localLocation : remoteLocation)) ;
192 }
193
194 /**
195 * reads packets from a Socket
196 */
197 public void run() {
198 LOG.trace("TCP consumer thread for " + this + " starting");
199 this.runnerThread=Thread.currentThread();
200 try {
201 while (!isStopped()) {
202 doRun();
203 }
204 } catch (IOException e) {
205 stoppedLatch.get().countDown();
206 onException(e);
207 } catch (Throwable e){
208 stoppedLatch.get().countDown();
209 IOException ioe=new IOException("Unexpected error occured");
210 ioe.initCause(e);
211 onException(ioe);
212 }finally {
213 stoppedLatch.get().countDown();
214 }
215 }
216
217 protected void doRun() throws IOException {
218 try {
219 Object command = readCommand();
220 doConsume(command);
221 } catch (SocketTimeoutException e) {
222 } catch (InterruptedIOException e) {
223 }
224 }
225
/**
 * Unmarshals the next command from the input stream using the wire format.
 *
 * @return the object produced by the wire format
 * @throws IOException if reading or unmarshalling fails
 */
protected Object readCommand() throws IOException {
    return wireFormat.unmarshal(dataIn);
}
229
// Properties
// -------------------------------------------------------------------------

/**
 * @return the requested traffic class, formatted as a decimal string
 */
public String getDiffServ() {
    // This is the value requested by the user by setting the Tcp Transport
    // options. If the socket hasn't been created, then this value may not
    // reflect the value returned by Socket.getTrafficClass().
    return Integer.toString(this.trafficClass);
}

/**
 * Requests a Differentiated Services value for the socket. The value is
 * translated by QualityOfServiceUtils.getDSCP and stored as the traffic
 * class; it is applied to the socket when the transport connects.
 *
 * @param diffServ the Differentiated Services option as given by the user
 * @throws IllegalArgumentException declared for an invalid value
 */
public void setDiffServ(String diffServ) throws IllegalArgumentException {
    this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
    this.diffServChosen = true;
}

/**
 * @return the requested traffic class
 */
public int getTypeOfService() {
    // This is the value requested by the user by setting the Tcp Transport
    // options. If the socket hasn't been created, then this value may not
    // reflect the value returned by Socket.getTrafficClass().
    return this.trafficClass;
}

/**
 * Requests a Type of Service value for the socket. The value is translated
 * by QualityOfServiceUtils.getToS and stored as the traffic class; it is
 * applied to the socket when the transport connects.
 *
 * @param typeOfService the Type of Service option as given by the user
 */
public void setTypeOfService(int typeOfService) {
    this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
    this.typeOfServiceChosen = true;
}

/**
 * @return the trace flag
 */
public boolean isTrace() {
    return trace;
}

/**
 * @param trace the trace flag to set
 */
public void setTrace(boolean trace) {
    this.trace = trace;
}

/**
 * @return the name of the LogWriter implementation to use
 */
public String getLogWriterName() {
    return logWriterName;
}

/**
 * @param logFormat the name of the LogWriter implementation to use
 */
public void setLogWriterName(String logFormat) {
    this.logWriterName = logFormat;
}

/**
 * @return the dynamicManagement flag
 */
public boolean isDynamicManagement() {
    return dynamicManagement;
}

/**
 * @param useJmx the dynamicManagement flag to set
 */
public void setDynamicManagement(boolean useJmx) {
    this.dynamicManagement = useJmx;
}

/**
 * @return the startLogging flag
 */
public boolean isStartLogging() {
    return startLogging;
}

/**
 * @param startLogging the startLogging flag to set
 */
public void setStartLogging(boolean startLogging) {
    this.startLogging = startLogging;
}

/**
 * @return the jmxPort
 */
public int getJmxPort() {
    return jmxPort;
}

/**
 * @param jmxPort the jmxPort to set
 */
public void setJmxPort(int jmxPort) {
    this.jmxPort = jmxPort;
}

/**
 * @return the minmumWireFormatVersion
 */
public int getMinmumWireFormatVersion() {
    return minmumWireFormatVersion;
}

/**
 * @param minmumWireFormatVersion the minmumWireFormatVersion to set
 */
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
    this.minmumWireFormatVersion = minmumWireFormatVersion;
}

/**
 * @return the useLocalHost flag
 */
public boolean isUseLocalHost() {
    return useLocalHost;
}

/**
 * Sets whether 'localhost' or the actual local host name should be used to
 * make local connections. On some operating systems such as Macs it is not
 * possible to connect as the local host name so localhost is better.
 */
public void setUseLocalHost(boolean useLocalHost) {
    this.useLocalHost = useLocalHost;
}

/**
 * @return the buffer size requested for the socket
 */
public int getSocketBufferSize() {
    return socketBufferSize;
}

/**
 * Sets the buffer size to use on the socket
 */
public void setSocketBufferSize(int socketBufferSize) {
    this.socketBufferSize = socketBufferSize;
}

/**
 * @return the socket timeout
 */
public int getSoTimeout() {
    return soTimeout;
}

/**
 * Sets the socket timeout
 */
public void setSoTimeout(int soTimeout) {
    this.soTimeout = soTimeout;
}

/**
 * @return the timeout used to connect to the socket
 */
public int getConnectionTimeout() {
    return connectionTimeout;
}

/**
 * Sets the timeout used to connect to the socket
 */
public void setConnectionTimeout(int connectionTimeout) {
    this.connectionTimeout = connectionTimeout;
}

/**
 * @return the keep-alive setting, or null if none was requested
 */
public Boolean getKeepAlive() {
    return keepAlive;
}

/**
 * Enable/disable TCP KEEP_ALIVE mode
 */
public void setKeepAlive(Boolean keepAlive) {
    this.keepAlive = keepAlive;
}

/**
 * @return the TCP_NODELAY setting, or null if none was requested
 */
public Boolean getTcpNoDelay() {
    return tcpNoDelay;
}

/**
 * Enable/disable the TCP_NODELAY option on the socket
 */
public void setTcpNoDelay(Boolean tcpNoDelay) {
    this.tcpNoDelay = tcpNoDelay;
}

/**
 * @return the ioBufferSize
 */
public int getIoBufferSize() {
    return this.ioBufferSize;
}

/**
 * @param ioBufferSize the ioBufferSize to set
 */
public void setIoBufferSize(int ioBufferSize) {
    this.ioBufferSize = ioBufferSize;
}

/**
 * @return the closeAsync
 */
public boolean isCloseAsync() {
    return closeAsync;
}

/**
 * @param closeAsync the closeAsync to set
 */
public void setCloseAsync(boolean closeAsync) {
    this.closeAsync = closeAsync;
}
399
400 // Implementation methods
401 // -------------------------------------------------------------------------
402 protected String resolveHostName(String host) throws UnknownHostException {
403 if (isUseLocalHost()) {
404 String localName = InetAddressUtil.getLocalHostName();
405 if (localName != null && localName.equals(host)) {
406 return "localhost";
407 }
408 }
409 return host;
410 }
411
412 /**
413 * Configures the socket for use
414 *
415 * @param sock
416 * @throws SocketException, IllegalArgumentException if setting the options
417 * on the socket failed.
418 */
419 protected void initialiseSocket(Socket sock) throws SocketException,
420 IllegalArgumentException {
421 if (socketOptions != null) {
422 IntrospectionSupport.setProperties(socket, socketOptions);
423 }
424
425 try {
426 sock.setReceiveBufferSize(socketBufferSize);
427 sock.setSendBufferSize(socketBufferSize);
428 } catch (SocketException se) {
429 LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
430 LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
431 }
432 sock.setSoTimeout(soTimeout);
433
434 if (keepAlive != null) {
435 sock.setKeepAlive(keepAlive.booleanValue());
436 }
437 if (tcpNoDelay != null) {
438 sock.setTcpNoDelay(tcpNoDelay.booleanValue());
439 }
440 if (!this.trafficClassSet) {
441 this.trafficClassSet = setTrafficClass(sock);
442 }
443 }
444
/**
 * Connects the socket, installs a fresh stopped latch and then delegates
 * to the superclass.
 * <p>
 * The latch is installed before {@code super.doStart()} because run()
 * dereferences it on every exit path.
 *
 * @throws Exception if connecting or starting fails
 */
@Override
protected void doStart() throws Exception {
    connect();
    stoppedLatch.set(new CountDownLatch(1));
    super.doStart();
}
451
452 protected void connect() throws Exception {
453
454 if (socket == null && socketFactory == null) {
455 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
456 }
457
458 InetSocketAddress localAddress = null;
459 InetSocketAddress remoteAddress = null;
460
461 if (localLocation != null) {
462 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
463 localLocation.getPort());
464 }
465
466 if (remoteLocation != null) {
467 String host = resolveHostName(remoteLocation.getHost());
468 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
469 }
470 // Set the traffic class before the socket is connected when possible so
471 // that the connection packets are given the correct traffic class.
472 this.trafficClassSet = setTrafficClass(socket);
473
474 if (socket != null) {
475
476 if (localAddress != null) {
477 socket.bind(localAddress);
478 }
479
480 // If it's a server accepted socket.. we don't need to connect it
481 // to a remote address.
482 if (remoteAddress != null) {
483 if (connectionTimeout >= 0) {
484 socket.connect(remoteAddress, connectionTimeout);
485 } else {
486 socket.connect(remoteAddress);
487 }
488 }
489
490 } else {
491 // For SSL sockets.. you can't create an unconnected socket :(
492 // This means the timout option are not supported either.
493 if (localAddress != null) {
494 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
495 localAddress.getAddress(), localAddress.getPort());
496 } else {
497 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
498 }
499 }
500
501 initialiseSocket(socket);
502 initializeStreams();
503 }
504
505 @Override
506 protected void doStop(ServiceStopper stopper) throws Exception {
507 if (LOG.isDebugEnabled()) {
508 LOG.debug("Stopping transport " + this);
509 }
510
511 // Closing the streams flush the sockets before closing.. if the socket
512 // is hung.. then this hangs the close.
513 // closeStreams();
514 if (socket != null) {
515 if (closeAsync) {
516 //closing the socket can hang also
517 final CountDownLatch latch = new CountDownLatch(1);
518
519 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
520
521 public void run() {
522 try {
523 socket.close();
524 } catch (IOException e) {
525 LOG.debug("Caught exception closing socket",e);
526 }finally {
527 latch.countDown();
528 }
529 }
530
531 });
532 latch.await(1,TimeUnit.SECONDS);
533 }else {
534 try {
535 socket.close();
536 } catch (IOException e) {
537 LOG.debug("Caught exception closing socket",e);
538 }
539 }
540
541 }
542 }
543
544 /**
545 * Override so that stop() blocks until the run thread is no longer running.
546 */
547 @Override
548 public void stop() throws Exception {
549 super.stop();
550 CountDownLatch countDownLatch = stoppedLatch.get();
551 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
552 countDownLatch.await(1,TimeUnit.SECONDS);
553 }
554 }
555
556 protected void initializeStreams() throws Exception {
557 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
558 @Override
559 public int read() throws IOException {
560 receiveCounter++;
561 return super.read();
562 }
563 @Override
564 public int read(byte[] b, int off, int len) throws IOException {
565 receiveCounter++;
566 return super.read(b, off, len);
567 }
568 @Override
569 public long skip(long n) throws IOException {
570 receiveCounter++;
571 return super.skip(n);
572 }
573 @Override
574 protected void fill() throws IOException {
575 receiveCounter++;
576 super.fill();
577 }
578 };
579 this.dataIn = new DataInputStream(buffIn);
580 TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
581 this.dataOut = new DataOutputStream(outputStream);
582 this.buffOut = outputStream;
583 }
584
585 protected void closeStreams() throws IOException {
586 if (dataOut != null) {
587 dataOut.close();
588 }
589 if (dataIn != null) {
590 dataIn.close();
591 }
592 }
593
594 public void setSocketOptions(Map<String, Object> socketOptions) {
595 this.socketOptions = new HashMap<String, Object>(socketOptions);
596 }
597
598 public String getRemoteAddress() {
599 if (socket != null) {
600 return "" + socket.getRemoteSocketAddress();
601 }
602 return null;
603 }
604
605 @Override
606 public <T> T narrow(Class<T> target) {
607 if (target == Socket.class) {
608 return target.cast(socket);
609 } else if ( target == TimeStampStream.class) {
610 return target.cast(buffOut);
611 }
612 return super.narrow(target);
613 }
614
/**
 * @return the current value of the receive counter, which the input stream
 *         increments on every read, skip and fill
 */
public int getReceiveCounter() {
    return receiveCounter;
}
618
619
620 /**
621 * @param sock The socket on which to set the Traffic Class.
622 * @return Whether or not the Traffic Class was set on the given socket.
623 * @throws SocketException if the system does not support setting the
624 * Traffic Class.
625 * @throws IllegalArgumentException if both the Differentiated Services and
626 * Type of Services transport options have been set on the same
627 * connection.
628 */
629 private boolean setTrafficClass(Socket sock) throws SocketException,
630 IllegalArgumentException {
631 if (sock == null
632 || (!this.diffServChosen && !this.typeOfServiceChosen)) {
633 return false;
634 }
635 if (this.diffServChosen && this.typeOfServiceChosen) {
636 throw new IllegalArgumentException("Cannot set both the "
637 + " Differentiated Services and Type of Services transport "
638 + " options on the same connection.");
639 }
640
641 sock.setTrafficClass(this.trafficClass);
642
643 int resultTrafficClass = sock.getTrafficClass();
644 if (this.trafficClass != resultTrafficClass) {
645 // In the case where the user has specified the ECN bits (e.g. in
646 // Type of Service) but the system won't allow the ECN bits to be
647 // set or in the case where setting the traffic class failed for
648 // other reasons, emit a warning.
649 if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
650 && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
651 LOG.warn("Attempted to set the Traffic Class to "
652 + this.trafficClass + " but the result Traffic Class was "
653 + resultTrafficClass + ". Please check that your system "
654 + "allows you to set the ECN bits (the first two bits).");
655 } else {
656 LOG.warn("Attempted to set the Traffic Class to "
657 + this.trafficClass + " but the result Traffic Class was "
658 + resultTrafficClass + ". Please check that your system "
659 + "supports java.net.setTrafficClass.");
660 }
661 return false;
662 }
663 // Reset the guards that prevent both the Differentiated Services
664 // option and the Type of Service option from being set on the same
665 // connection.
666 this.diffServChosen = false;
667 this.typeOfServiceChosen = false;
668 return true;
669 }
670 }