001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.HashMap;
023 import java.util.Iterator;
024 import java.util.Map;
025
026 import org.apache.activemq.broker.BrokerService;
027 import org.apache.activemq.broker.SslContext;
028 import org.apache.activemq.command.DiscoveryEvent;
029 import org.apache.activemq.transport.Transport;
030 import org.apache.activemq.transport.TransportDisposedIOException;
031 import org.apache.activemq.transport.TransportFactory;
032 import org.apache.activemq.transport.discovery.DiscoveryAgent;
033 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
034 import org.apache.activemq.transport.discovery.DiscoveryListener;
035 import org.apache.activemq.util.IntrospectionSupport;
036 import org.apache.activemq.util.ServiceStopper;
037 import org.apache.activemq.util.ServiceSupport;
038 import org.apache.activemq.util.URISupport;
039 import org.apache.activemq.util.URISupport.CompositeData;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 import javax.management.ObjectName;
044
045 /**
046 * A network connector which uses a discovery agent to detect the remote brokers
047 * available and setup a connection to each available remote broker
048 *
049 * @org.apache.xbean.XBean element="networkConnector"
050 *
051 */
052 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
053 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class);
054
055 private DiscoveryAgent discoveryAgent;
056
057 private Map<String, String> parameters;
058
059 public DiscoveryNetworkConnector() {
060 }
061
062 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException {
063 setUri(discoveryURI);
064 }
065
066 public void setUri(URI discoveryURI) throws IOException {
067 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
068 try {
069 parameters = URISupport.parseParameters(discoveryURI);
070 // allow discovery agent to grab it's parameters
071 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
072 } catch (URISyntaxException e) {
073 LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
074 }
075
076 }
077
078 public void onServiceAdd(DiscoveryEvent event) {
079 // Ignore events once we start stopping.
080 if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
081 return;
082 }
083 String url = event.getServiceName();
084 if (url != null) {
085 URI uri;
086 try {
087 uri = new URI(url);
088 } catch (URISyntaxException e) {
089 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
090 return;
091 }
092 // Should we try to connect to that URI?
093 if( bridges.containsKey(uri) ) {
094 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
095 return;
096 }
097 if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
098 LOG.debug("not connecting loopback: " + uri);
099 return;
100 }
101 URI connectUri = uri;
102 try {
103 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
104 } catch (URISyntaxException e) {
105 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
106 }
107 LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
108
109 Transport remoteTransport;
110 Transport localTransport;
111 try {
112 // Allows the transport to access the broker's ssl configuration.
113 SslContext.setCurrentSslContext(getBrokerService().getSslContext());
114 try {
115 remoteTransport = TransportFactory.connect(connectUri);
116 } catch (Exception e) {
117 LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
118 LOG.debug("Connection failure exception: " + e, e);
119 return;
120 }
121 try {
122 localTransport = createLocalTransport();
123 } catch (Exception e) {
124 ServiceSupport.dispose(remoteTransport);
125 LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
126 LOG.debug("Connection failure exception: " + e, e);
127 return;
128 }
129 } finally {
130 SslContext.setCurrentSslContext(null);
131 }
132 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
133 try {
134 bridge.start();
135 bridges.put(uri, bridge);
136 } catch (TransportDisposedIOException e) {
137 LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started.");
138 } catch (Exception e) {
139 ServiceSupport.dispose(localTransport);
140 ServiceSupport.dispose(remoteTransport);
141 LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
142 LOG.debug("Start failure exception: " + e, e);
143 try {
144 discoveryAgent.serviceFailed(event);
145 } catch (IOException e1) {
146 LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
147 }
148 return;
149 }
150 }
151 }
152
153 public void onServiceRemove(DiscoveryEvent event) {
154 String url = event.getServiceName();
155 if (url != null) {
156 URI uri;
157 try {
158 uri = new URI(url);
159 } catch (URISyntaxException e) {
160 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
161 return;
162 }
163
164 NetworkBridge bridge = bridges.remove(uri);
165 if (bridge == null) {
166 return;
167 }
168
169 ServiceSupport.dispose(bridge);
170 }
171 }
172
173 public DiscoveryAgent getDiscoveryAgent() {
174 return discoveryAgent;
175 }
176
177 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
178 this.discoveryAgent = discoveryAgent;
179 if (discoveryAgent != null) {
180 this.discoveryAgent.setDiscoveryListener(this);
181 }
182 }
183
184 protected void handleStart() throws Exception {
185 if (discoveryAgent == null) {
186 throw new IllegalStateException("You must configure the 'discoveryAgent' property");
187 }
188 this.discoveryAgent.start();
189 super.handleStart();
190 }
191
192 protected void handleStop(ServiceStopper stopper) throws Exception {
193 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) {
194 NetworkBridge bridge = i.next();
195 try {
196 bridge.stop();
197 } catch (Exception e) {
198 stopper.onException(this, e);
199 }
200 }
201 try {
202 this.discoveryAgent.stop();
203 } catch (Exception e) {
204 stopper.onException(this, e);
205 }
206
207 super.handleStop(stopper);
208 }
209
210 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
211 class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
212
213 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
214 super(brokerService, connectorName);
215 }
216
217 public void bridgeFailed() {
218 if (!serviceSupport.isStopped()) {
219 try {
220 discoveryAgent.serviceFailed(event);
221 } catch (IOException e) {
222 }
223 }
224
225 }
226 }
227 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
228
229 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
230 result.setBrokerService(getBrokerService());
231 return configureBridge(result);
232 }
233
234 @Override
235 public String toString() {
236 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
237 }
238 }