001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.net.UnknownHostException;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.activemq.openwire.OpenWireFormat;
027 import org.apache.activemq.transport.CommandJoiner;
028 import org.apache.activemq.transport.InactivityMonitor;
029 import org.apache.activemq.transport.Transport;
030 import org.apache.activemq.transport.TransportFactory;
031 import org.apache.activemq.transport.TransportLoggerFactory;
032 import org.apache.activemq.transport.TransportServer;
033 import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
034 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
035 import org.apache.activemq.transport.reliable.ReliableTransport;
036 import org.apache.activemq.transport.reliable.ReplayStrategy;
037 import org.apache.activemq.transport.reliable.Replayer;
038 import org.apache.activemq.transport.tcp.TcpTransportFactory;
039 import org.apache.activemq.util.IOExceptionSupport;
040 import org.apache.activemq.util.IntrospectionSupport;
041 import org.apache.activemq.util.URISupport;
042 import org.apache.activemq.wireformat.WireFormat;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
048 *
049 */
050 public class UdpTransportFactory extends TransportFactory {
051
052 private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
053
054 public TransportServer doBind(final URI location) throws IOException {
055 try {
056 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
057 if (options.containsKey("port")) {
058 throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
059 }
060 WireFormat wf = createWireFormat(options);
061 int port = location.getPort();
062 OpenWireFormat openWireFormat = asOpenWireFormat(wf);
063 UdpTransport transport = (UdpTransport) createTransport(location, wf);
064
065 Transport configuredTransport = configure(transport, wf, options, true);
066 UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
067 return server;
068 } catch (URISyntaxException e) {
069 throw IOExceptionSupport.create(e);
070 } catch (Exception e) {
071 throw IOExceptionSupport.create(e);
072 }
073 }
074
075 public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
076 return configure(transport, format, options, false);
077 }
078
079 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
080 IntrospectionSupport.setProperties(transport, options);
081 final UdpTransport udpTransport = (UdpTransport)transport;
082
083 // deal with fragmentation
084 transport = new CommandJoiner(transport, asOpenWireFormat(format));
085
086 if (udpTransport.isTrace()) {
087 try {
088 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
089 } catch (Throwable e) {
090 log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
091 }
092 }
093
094 transport = new InactivityMonitor(transport, format);
095
096 if (format instanceof OpenWireFormat) {
097 transport = configureClientSideNegotiator(transport, format, udpTransport);
098 }
099
100 return transport;
101 }
102
103 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
104 OpenWireFormat wireFormat = asOpenWireFormat(wf);
105 return new UdpTransport(wireFormat, location);
106 }
107
108 /**
109 * Configures the transport
110 *
111 * @param acceptServer true if this transport is used purely as an 'accept'
112 * transport for new connections which work like TCP
113 * SocketServers where new connections spin up a new separate
114 * UDP transport
115 */
116 protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
117 IntrospectionSupport.setProperties(transport, options);
118 UdpTransport udpTransport = (UdpTransport)transport;
119
120 OpenWireFormat openWireFormat = asOpenWireFormat(format);
121
122 if (udpTransport.isTrace()) {
123 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
124 }
125
126 transport = new InactivityMonitor(transport, format);
127
128 if (!acceptServer && format instanceof OpenWireFormat) {
129 transport = configureClientSideNegotiator(transport, format, udpTransport);
130 }
131
132 // deal with fragmentation
133
134 if (acceptServer) {
135 // lets not support a buffer of messages to enable reliable
136 // messaging on the 'accept server' transport
137 udpTransport.setReplayEnabled(false);
138
139 // we don't want to do reliable checks on this transport as we
140 // delegate to one that does
141 transport = new CommandJoiner(transport, openWireFormat);
142 return transport;
143 } else {
144 ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
145 Replayer replayer = reliableTransport.getReplayer();
146 reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
147
148 // Joiner must be on outside as the inbound messages must be
149 // processed by the reliable transport first
150 return new CommandJoiner(reliableTransport, openWireFormat);
151 }
152 }
153
154 protected ReplayStrategy createReplayStrategy(Replayer replayer) {
155 if (replayer != null) {
156 return new DefaultReplayStrategy(5);
157 }
158 return new ExceptionIfDroppedReplayStrategy(1);
159 }
160
161 protected ReplayStrategy createReplayStrategy() {
162 return new DefaultReplayStrategy(5);
163 }
164
165 protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
166 return new ResponseRedirectInterceptor(transport, udpTransport);
167 }
168
169 protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
170 OpenWireFormat answer = (OpenWireFormat)wf;
171 return answer;
172 }
173 }