001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.IOException;
020 import java.net.InetAddress;
021 import java.net.InetSocketAddress;
022 import java.net.ServerSocket;
023 import java.net.Socket;
024 import java.net.SocketException;
025 import java.net.SocketTimeoutException;
026 import java.net.URI;
027 import java.net.URISyntaxException;
028 import java.net.UnknownHostException;
029 import java.util.HashMap;
030 import java.util.Map;
031 import java.util.concurrent.BlockingQueue;
032 import java.util.concurrent.LinkedBlockingQueue;
033 import java.util.concurrent.TimeUnit;
034
035 import javax.net.ServerSocketFactory;
036
037 import org.apache.activemq.Service;
038 import org.apache.activemq.ThreadPriorities;
039 import org.apache.activemq.command.BrokerInfo;
040 import org.apache.activemq.openwire.OpenWireFormatFactory;
041 import org.apache.activemq.transport.Transport;
042 import org.apache.activemq.transport.TransportLoggerFactory;
043 import org.apache.activemq.transport.TransportServer;
044 import org.apache.activemq.transport.TransportServerThreadSupport;
045 import org.apache.activemq.util.IOExceptionSupport;
046 import org.apache.activemq.util.InetAddressUtil;
047 import org.apache.activemq.util.IntrospectionSupport;
048 import org.apache.activemq.util.ServiceListener;
049 import org.apache.activemq.util.ServiceStopper;
050 import org.apache.activemq.util.ServiceSupport;
051 import org.apache.activemq.wireformat.WireFormat;
052 import org.apache.activemq.wireformat.WireFormatFactory;
053 import org.slf4j.Logger;
054 import org.slf4j.LoggerFactory;
055
056 /**
057 * A TCP based implementation of {@link TransportServer}
058 *
059 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
060 *
061 */
062
063 public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
064
065 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
066 protected ServerSocket serverSocket;
067 protected int backlog = 5000;
068 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
069 protected final TcpTransportFactory transportFactory;
070 protected long maxInactivityDuration = 30000;
071 protected long maxInactivityDurationInitalDelay = 10000;
072 protected int minmumWireFormatVersion;
073 protected boolean useQueueForAccept=true;
074
075 /**
076 * trace=true -> the Transport stack where this TcpTransport
077 * object will be, will have a TransportLogger layer
078 * trace=false -> the Transport stack where this TcpTransport
079 * object will be, will NOT have a TransportLogger layer, and therefore
080 * will never be able to print logging messages.
081 * This parameter is most probably set in Connection or TransportConnector URIs.
082 */
083 protected boolean trace = false;
084
085 protected int soTimeout = 0;
086 protected int socketBufferSize = 64 * 1024;
087 protected int connectionTimeout = 30000;
088
089 /**
090 * Name of the LogWriter implementation to use.
091 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
092 * This parameter is most probably set in Connection or TransportConnector URIs.
093 */
094 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
095 /**
096 * Specifies if the TransportLogger will be manageable by JMX or not.
097 * Also, as long as there is at least 1 TransportLogger which is manageable,
098 * a TransportLoggerControl MBean will me created.
099 */
100 protected boolean dynamicManagement = false;
101 /**
102 * startLogging=true -> the TransportLogger object of the Transport stack
103 * will initially write messages to the log.
104 * startLogging=false -> the TransportLogger object of the Transport stack
105 * will initially NOT write messages to the log.
106 * This parameter only has an effect if trace == true.
107 * This parameter is most probably set in Connection or TransportConnector URIs.
108 */
109 protected boolean startLogging = true;
110 protected final ServerSocketFactory serverSocketFactory;
111 protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
112 protected Thread socketHandlerThread;
113 /**
114 * The maximum number of sockets allowed for this server
115 */
116 protected int maximumConnections = Integer.MAX_VALUE;
117 protected int currentTransportCount=0;
118
119 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
120 super(location);
121 this.transportFactory = transportFactory;
122 this.serverSocketFactory = serverSocketFactory;
123
124 }
125
126 public void bind() throws IOException {
127 URI bind = getBindLocation();
128
129 String host = bind.getHost();
130 host = (host == null || host.length() == 0) ? "localhost" : host;
131 InetAddress addr = InetAddress.getByName(host);
132
133 try {
134
135 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
136 configureServerSocket(this.serverSocket);
137
138 } catch (IOException e) {
139 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
140 }
141 try {
142 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
143 .getFragment()));
144 } catch (URISyntaxException e) {
145
146 // it could be that the host name contains invalid characters such
147 // as _ on unix platforms
148 // so lets try use the IP address instead
149 try {
150 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
151 } catch (URISyntaxException e2) {
152 throw IOExceptionSupport.create(e2);
153 }
154 }
155 }
156
157 private void configureServerSocket(ServerSocket socket) throws SocketException {
158 socket.setSoTimeout(2000);
159 if (transportOptions != null) {
160 IntrospectionSupport.setProperties(socket, transportOptions);
161 }
162 }
163
164 /**
165 * @return Returns the wireFormatFactory.
166 */
167 public WireFormatFactory getWireFormatFactory() {
168 return wireFormatFactory;
169 }
170
171 /**
172 * @param wireFormatFactory The wireFormatFactory to set.
173 */
174 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
175 this.wireFormatFactory = wireFormatFactory;
176 }
177
178 /**
179 * Associates a broker info with the transport server so that the transport
180 * can do discovery advertisements of the broker.
181 *
182 * @param brokerInfo
183 */
184 public void setBrokerInfo(BrokerInfo brokerInfo) {
185 }
186
187 public long getMaxInactivityDuration() {
188 return maxInactivityDuration;
189 }
190
191 public void setMaxInactivityDuration(long maxInactivityDuration) {
192 this.maxInactivityDuration = maxInactivityDuration;
193 }
194
195 public long getMaxInactivityDurationInitalDelay() {
196 return this.maxInactivityDurationInitalDelay;
197 }
198
199 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
200 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
201 }
202
203 public int getMinmumWireFormatVersion() {
204 return minmumWireFormatVersion;
205 }
206
207 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
208 this.minmumWireFormatVersion = minmumWireFormatVersion;
209 }
210
211 public boolean isTrace() {
212 return trace;
213 }
214
215 public void setTrace(boolean trace) {
216 this.trace = trace;
217 }
218
219 public String getLogWriterName() {
220 return logWriterName;
221 }
222
223 public void setLogWriterName(String logFormat) {
224 this.logWriterName = logFormat;
225 }
226
227 public boolean isDynamicManagement() {
228 return dynamicManagement;
229 }
230
231 public void setDynamicManagement(boolean useJmx) {
232 this.dynamicManagement = useJmx;
233 }
234
235 public boolean isStartLogging() {
236 return startLogging;
237 }
238
239
240 public void setStartLogging(boolean startLogging) {
241 this.startLogging = startLogging;
242 }
243
244 /**
245 * @return the backlog
246 */
247 public int getBacklog() {
248 return backlog;
249 }
250
251 /**
252 * @param backlog the backlog to set
253 */
254 public void setBacklog(int backlog) {
255 this.backlog = backlog;
256 }
257
258 /**
259 * @return the useQueueForAccept
260 */
261 public boolean isUseQueueForAccept() {
262 return useQueueForAccept;
263 }
264
265 /**
266 * @param useQueueForAccept the useQueueForAccept to set
267 */
268 public void setUseQueueForAccept(boolean useQueueForAccept) {
269 this.useQueueForAccept = useQueueForAccept;
270 }
271
272
273 /**
274 * pull Sockets from the ServerSocket
275 */
276 public void run() {
277 while (!isStopped()) {
278 Socket socket = null;
279 try {
280 socket = serverSocket.accept();
281 if (socket != null) {
282 if (isStopped() || getAcceptListener() == null) {
283 socket.close();
284 } else {
285 if (useQueueForAccept) {
286 socketQueue.put(socket);
287 }else {
288 handleSocket(socket);
289 }
290 }
291 }
292 } catch (SocketTimeoutException ste) {
293 // expect this to happen
294 } catch (Exception e) {
295 if (!isStopping()) {
296 onAcceptError(e);
297 } else if (!isStopped()) {
298 LOG.warn("run()", e);
299 onAcceptError(e);
300 }
301 }
302 }
303 }
304
305 /**
306 * Allow derived classes to override the Transport implementation that this
307 * transport server creates.
308 *
309 * @param socket
310 * @param format
311 * @return
312 * @throws IOException
313 */
314 protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
315 return new TcpTransport(format, socket);
316 }
317
318 /**
319 * @return pretty print of this
320 */
321 public String toString() {
322 return "" + getBindLocation();
323 }
324
325 /**
326 * @param socket
327 * @param inetAddress
328 * @return real hostName
329 * @throws UnknownHostException
330 */
331 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
332 String result = null;
333 if (socket.isBound()) {
334 if (socket.getInetAddress().isAnyLocalAddress()) {
335 // make it more human readable and useful, an alternative to 0.0.0.0
336 result = InetAddressUtil.getLocalHostName();
337 } else {
338 result = socket.getInetAddress().getCanonicalHostName();
339 }
340 } else {
341 result = bindAddress.getCanonicalHostName();
342 }
343 return result;
344 }
345
346 protected void doStart() throws Exception {
347 if(useQueueForAccept) {
348 Runnable run = new Runnable() {
349 public void run() {
350 try {
351 while (!isStopped() && !isStopping()) {
352 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
353 if (sock != null) {
354 handleSocket(sock);
355 }
356 }
357
358 } catch (InterruptedException e) {
359 LOG.info("socketQueue interuppted - stopping");
360 if (!isStopping()) {
361 onAcceptError(e);
362 }
363 }
364
365 }
366
367 };
368 socketHandlerThread = new Thread(null, run,
369 "ActiveMQ Transport Server Thread Handler: " + toString(),
370 getStackSize());
371 socketHandlerThread.setDaemon(true);
372 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
373 socketHandlerThread.start();
374 }
375 super.doStart();
376
377 }
378
379 protected void doStop(ServiceStopper stopper) throws Exception {
380 super.doStop(stopper);
381 if (serverSocket != null) {
382 serverSocket.close();
383 }
384 }
385
386 public InetSocketAddress getSocketAddress() {
387 return (InetSocketAddress)serverSocket.getLocalSocketAddress();
388 }
389
390 protected final void handleSocket(Socket socket) {
391 try {
392 if (this.currentTransportCount >= this.maximumConnections) {
393 throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
394 "number of allowed client connections. See the 'maximumConnections' " +
395 "property on the TCP transport configuration URI in the ActiveMQ " +
396 "configuration file (e.g., activemq.xml)");
397
398 } else {
399 HashMap<String, Object> options = new HashMap<String, Object>();
400 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
401 options.put("maxInactivityDurationInitalDelay",
402 Long.valueOf(maxInactivityDurationInitalDelay));
403 options.put("minmumWireFormatVersion",
404 Integer.valueOf(minmumWireFormatVersion));
405 options.put("trace", Boolean.valueOf(trace));
406 options.put("soTimeout", Integer.valueOf(soTimeout));
407 options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
408 options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
409 options.put("logWriterName", logWriterName);
410 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
411 options.put("startLogging", Boolean.valueOf(startLogging));
412 options.putAll(transportOptions);
413
414 WireFormat format = wireFormatFactory.createWireFormat();
415 Transport transport = createTransport(socket, format);
416
417 if (transport instanceof ServiceSupport) {
418 ((ServiceSupport) transport).addServiceListener(this);
419 }
420
421 Transport configuredTransport =
422 transportFactory.serverConfigure( transport, format, options);
423
424 getAcceptListener().onAccept(configuredTransport);
425 }
426 } catch (SocketTimeoutException ste) {
427 // expect this to happen
428 } catch (Exception e) {
429 if (!isStopping()) {
430 onAcceptError(e);
431 } else if (!isStopped()) {
432 LOG.warn("run()", e);
433 onAcceptError(e);
434 }
435 }
436
437 }
438
439 public int getSoTimeout() {
440 return soTimeout;
441 }
442
443 public void setSoTimeout(int soTimeout) {
444 this.soTimeout = soTimeout;
445 }
446
447 public int getSocketBufferSize() {
448 return socketBufferSize;
449 }
450
451 public void setSocketBufferSize(int socketBufferSize) {
452 this.socketBufferSize = socketBufferSize;
453 }
454
455 public int getConnectionTimeout() {
456 return connectionTimeout;
457 }
458
459 public void setConnectionTimeout(int connectionTimeout) {
460 this.connectionTimeout = connectionTimeout;
461 }
462
463 /**
464 * @return the maximumConnections
465 */
466 public int getMaximumConnections() {
467 return maximumConnections;
468 }
469
470 /**
471 * @param maximumConnections the maximumConnections to set
472 */
473 public void setMaximumConnections(int maximumConnections) {
474 this.maximumConnections = maximumConnections;
475 }
476
477
478 public void started(Service service) {
479 this.currentTransportCount++;
480 }
481
482 public void stopped(Service service) {
483 this.currentTransportCount--;
484 }
485 }