001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.StringTokenizer;
025 import java.util.concurrent.CopyOnWriteArrayList;
026 import java.util.regex.Pattern;
027 import javax.management.ObjectName;
028 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
029 import org.apache.activemq.broker.jmx.ManagementContext;
030 import org.apache.activemq.broker.region.ConnectorStatistics;
031 import org.apache.activemq.command.BrokerInfo;
032 import org.apache.activemq.command.ConnectionControl;
033 import org.apache.activemq.security.MessageAuthorizationPolicy;
034 import org.apache.activemq.thread.DefaultThreadPools;
035 import org.apache.activemq.thread.TaskRunnerFactory;
036 import org.apache.activemq.transport.Transport;
037 import org.apache.activemq.transport.TransportAcceptListener;
038 import org.apache.activemq.transport.TransportFactory;
039 import org.apache.activemq.transport.TransportServer;
040 import org.apache.activemq.transport.discovery.DiscoveryAgent;
041 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042 import org.apache.activemq.util.MDCHelper;
043 import org.apache.activemq.util.ServiceStopper;
044 import org.apache.activemq.util.ServiceSupport;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * @org.apache.xbean.XBean
050 *
051 */
052 public class TransportConnector implements Connector, BrokerServiceAware {
053
054 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055
056 protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057 protected TransportStatusDetector statusDector;
058 private BrokerService brokerService;
059 private TransportServer server;
060 private URI uri;
061 private BrokerInfo brokerInfo = new BrokerInfo();
062 private TaskRunnerFactory taskRunnerFactory;
063 private MessageAuthorizationPolicy messageAuthorizationPolicy;
064 private DiscoveryAgent discoveryAgent;
065 private final ConnectorStatistics statistics = new ConnectorStatistics();
066 private URI discoveryUri;
067 private URI connectUri;
068 private String name;
069 private boolean disableAsyncDispatch;
070 private boolean enableStatusMonitor = false;
071 private Broker broker;
072 private boolean updateClusterClients = false;
073 private boolean rebalanceClusterClients;
074 private boolean updateClusterClientsOnRemove = false;
075 private String updateClusterFilter;
076
077 public TransportConnector() {
078 }
079
080 public TransportConnector(TransportServer server) {
081 this();
082 setServer(server);
083 if (server != null && server.getConnectURI() != null) {
084 URI uri = server.getConnectURI();
085 if (uri != null && uri.getScheme().equals("vm")) {
086 setEnableStatusMonitor(false);
087 }
088 }
089
090 }
091
092 /**
093 * @return Returns the connections.
094 */
095 public CopyOnWriteArrayList<TransportConnection> getConnections() {
096 return connections;
097 }
098
099 /**
100 * Factory method to create a JMX managed version of this transport
101 * connector
102 */
103 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
104 throws IOException, URISyntaxException {
105 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
106 rc.setBrokerInfo(getBrokerInfo());
107 rc.setConnectUri(getConnectUri());
108 rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
109 rc.setDiscoveryAgent(getDiscoveryAgent());
110 rc.setDiscoveryUri(getDiscoveryUri());
111 rc.setEnableStatusMonitor(isEnableStatusMonitor());
112 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
113 rc.setName(getName());
114 rc.setTaskRunnerFactory(getTaskRunnerFactory());
115 rc.setUri(getUri());
116 rc.setBrokerService(brokerService);
117 rc.setUpdateClusterClients(isUpdateClusterClients());
118 rc.setRebalanceClusterClients(isRebalanceClusterClients());
119 rc.setUpdateClusterFilter(getUpdateClusterFilter());
120 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
121 return rc;
122 }
123
124 public BrokerInfo getBrokerInfo() {
125 return brokerInfo;
126 }
127
128 public void setBrokerInfo(BrokerInfo brokerInfo) {
129 this.brokerInfo = brokerInfo;
130 }
131
132 /**
133 *
134 * @deprecated use the {@link #setBrokerService(BrokerService)} method
135 * instead.
136 */
137 @Deprecated
138 public void setBrokerName(String name) {
139 if (this.brokerInfo == null) {
140 this.brokerInfo = new BrokerInfo();
141 }
142 this.brokerInfo.setBrokerName(name);
143 }
144
145 public TransportServer getServer() throws IOException, URISyntaxException {
146 if (server == null) {
147 setServer(createTransportServer());
148 }
149 return server;
150 }
151
152 public void setServer(TransportServer server) {
153 this.server = server;
154 }
155
156 public URI getUri() {
157 if (uri == null) {
158 try {
159 uri = getConnectUri();
160 } catch (Throwable e) {
161 }
162 }
163 return uri;
164 }
165
166 /**
167 * Sets the server transport URI to use if there is not a
168 * {@link TransportServer} configured via the
169 * {@link #setServer(TransportServer)} method. This value is used to lazy
170 * create a {@link TransportServer} instance
171 *
172 * @param uri
173 */
174 public void setUri(URI uri) {
175 this.uri = uri;
176 }
177
178 public TaskRunnerFactory getTaskRunnerFactory() {
179 return taskRunnerFactory;
180 }
181
182 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
183 this.taskRunnerFactory = taskRunnerFactory;
184 }
185
186 /**
187 * @return the statistics for this connector
188 */
189 public ConnectorStatistics getStatistics() {
190 return statistics;
191 }
192
193 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
194 return messageAuthorizationPolicy;
195 }
196
197 /**
198 * Sets the policy used to decide if the current connection is authorized to
199 * consume a given message
200 */
201 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
202 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
203 }
204
205 public void start() throws Exception {
206 broker = brokerService.getBroker();
207 brokerInfo.setBrokerName(broker.getBrokerName());
208 brokerInfo.setBrokerId(broker.getBrokerId());
209 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
210 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
211 brokerInfo.setBrokerURL(getServer().getConnectURI().toString());
212 final Map context = MDCHelper.getCopyOfContextMap();
213 getServer().setAcceptListener(new TransportAcceptListener() {
214 public void onAccept(final Transport transport) {
215 try {
216 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
217 public void run() {
218 MDCHelper.setContextMap(context);
219 try {
220 Connection connection = createConnection(transport);
221 connection.start();
222 } catch (Exception e) {
223 ServiceSupport.dispose(transport);
224 onAcceptError(e);
225 }
226 }
227 });
228 } catch (Exception e) {
229 String remoteHost = transport.getRemoteAddress();
230 ServiceSupport.dispose(transport);
231 onAcceptError(e, remoteHost);
232 }
233 }
234
235 public void onAcceptError(Exception error) {
236 onAcceptError(error, null);
237 }
238
239 private void onAcceptError(Exception error, String remoteHost) {
240 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
241 + error);
242 LOG.debug("Reason: " + error, error);
243 }
244 });
245 getServer().setBrokerInfo(brokerInfo);
246 getServer().start();
247
248 DiscoveryAgent da = getDiscoveryAgent();
249 if (da != null) {
250 da.registerService(getPublishableConnectString());
251 da.start();
252 }
253 if (enableStatusMonitor) {
254 this.statusDector = new TransportStatusDetector(this);
255 this.statusDector.start();
256 }
257
258 LOG.info("Connector " + getName() + " Started");
259 }
260
261 public String getPublishableConnectString() throws Exception {
262 String publishableConnectString = null;
263 URI theConnectURI = getConnectUri();
264 if (theConnectURI != null) {
265 publishableConnectString = theConnectURI.toString();
266 // strip off server side query parameters which may not be compatible to
267 // clients
268 if (theConnectURI.getRawQuery() != null) {
269 publishableConnectString = publishableConnectString.substring(0, publishableConnectString
270 .indexOf(theConnectURI.getRawQuery()) - 1);
271 }
272 }
273 if (LOG.isDebugEnabled()) {
274 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
275 }
276 return publishableConnectString;
277 }
278
279 public void stop() throws Exception {
280 ServiceStopper ss = new ServiceStopper();
281 if (discoveryAgent != null) {
282 ss.stop(discoveryAgent);
283 }
284 if (server != null) {
285 ss.stop(server);
286 server = null;
287 }
288 if (this.statusDector != null) {
289 this.statusDector.stop();
290 }
291
292 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
293 TransportConnection c = iter.next();
294 ss.stop(c);
295 }
296 ss.throwFirstException();
297 LOG.info("Connector " + getName() + " Stopped");
298 }
299
300 // Implementation methods
301 // -------------------------------------------------------------------------
302 protected Connection createConnection(Transport transport) throws IOException {
303 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
304 : taskRunnerFactory);
305 boolean statEnabled = this.getStatistics().isEnabled();
306 answer.getStatistics().setEnabled(statEnabled);
307 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
308 return answer;
309 }
310
311 protected TransportServer createTransportServer() throws IOException, URISyntaxException {
312 if (uri == null) {
313 throw new IllegalArgumentException("You must specify either a server or uri property");
314 }
315 if (brokerService == null) {
316 throw new IllegalArgumentException(
317 "You must specify the brokerService property. Maybe this connector should be added to a broker?");
318 }
319 return TransportFactory.bind(brokerService, uri);
320 }
321
322 public DiscoveryAgent getDiscoveryAgent() throws IOException {
323 if (discoveryAgent == null) {
324 discoveryAgent = createDiscoveryAgent();
325 }
326 return discoveryAgent;
327 }
328
329 protected DiscoveryAgent createDiscoveryAgent() throws IOException {
330 if (discoveryUri != null) {
331 return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
332 }
333 return null;
334 }
335
336 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
337 this.discoveryAgent = discoveryAgent;
338 }
339
340 public URI getDiscoveryUri() {
341 return discoveryUri;
342 }
343
344 public void setDiscoveryUri(URI discoveryUri) {
345 this.discoveryUri = discoveryUri;
346 }
347
348 public URI getConnectUri() throws IOException, URISyntaxException {
349 if (connectUri == null) {
350 if (server != null) {
351 connectUri = server.getConnectURI();
352 }
353 }
354 return connectUri;
355 }
356
357 public void setConnectUri(URI transportUri) {
358 this.connectUri = transportUri;
359 }
360
361 public void onStarted(TransportConnection connection) {
362 connections.add(connection);
363 }
364
365 public void onStopped(TransportConnection connection) {
366 connections.remove(connection);
367 }
368
369 public String getName() {
370 if (name == null) {
371 uri = getUri();
372 if (uri != null) {
373 name = uri.toString();
374 }
375 }
376 return name;
377 }
378
379 public void setName(String name) {
380 this.name = name;
381 }
382
383 @Override
384 public String toString() {
385 String rc = getName();
386 if (rc == null) {
387 rc = super.toString();
388 }
389 return rc;
390 }
391
392 protected ConnectionControl getConnectionControl() {
393 boolean rebalance = isRebalanceClusterClients();
394 String connectedBrokers = "";
395 String self = "";
396
397 if (isUpdateClusterClients()) {
398 if (brokerService.getDefaultSocketURIString() != null) {
399 self += brokerService.getDefaultSocketURIString();
400 self += ",";
401 }
402 if (rebalance == false) {
403 connectedBrokers += self;
404 }
405 if (this.broker.getPeerBrokerInfos() != null) {
406 for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
407 if (isMatchesClusterFilter(info.getBrokerName())) {
408 connectedBrokers += info.getBrokerURL();
409 connectedBrokers += ",";
410 }
411 }
412 }
413 if (rebalance) {
414 connectedBrokers += self;
415 }
416 }
417
418 ConnectionControl control = new ConnectionControl();
419 control.setConnectedBrokers(connectedBrokers);
420 control.setRebalanceConnection(rebalance);
421 return control;
422
423 }
424
425 public void updateClientClusterInfo() {
426 if (isRebalanceClusterClients() || isUpdateClusterClients()) {
427 ConnectionControl control = getConnectionControl();
428 for (Connection c : this.connections) {
429 c.updateClient(control);
430 }
431 }
432 }
433
434 private boolean isMatchesClusterFilter(String brokerName) {
435 boolean result = true;
436 String filter = getUpdateClusterFilter();
437 if (filter != null) {
438 filter = filter.trim();
439 if (filter.length() > 0) {
440 StringTokenizer tokenizer = new StringTokenizer(filter, ",");
441 while (result && tokenizer.hasMoreTokens()) {
442 String token = tokenizer.nextToken();
443 result = isMatchesClusterFilter(brokerName, token);
444 }
445 }
446 }
447 return result;
448 }
449
450 private boolean isMatchesClusterFilter(String brokerName, String match) {
451 boolean result = true;
452 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
453 result = Pattern.matches(match, brokerName);
454 }
455 return result;
456 }
457
458 public boolean isDisableAsyncDispatch() {
459 return disableAsyncDispatch;
460 }
461
462 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
463 this.disableAsyncDispatch = disableAsyncDispatch;
464 }
465
466 /**
467 * @return the enableStatusMonitor
468 */
469 public boolean isEnableStatusMonitor() {
470 return enableStatusMonitor;
471 }
472
473 /**
474 * @param enableStatusMonitor
475 * the enableStatusMonitor to set
476 */
477 public void setEnableStatusMonitor(boolean enableStatusMonitor) {
478 this.enableStatusMonitor = enableStatusMonitor;
479 }
480
481 /**
482 * This is called by the BrokerService right before it starts the transport.
483 */
484 public void setBrokerService(BrokerService brokerService) {
485 this.brokerService = brokerService;
486 }
487
488 public Broker getBroker() {
489 return broker;
490 }
491
492 public BrokerService getBrokerService() {
493 return brokerService;
494 }
495
496 /**
497 * @return the updateClusterClients
498 */
499 public boolean isUpdateClusterClients() {
500 return this.updateClusterClients;
501 }
502
503 /**
504 * @param updateClusterClients
505 * the updateClusterClients to set
506 */
507 public void setUpdateClusterClients(boolean updateClusterClients) {
508 this.updateClusterClients = updateClusterClients;
509 }
510
511 /**
512 * @return the rebalanceClusterClients
513 */
514 public boolean isRebalanceClusterClients() {
515 return this.rebalanceClusterClients;
516 }
517
518 /**
519 * @param rebalanceClusterClients
520 * the rebalanceClusterClients to set
521 */
522 public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
523 this.rebalanceClusterClients = rebalanceClusterClients;
524 }
525
526 /**
527 * @return the updateClusterClientsOnRemove
528 */
529 public boolean isUpdateClusterClientsOnRemove() {
530 return this.updateClusterClientsOnRemove;
531 }
532
533 /**
534 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
535 */
536 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
537 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
538 }
539
540 /**
541 * @return the updateClusterFilter
542 */
543 public String getUpdateClusterFilter() {
544 return this.updateClusterFilter;
545 }
546
547 /**
548 * @param updateClusterFilter
549 * the updateClusterFilter to set
550 */
551 public void setUpdateClusterFilter(String updateClusterFilter) {
552 this.updateClusterFilter = updateClusterFilter;
553 }
554
555 }