001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.security.GeneralSecurityException;
021 import java.security.cert.X509Certificate;
022 import java.util.*;
023 import java.util.concurrent.ConcurrentHashMap;
024 import java.util.concurrent.CountDownLatch;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicLong;
028
029 import org.apache.activemq.Service;
030 import org.apache.activemq.advisory.AdvisorySupport;
031 import org.apache.activemq.broker.BrokerService;
032 import org.apache.activemq.broker.BrokerServiceAware;
033 import org.apache.activemq.broker.TransportConnection;
034 import org.apache.activemq.broker.region.AbstractRegion;
035 import org.apache.activemq.broker.region.RegionBroker;
036 import org.apache.activemq.broker.region.Subscription;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ActiveMQMessage;
039 import org.apache.activemq.command.ActiveMQTempDestination;
040 import org.apache.activemq.command.ActiveMQTopic;
041 import org.apache.activemq.command.BrokerId;
042 import org.apache.activemq.command.BrokerInfo;
043 import org.apache.activemq.command.Command;
044 import org.apache.activemq.command.ConnectionError;
045 import org.apache.activemq.command.ConnectionId;
046 import org.apache.activemq.command.ConnectionInfo;
047 import org.apache.activemq.command.ConsumerId;
048 import org.apache.activemq.command.ConsumerInfo;
049 import org.apache.activemq.command.DataStructure;
050 import org.apache.activemq.command.DestinationInfo;
051 import org.apache.activemq.command.ExceptionResponse;
052 import org.apache.activemq.command.KeepAliveInfo;
053 import org.apache.activemq.command.Message;
054 import org.apache.activemq.command.MessageAck;
055 import org.apache.activemq.command.MessageDispatch;
056 import org.apache.activemq.command.NetworkBridgeFilter;
057 import org.apache.activemq.command.ProducerInfo;
058 import org.apache.activemq.command.RemoveInfo;
059 import org.apache.activemq.command.Response;
060 import org.apache.activemq.command.SessionInfo;
061 import org.apache.activemq.command.ShutdownInfo;
062 import org.apache.activemq.command.WireFormatInfo;
063 import org.apache.activemq.filter.DestinationFilter;
064 import org.apache.activemq.filter.MessageEvaluationContext;
065 import org.apache.activemq.thread.DefaultThreadPools;
066 import org.apache.activemq.thread.TaskRunnerFactory;
067 import org.apache.activemq.transport.DefaultTransportListener;
068 import org.apache.activemq.transport.FutureResponse;
069 import org.apache.activemq.transport.ResponseCallback;
070 import org.apache.activemq.transport.Transport;
071 import org.apache.activemq.transport.TransportDisposedIOException;
072 import org.apache.activemq.transport.TransportFilter;
073 import org.apache.activemq.transport.TransportListener;
074 import org.apache.activemq.transport.tcp.SslTransport;
075 import org.apache.activemq.util.*;
076 import org.slf4j.Logger;
077 import org.slf4j.LoggerFactory;
078 import org.slf4j.MDC;
079
080 /**
081 * A useful base class for implementing demand forwarding bridges.
082 *
083 *
084 */
085 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
// Logger shared by every bridge instance.
086 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
// Executes bridge start-up, shutdown, disposal and subscription-removal tasks away from the calling thread.
087 private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
088 protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
// The two transports being bridged; both are supplied through the constructor.
089 protected final Transport localBroker;
090 protected final Transport remoteBroker;
// Generates the connection ids used for the local and remote bridge connections.
091 protected final IdGenerator idGenerator = new IdGenerator();
092 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
// Connection, session and producer descriptors created while the local and remote
// halves of the bridge are started.
093 protected ConnectionInfo localConnectionInfo;
094 protected ConnectionInfo remoteConnectionInfo;
095 protected SessionInfo localSessionInfo;
096 protected ProducerInfo producerInfo;
// Used in log messages and in the local client id; starts out as a placeholder.
// NOTE(review): presumably replaced once the remote BrokerInfo has been processed - confirm.
097 protected String remoteBrokerName = "Unknown";
098 protected String localClientId;
// Consumer registered on the remote broker's advisory topics to detect demand.
099 protected ConsumerInfo demandConsumerInfo;
// Number of advisory dispatches received since the last acknowledgement was sent to the remote broker.
100 protected int demandConsumerDispatched;
// Start-once guards for the two halves of the bridge; both are reset when the remote transport is interrupted.
101 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
102 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
// Set by stop(); once true, incoming commands and exceptions are ignored.
103 protected AtomicBoolean disposed = new AtomicBoolean();
104 protected BrokerId localBrokerId;
// Destination lists; the excluded, dynamically included and statically included arrays are
// populated from the configuration when the remote BrokerInfo is processed.
105 protected ActiveMQDestination[] excludedDestinations;
106 protected ActiveMQDestination[] dynamicallyIncludedDestinations;
107 protected ActiveMQDestination[] staticallyIncludedDestinations;
108 protected ActiveMQDestination[] durableDestinations;
// Demand subscriptions indexed by consumer id; the local-id map is consulted for every
// message dispatch arriving from the local broker.
109 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
110 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
// Single-slot array: slot 0 is compared against incoming broker paths to detect loops, and
// the whole array is appended to the broker path of forwarded messages.
111 protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
// Counted down once by the local start and once by the remote start (hence the count of 2).
112 protected CountDownLatch startedLatch = new CountDownLatch(2);
// Released when startLocalBridge() has run; awaited before duplex ConsumerInfo commands are processed.
113 protected CountDownLatch localStartedLatch = new CountDownLatch(1);
// Awaited by startLocalBridge() before the local connection is created; also released by stop().
114 protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
115 protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
// True while the remote transport is reported as interrupted.
116 protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
// Set when a BrokerInfo arrives from the remote broker; used to throttle reconnects that never got that far.
117 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
118 protected NetworkBridgeConfiguration configuration;
119
// enqueueCounter counts message dispatches received from the local broker; dequeueCounter
// counts messages that were forwarded to the remote broker and acknowledged locally.
120 final AtomicLong enqueueCounter = new AtomicLong();
121 final AtomicLong dequeueCounter = new AtomicLong();
122
// Optional listener notified when the bridge starts and stops.
123 private NetworkBridgeListener networkBridgeListener;
124 private boolean createdByDuplex;
125 private BrokerInfo localBrokerInfo;
126 private BrokerInfo remoteBrokerInfo;
127
// Guards start()/stop() so that each runs its body at most once per start/stop cycle.
128 private final AtomicBoolean started = new AtomicBoolean();
// When set (via duplexStart), this connection is disposed instead of the bridge itself on failure.
129 private TransportConnection duplexInitiatingConnection;
130 private BrokerService brokerService = null;
131
/**
 * Creates a bridge over the two given transports. The constructor only records its
 * arguments; no transport is started until {@link #start()} is invoked.
 *
 * @param configuration the network bridge settings
 * @param localBroker   transport connected to the local broker
 * @param remoteBroker  transport connected to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    this.localBroker = localBroker;
    this.remoteBroker = remoteBroker;
    this.configuration = configuration;
}
137
138 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
139 this.localBrokerInfo = localBrokerInfo;
140 this.remoteBrokerInfo = remoteBrokerInfo;
141 this.duplexInitiatingConnection = connection;
142 start();
143 serviceRemoteCommand(remoteBrokerInfo);
144 }
145
/**
 * Starts the bridge: installs a listener on each transport, starts both transports and then
 * schedules the remote half of the bridge to start asynchronously. The body only runs when
 * the {@code started} flag can be flipped from false to true, so a call on an already
 * started bridge does nothing.
 *
 * @throws Exception if a transport fails to start, or a TransportDisposedIOException when
 *         the bridge was disposed before the remote start could be triggered
 */
146 public void start() throws Exception {
147 if (started.compareAndSet(false, true)) {
// Commands and errors from the local transport are routed to the local handlers.
148 localBroker.setTransportListener(new DefaultTransportListener() {
149
150 @Override
151 public void onCommand(Object o) {
152 Command command = (Command) o;
153 serviceLocalCommand(command);
154 }
155
156 @Override
157 public void onException(IOException error) {
158 serviceLocalException(error);
159 }
160 });
// The remote listener additionally reacts to transport interruption and resumption.
161 remoteBroker.setTransportListener(new TransportListener() {
162
163 public void onCommand(Object o) {
164 Command command = (Command) o;
165 serviceRemoteCommand(command);
166 }
167
168 public void onException(IOException error) {
169 serviceRemoteException(error);
170 }
171
172 public void transportInterupted() {
173 // clear any subscriptions - to try and prevent the bridge
174 // from stalling the broker
175 if (remoteInterupted.compareAndSet(false, true)) {
176 LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
177 if (localBridgeStarted.get()) {
178 clearDownSubscriptions();
// Remove the local connection while holding the bridge monitor, which
// startLocalBridge() and startRemoteBridge() also synchronize on.
179 synchronized (DemandForwardingBridgeSupport.this) {
180 try {
181 localBroker.oneway(localConnectionInfo.createRemoveCommand());
182 } catch (TransportDisposedIOException td) {
183 LOG.debug("local broker is now disposed", td);
184 } catch (IOException e) {
185 LOG.warn("Caught exception from local start", e);
186 }
187 }
188 }
// Reset the start flags and replace the latches so the bridge can be started
// again when the transport resumes.
189 localBridgeStarted.set(false);
190 remoteBridgeStarted.set(false);
191 startedLatch = new CountDownLatch(2);
192 localStartedLatch = new CountDownLatch(1);
193 }
194 }
195
196 public void transportResumed() {
197 if (remoteInterupted.compareAndSet(true, false)) {
198 // We want to slow down false connects so that we don't
199 // get in a busy loop.
200 // False connects can occur if you are using SSH tunnels.
201 if (!lastConnectSucceeded.get()) {
202 try {
203 LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
204 Thread.sleep(1000);
205 } catch (InterruptedException e) {
206 Thread.currentThread().interrupt();
207 }
208 }
209 lastConnectSucceeded.set(false);
210 try {
// Only the local half is restarted here; the remote half is simply marked as
// started and its share of startedLatch is counted down.
211 startLocalBridge();
212 remoteBridgeStarted.set(true);
213 startedLatch.countDown();
214 LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
215 } catch (Throwable e) {
216 LOG.error("Caught exception from local start in resume transport", e);
217 serviceLocalException(e);
218 }
219 }
220 }
221 });
222
223 localBroker.start();
224 remoteBroker.start();
// The bridge may have been stopped (disposed) while the transports were starting.
225 if (!disposed.get()) {
226 try {
227 triggerRemoteStartBridge();
228 } catch (IOException e) {
229 LOG.warn("Caught exception from remote start", e);
230 }
231 } else {
232 LOG.warn ("Bridge was disposed before the start() method was fully executed.");
233 throw new TransportDisposedIOException();
234 }
235 }
236 }
237
238 protected void triggerLocalStartBridge() throws IOException {
239 final Map context = MDCHelper.getCopyOfContextMap();
240 asyncTaskRunner.execute(new Runnable() {
241 public void run() {
242 MDCHelper.setContextMap(context);
243 final String originalName = Thread.currentThread().getName();
244 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
245 try {
246 startLocalBridge();
247 } catch (Throwable e) {
248 serviceLocalException(e);
249 } finally {
250 Thread.currentThread().setName(originalName);
251 }
252 }
253 });
254 }
255
256 protected void triggerRemoteStartBridge() throws IOException {
257 final Map context = MDCHelper.getCopyOfContextMap();
258 asyncTaskRunner.execute(new Runnable() {
259 public void run() {
260 MDCHelper.setContextMap(context);
261 final String originalName = Thread.currentThread().getName();
262 Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
263 try {
264 startRemoteBridge();
265 } catch (Exception e) {
266 serviceRemoteException(e);
267 } finally {
268 Thread.currentThread().setName(originalName);
269 }
270 }
271 });
272 }
273
274 protected void startLocalBridge() throws Throwable {
275 if (localBridgeStarted.compareAndSet(false, true)) {
276 synchronized (this) {
277 if (LOG.isTraceEnabled()) {
278 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
279 }
280 remoteBrokerNameKnownLatch.await();
281
282 if (!disposed.get()) {
283 localConnectionInfo = new ConnectionInfo();
284 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
285 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
286 localConnectionInfo.setClientId(localClientId);
287 localConnectionInfo.setUserName(configuration.getUserName());
288 localConnectionInfo.setPassword(configuration.getPassword());
289 Transport originalTransport = remoteBroker;
290 while (originalTransport instanceof TransportFilter) {
291 originalTransport = ((TransportFilter) originalTransport).getNext();
292 }
293 if (originalTransport instanceof SslTransport) {
294 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
295 localConnectionInfo.setTransportContext(peerCerts);
296 }
297 // sync requests that may fail
298 Object resp = localBroker.request(localConnectionInfo);
299 if (resp instanceof ExceptionResponse) {
300 throw ((ExceptionResponse)resp).getException();
301 }
302 localSessionInfo = new SessionInfo(localConnectionInfo, 1);
303 localBroker.oneway(localSessionInfo);
304
305 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
306 NetworkBridgeListener l = this.networkBridgeListener;
307 if (l != null) {
308 l.onStart(this);
309 }
310 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
311
312 } else {
313 LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
314 }
315 startedLatch.countDown();
316 localStartedLatch.countDown();
317 if (!disposed.get()) {
318 setupStaticDestinations();
319 } else {
320 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
321 }
322 }
323 }
324 }
325
326 protected void startRemoteBridge() throws Exception {
327 if (remoteBridgeStarted.compareAndSet(false, true)) {
328 if (LOG.isTraceEnabled()) {
329 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker);
330 }
331 synchronized (this) {
332 if (!isCreatedByDuplex()) {
333 BrokerInfo brokerInfo = new BrokerInfo();
334 brokerInfo.setBrokerName(configuration.getBrokerName());
335 brokerInfo.setBrokerURL(configuration.getBrokerURL());
336 brokerInfo.setNetworkConnection(true);
337 brokerInfo.setDuplexConnection(configuration.isDuplex());
338 // set our properties
339 Properties props = new Properties();
340 IntrospectionSupport.getProperties(configuration, props, null);
341 String str = MarshallingSupport.propertiesToString(props);
342 brokerInfo.setNetworkProperties(str);
343 brokerInfo.setBrokerId(this.localBrokerId);
344 remoteBroker.oneway(brokerInfo);
345 }
346 if (remoteConnectionInfo != null) {
347 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
348 }
349 remoteConnectionInfo = new ConnectionInfo();
350 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
351 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
352 remoteConnectionInfo.setUserName(configuration.getUserName());
353 remoteConnectionInfo.setPassword(configuration.getPassword());
354 remoteBroker.oneway(remoteConnectionInfo);
355
356 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
357 remoteBroker.oneway(remoteSessionInfo);
358 producerInfo = new ProducerInfo(remoteSessionInfo, 1);
359 producerInfo.setResponseRequired(false);
360 remoteBroker.oneway(producerInfo);
361 // Listen to consumer advisory messages on the remote broker to
362 // determine demand.
363 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
364 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
365 String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter();
366 if (configuration.isBridgeTempDestinations()) {
367 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
368 }
369 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
370 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
371 remoteBroker.oneway(demandConsumerInfo);
372 startedLatch.countDown();
373 if (!disposed.get()) {
374 triggerLocalStartBridge();
375 }
376 }
377 }
378 }
379
380 public void stop() throws Exception {
381 if (started.compareAndSet(true, false)) {
382 if (disposed.compareAndSet(false, true)) {
383 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
384 NetworkBridgeListener l = this.networkBridgeListener;
385 if (l != null) {
386 l.onStop(this);
387 }
388 try {
389 remoteBridgeStarted.set(false);
390 final CountDownLatch sendShutdown = new CountDownLatch(1);
391 final Map map = MDCHelper.getCopyOfContextMap();
392 asyncTaskRunner.execute(new Runnable() {
393 public void run() {
394 try {
395 MDCHelper.setContextMap(map);
396 localBroker.oneway(new ShutdownInfo());
397 sendShutdown.countDown();
398 remoteBroker.oneway(new ShutdownInfo());
399 } catch (Throwable e) {
400 LOG.debug("Caught exception sending shutdown", e);
401 } finally {
402 sendShutdown.countDown();
403 }
404
405 }
406 });
407 if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
408 LOG.info("Network Could not shutdown in a timely manner");
409 }
410 } finally {
411 ServiceStopper ss = new ServiceStopper();
412 ss.stop(remoteBroker);
413 ss.stop(localBroker);
414 // Release the started Latch since another thread could be
415 // stuck waiting for it to start up.
416 startedLatch.countDown();
417 startedLatch.countDown();
418 localStartedLatch.countDown();
419 ss.throwFirstException();
420 }
421 }
422 brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
423 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
424 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
425 remoteBrokerNameKnownLatch.countDown();
426 }
427 }
428
429 public void serviceRemoteException(Throwable error) {
430 if (!disposed.get()) {
431 if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
432 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
433 } else {
434 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
435 }
436 LOG.debug("The remote Exception was: " + error, error);
437 final Map map = MDCHelper.getCopyOfContextMap();
438 asyncTaskRunner.execute(new Runnable() {
439 public void run() {
440 MDCHelper.setContextMap(map);
441 ServiceSupport.dispose(getControllingService());
442 }
443 });
444 fireBridgeFailed();
445 }
446 }
447
/**
 * Handles a command received from the remote broker. Nothing is done once the bridge is
 * disposed; any Throwable raised while handling the command is passed to
 * serviceRemoteException(Throwable).
 * <p>
 * Message dispatches are treated as advisories, a BrokerInfo records the remote broker and
 * is forwarded to the local broker, and a ConnectionError is reported as a remote failure.
 * Every other command is handled according to whether the bridge is duplex.
 *
 * @param command the command received from the remote transport
 */
448 protected void serviceRemoteCommand(Command command) {
449 if (!disposed.get()) {
450 try {
451 if (command.isMessageDispatch()) {
452 waitStarted();
453 MessageDispatch md = (MessageDispatch) command;
454 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
// Acknowledge advisories in batches: only once more than 75% of the demand
// consumer's prefetch has been dispatched since the last acknowledgement.
455 demandConsumerDispatched++;
456 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
457 remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
458 demandConsumerDispatched = 0;
459 }
460 } else if (command.isBrokerInfo()) {
// Receiving a BrokerInfo marks the connection attempt as successful; see the
// reconnect throttling in the remote listener's transportResumed().
461 lastConnectSucceeded.set(true);
462 remoteBrokerInfo = (BrokerInfo) command;
463 Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
464 try {
// NOTE(review): the direction in which getProperties copies values between the
// configuration and props cannot be determined from this file - confirm.
465 IntrospectionSupport.getProperties(configuration, props, null);
// Snapshot the configured destination lists into arrays.
466 if (configuration.getExcludedDestinations() != null) {
467 excludedDestinations = configuration.getExcludedDestinations().toArray(
468 new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
469 }
470 if (configuration.getStaticallyIncludedDestinations() != null) {
471 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
472 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
473 }
474 if (configuration.getDynamicallyIncludedDestinations() != null) {
475 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
476 .toArray(
477 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
478 .size()]);
479 }
480 } catch (Throwable t) {
// A failure here is only logged; handling of the BrokerInfo continues below.
481 LOG.error("Error mapping remote destinations", t);
482 }
483 serviceRemoteBrokerInfo(command);
484 // Let the local broker know the remote broker's ID.
485 localBroker.oneway(command);
486 // new peer broker (a consumer can work with remote broker also)
487 brokerService.getBroker().addBroker(null, remoteBrokerInfo);
488 } else if (command.getClass() == ConnectionError.class) {
489 ConnectionError ce = (ConnectionError) command;
490 serviceRemoteException(ce.getException());
491 } else {
492 if (isDuplex()) {
493 if (command.isMessage()) {
494 ActiveMQMessage message = (ActiveMQMessage) command;
495 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
496 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
497 serviceRemoteConsumerAdvisory(message.getDataStructure());
498 } else {
// Ordinary message arriving over a duplex bridge: forward it to the local
// broker, but only for permitted destinations.
499 if (!isPermissableDestination(message.getDestination(), true)) {
500 return;
501 }
502 if (message.isResponseRequired()) {
// The sender expects a reply: forward the message locally, then answer the
// remote broker with a Response correlated to the message's command id.
503 Response reply = new Response();
504 reply.setCorrelationId(message.getCommandId());
505 localBroker.oneway(message);
506 remoteBroker.oneway(reply);
507 } else {
508 localBroker.oneway(message);
509 }
510 }
511 } else {
512 switch (command.getDataStructureType()) {
// Connection, session and producer commands are passed straight to the local broker.
513 case ConnectionInfo.DATA_STRUCTURE_TYPE:
514 case SessionInfo.DATA_STRUCTURE_TYPE:
515 case ProducerInfo.DATA_STRUCTURE_TYPE:
516 localBroker.oneway(command);
517 break;
518 case ConsumerInfo.DATA_STRUCTURE_TYPE:
// Wait until the local half of the bridge has run its start sequence.
519 localStartedLatch.await();
520 if (started.get()) {
521 if (!addConsumerInfo((ConsumerInfo) command)) {
522 if (LOG.isDebugEnabled()) {
523 LOG.debug("Ignoring ConsumerInfo: " + command);
524 }
525 } else {
526 if (LOG.isTraceEnabled()) {
527 LOG.trace("Adding ConsumerInfo: " + command);
528 }
529 }
530 } else {
531 // received a subscription whilst stopping
532 LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
533 }
534 break;
535 case ShutdownInfo.DATA_STRUCTURE_TYPE:
536 // initiator is shutting down, controlled case
537 // abortive close dealt with by inactivity monitor
538 LOG.info("Stopping network bridge on shutdown of remote broker");
539 serviceRemoteException(new IOException(command.toString()));
540 break;
541 default:
542 if (LOG.isDebugEnabled()) {
543 LOG.debug("Ignoring remote command: " + command);
544 }
545 }
546 }
547 } else {
// Non-duplex bridge: keep-alive, wire format and shutdown commands are silently
// ignored; anything else is unexpected and logged.
548 switch (command.getDataStructureType()) {
549 case KeepAliveInfo.DATA_STRUCTURE_TYPE:
550 case WireFormatInfo.DATA_STRUCTURE_TYPE:
551 case ShutdownInfo.DATA_STRUCTURE_TYPE:
552 break;
553 default:
554 LOG.warn("Unexpected remote command: " + command);
555 }
556 }
557 }
558 } catch (Throwable e) {
559 if (LOG.isDebugEnabled()) {
560 LOG.debug("Exception processing remote command: " + command, e);
561 }
562 serviceRemoteException(e);
563 }
564 }
565 }
566
567 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
568 final int networkTTL = configuration.getNetworkTTL();
569 if (data.getClass() == ConsumerInfo.class) {
570 // Create a new local subscription
571 ConsumerInfo info = (ConsumerInfo) data;
572 BrokerId[] path = info.getBrokerPath();
573
574 if (info.isBrowser()) {
575 if (LOG.isDebugEnabled()) {
576 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
577 }
578 return;
579 }
580
581 if (path != null && path.length >= networkTTL) {
582 if (LOG.isDebugEnabled()) {
583 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
584 }
585 return;
586 }
587 if (contains(path, localBrokerPath[0])) {
588 // Ignore this consumer as it's a consumer we locally sent to the broker.
589 if (LOG.isDebugEnabled()) {
590 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
591 }
592 return;
593 }
594 if (!isPermissableDestination(info.getDestination())) {
595 // ignore if not in the permitted or in the excluded list
596 if (LOG.isDebugEnabled()) {
597 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
598 }
599 return;
600 }
601
602 // in a cyclic network there can be multiple bridges per broker that can propagate
603 // a network subscription so there is a need to synchronise on a shared entity
604 synchronized (brokerService.getVmConnectorURI()) {
605 if (addConsumerInfo(info)) {
606 if (LOG.isDebugEnabled()) {
607 LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
608 }
609 } else {
610 if (LOG.isDebugEnabled()) {
611 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
612 }
613 }
614 }
615 } else if (data.getClass() == DestinationInfo.class) {
616 // It's a destination info - we want to pass up
617 // information about temporary destinations
618 DestinationInfo destInfo = (DestinationInfo) data;
619 BrokerId[] path = destInfo.getBrokerPath();
620 if (path != null && path.length >= networkTTL) {
621 if (LOG.isDebugEnabled()) {
622 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
623 }
624 return;
625 }
626 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
627 // Ignore this consumer as it's a consumer we locally sent to
628 // the broker.
629 if (LOG.isDebugEnabled()) {
630 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
631 }
632 return;
633 }
634 destInfo.setConnectionId(localConnectionInfo.getConnectionId());
635 if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
636 // re-set connection id so comes from here
637 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
638 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
639 }
640 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
641 if (LOG.isTraceEnabled()) {
642 LOG.trace("bridging destination control command: " + destInfo);
643 }
644 localBroker.oneway(destInfo);
645 } else if (data.getClass() == RemoveInfo.class) {
646 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
647 removeDemandSubscription(id);
648 }
649 }
650
651 public void serviceLocalException(Throwable error) {
652 if (!disposed.get()) {
653 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
654 LOG.debug("The local Exception was:" + error, error);
655 final Map map = MDCHelper.getCopyOfContextMap();
656 asyncTaskRunner.execute(new Runnable() {
657 public void run() {
658 MDCHelper.setContextMap(map);
659 ServiceSupport.dispose(getControllingService());
660 }
661 });
662 fireBridgeFailed();
663 }
664 }
665
666 protected Service getControllingService() {
667 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
668 }
669
670 protected void addSubscription(DemandSubscription sub) throws IOException {
671 if (sub != null) {
672 localBroker.oneway(sub.getLocalInfo());
673 }
674 }
675
676 protected void removeSubscription(final DemandSubscription sub) throws IOException {
677 if (sub != null) {
678 if (LOG.isDebugEnabled()) {
679 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
680 }
681 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
682
683 // continue removal in separate thread to free up this thread for outstanding responses
684 final Map map = MDCHelper.getCopyOfContextMap();
685 asyncTaskRunner.execute(new Runnable() {
686 public void run() {
687 MDCHelper.setContextMap(map);
688 sub.waitForCompletion();
689 try {
690 localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
691 } catch (IOException e) {
692 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
693 }
694 }
695 });
696 }
697 }
698
699 protected Message configureMessage(MessageDispatch md) {
700 Message message = md.getMessage().copy();
701 // Update the packet to show where it came from.
702 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
703 message.setProducerId(producerInfo.getProducerId());
704 message.setDestination(md.getDestination());
705 if (message.getOriginalTransactionId() == null) {
706 message.setOriginalTransactionId(message.getTransactionId());
707 }
708 message.setTransactionId(null);
709 return message;
710 }
711
/**
 * Handles a command received from the local broker. Nothing is done once the bridge is
 * disposed; any Throwable raised while handling the command is logged and passed to
 * serviceLocalException(Throwable).
 * <p>
 * Message dispatches that belong to a known demand subscription are forwarded to the remote
 * broker and acknowledged locally. A BrokerInfo is recorded and passed to
 * serviceLocalBrokerInfo, a ShutdownInfo stops the bridge unless the remote side is currently
 * interrupted, and a ConnectionError is reported as a local failure.
 *
 * @param command the command received from the local transport
 */
712 protected void serviceLocalCommand(Command command) {
713 if (!disposed.get()) {
714 try {
715 if (command.isMessageDispatch()) {
// Every dispatch received from the local broker is counted, forwarded or not.
716 enqueueCounter.incrementAndGet();
717 final MessageDispatch md = (MessageDispatch) command;
718 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
// Each successful incrementOutstandingResponses() below is paired with exactly one
// decrementOutstandingResponses() on every path that follows.
719 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
720
721 if (suppressMessageDispatch(md, sub)) {
722 if (LOG.isDebugEnabled()) {
723 LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
724 }
725 // still ack as it may be durable
726 try {
727 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
728 } finally {
729 sub.decrementOutstandingResponses();
730 }
731 return;
732 }
733
734 Message message = configureMessage(md);
735 if (LOG.isDebugEnabled()) {
736 LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
737 }
738
739 if (!message.isResponseRequired()) {
740
741 // If the message was originally sent using async
742 // send, we will preserve that QOS
743 // by bridging it using an async send (small chance
744 // of message loss).
745 try {
746 remoteBroker.oneway(message);
747 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
748 dequeueCounter.incrementAndGet();
749 } finally {
750 sub.decrementOutstandingResponses();
751 }
752
753 } else {
754
755 // The message was not sent using async send, so we
756 // should only ack the local
757 // broker when we get confirmation that the remote
758 // broker has received the message.
759 ResponseCallback callback = new ResponseCallback() {
760 public void onCompletion(FutureResponse future) {
761 try {
762 Response response = future.getResult();
763 if (response.isException()) {
// The remote broker rejected the message: treat it as a bridge failure; the
// message is not acknowledged locally.
764 ExceptionResponse er = (ExceptionResponse) response;
765 serviceLocalException(er.getException());
766 } else {
767 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
768 dequeueCounter.incrementAndGet();
769 }
770 } catch (IOException e) {
771 serviceLocalException(e);
772 } finally {
773 sub.decrementOutstandingResponses();
774 }
775 }
776 };
777
778 remoteBroker.asyncRequest(message, callback);
779
780 }
781 } else {
782 if (LOG.isDebugEnabled()) {
783 LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
784 }
785 }
786 } else if (command.isBrokerInfo()) {
787 localBrokerInfo = (BrokerInfo) command;
788 serviceLocalBrokerInfo(command);
789 } else if (command.isShutdownInfo()) {
790 LOG.info(configuration.getBrokerName() + " Shutting down");
791 // Don't shut down the whole connector if the remote side
792 // was interrupted.
793 // the local transport is just shutting down temporarily
794 // until the remote side
795 // is restored.
796 if (!remoteInterupted.get()) {
797 stop();
798 }
799 } else if (command.getClass() == ConnectionError.class) {
800 ConnectionError ce = (ConnectionError) command;
801 serviceLocalException(ce.getException());
802 } else {
// Wire format negotiation is expected and ignored; anything else is logged.
803 switch (command.getDataStructureType()) {
804 case WireFormatInfo.DATA_STRUCTURE_TYPE:
805 break;
806 default:
807 LOG.warn("Unexpected local command: " + command);
808 }
809 }
810 } catch (Throwable e) {
811 LOG.warn("Caught an exception processing local command", e);
812 serviceLocalException(e);
813 }
814 }
815 }
816
817 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
818 // See if this consumer's brokerPath tells us it came from the broker at the other end
819 // of the bridge. I think we should be making this decision based on the message's
820 // broker bread crumbs and not the consumer's? However, the message's broker bread
821 // crumbs are null, which is another matter.
822 boolean suppress = false;
823 Object consumerInfo = md.getMessage().getDataStructure();
824 if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
825 suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
826 }
827
828 // for durable subs, suppression via filter leaves dangling acks so we need to
829 // check here and allow the ack irrespective
830 if (!suppress && sub.getLocalInfo().isDurable()) {
831 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
832 messageEvalContext.setMessageReference(md.getMessage());
833 suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
834 }
835 return suppress;
836 }
837
838 /**
839 * @return Returns the dynamicallyIncludedDestinations.
840 */
841 public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
842 return dynamicallyIncludedDestinations;
843 }
844
845 /**
846 * @param dynamicallyIncludedDestinations The
847 * dynamicallyIncludedDestinations to set.
848 */
849 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
850 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
851 }
852
853 /**
854 * @return Returns the excludedDestinations.
855 */
856 public ActiveMQDestination[] getExcludedDestinations() {
857 return excludedDestinations;
858 }
859
860 /**
861 * @param excludedDestinations The excludedDestinations to set.
862 */
863 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
864 this.excludedDestinations = excludedDestinations;
865 }
866
867 /**
868 * @return Returns the staticallyIncludedDestinations.
869 */
870 public ActiveMQDestination[] getStaticallyIncludedDestinations() {
871 return staticallyIncludedDestinations;
872 }
873
874 /**
875 * @param staticallyIncludedDestinations The staticallyIncludedDestinations
876 * to set.
877 */
878 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
879 this.staticallyIncludedDestinations = staticallyIncludedDestinations;
880 }
881
882 /**
883 * @return Returns the durableDestinations.
884 */
885 public ActiveMQDestination[] getDurableDestinations() {
886 return durableDestinations;
887 }
888
889 /**
890 * @param durableDestinations The durableDestinations to set.
891 */
892 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
893 this.durableDestinations = durableDestinations;
894 }
895
896 /**
897 * @return Returns the localBroker.
898 */
899 public Transport getLocalBroker() {
900 return localBroker;
901 }
902
903 /**
904 * @return Returns the remoteBroker.
905 */
906 public Transport getRemoteBroker() {
907 return remoteBroker;
908 }
909
910 /**
911 * @return the createdByDuplex
912 */
913 public boolean isCreatedByDuplex() {
914 return this.createdByDuplex;
915 }
916
917 /**
918 * @param createdByDuplex the createdByDuplex to set
919 */
920 public void setCreatedByDuplex(boolean createdByDuplex) {
921 this.createdByDuplex = createdByDuplex;
922 }
923
924 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
925 if (brokerPath != null) {
926 for (int i = 0; i < brokerPath.length; i++) {
927 if (brokerId.equals(brokerPath[i])) {
928 return true;
929 }
930 }
931 }
932 return false;
933 }
934
935 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
936 if (brokerPath == null || brokerPath.length == 0) {
937 return pathsToAppend;
938 }
939 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
940 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
941 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
942 return rc;
943 }
944
945 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
946 if (brokerPath == null || brokerPath.length == 0) {
947 return new BrokerId[] { idToAppend };
948 }
949 BrokerId rc[] = new BrokerId[brokerPath.length + 1];
950 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
951 rc[brokerPath.length] = idToAppend;
952 return rc;
953 }
954
955 protected boolean isPermissableDestination(ActiveMQDestination destination) {
956 return isPermissableDestination(destination, false);
957 }
958
959 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
960 // Are we not bridging temp destinations?
961 if (destination.isTemporary()) {
962 if (allowTemporary) {
963 return true;
964 } else {
965 return configuration.isBridgeTempDestinations();
966 }
967 }
968
969 ActiveMQDestination[] dests = excludedDestinations;
970 if (dests != null && dests.length > 0) {
971 for (int i = 0; i < dests.length; i++) {
972 ActiveMQDestination match = dests[i];
973 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
974 if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
975 return false;
976 }
977 }
978 }
979
980 dests = dynamicallyIncludedDestinations;
981 if (dests != null && dests.length > 0) {
982 for (int i = 0; i < dests.length; i++) {
983 ActiveMQDestination match = dests[i];
984 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
985 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
986 return true;
987 }
988 }
989
990 return false;
991 }
992 return true;
993 }
994
995 /**
996 * Subscriptions for these destinations are always created
997 */
998 protected void setupStaticDestinations() {
999 ActiveMQDestination[] dests = staticallyIncludedDestinations;
1000 if (dests != null) {
1001 for (int i = 0; i < dests.length; i++) {
1002 ActiveMQDestination dest = dests[i];
1003 DemandSubscription sub = createDemandSubscription(dest);
1004 try {
1005 addSubscription(sub);
1006 } catch (IOException e) {
1007 LOG.error("Failed to add static destination " + dest, e);
1008 }
1009 if (LOG.isTraceEnabled()) {
1010 LOG.trace("bridging messages for static destination: " + dest);
1011 }
1012 }
1013 }
1014 }
1015
1016 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1017 boolean consumerAdded = false;
1018 ConsumerInfo info = consumerInfo.copy();
1019 addRemoteBrokerToBrokerPath(info);
1020 DemandSubscription sub = createDemandSubscription(info);
1021 if (sub != null) {
1022 if (duplicateSuppressionIsRequired(sub)) {
1023 undoMapRegistration(sub);
1024 } else {
1025 addSubscription(sub);
1026 consumerAdded = true;
1027 }
1028 }
1029 return consumerAdded;
1030 }
1031
1032 private void undoMapRegistration(DemandSubscription sub) {
1033 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1034 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1035 }
1036
1037 /*
1038 * check our existing subs networkConsumerIds against the list of network ids in this subscription
1039 * A match means a duplicate which we suppress for topics and maybe for queues
1040 */
1041 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1042 final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1043 boolean suppress = false;
1044
1045 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
1046 consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1047 return suppress;
1048 }
1049
1050 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1051 Collection<Subscription> currentSubs =
1052 getRegionSubscriptions(consumerInfo.getDestination().isTopic());
1053 for (Subscription sub : currentSubs) {
1054 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1055 if (!networkConsumers.isEmpty()) {
1056 if (matchFound(candidateConsumers, networkConsumers)) {
1057 suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1058 break;
1059 }
1060 }
1061 }
1062 return suppress;
1063 }
1064
1065 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1066 boolean suppress = false;
1067
1068 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1069 if (LOG.isDebugEnabled()) {
1070 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1071 + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1072 + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1073 }
1074 suppress = true;
1075 } else {
1076 // remove the existing lower priority duplicate and allow this candidate
1077 try {
1078 removeDuplicateSubscription(existingSub);
1079
1080 if (LOG.isDebugEnabled()) {
1081 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1082 + " with sub from " + remoteBrokerName
1083 + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1084 + candidateInfo.getNetworkConsumerIds());
1085 }
1086 } catch (IOException e) {
1087 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1088 }
1089 }
1090 return suppress;
1091 }
1092
1093 private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1094 for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1095 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1096 break;
1097 }
1098 }
1099 }
1100
1101 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1102 boolean found = false;
1103 for (ConsumerId aliasConsumer : networkConsumers) {
1104 if (candidateConsumers.contains(aliasConsumer)) {
1105 found = true;
1106 break;
1107 }
1108 }
1109 return found;
1110 }
1111
1112 private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
1113 RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
1114 AbstractRegion abstractRegion = (AbstractRegion)
1115 (isTopic ? region.getTopicRegion() : region.getQueueRegion());
1116 return abstractRegion.getSubscriptions().values();
1117 }
1118
1119 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1120 //add our original id to ourselves
1121 info.addNetworkConsumerId(info.getConsumerId());
1122 return doCreateDemandSubscription(info);
1123 }
1124
1125 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1126 DemandSubscription result = new DemandSubscription(info);
1127 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1128 if (info.getDestination().isTemporary()) {
1129 // reset the local connection Id
1130
1131 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1132 dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1133 }
1134
1135 if (configuration.isDecreaseNetworkConsumerPriority()) {
1136 byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
1137 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1138 // The longer the path to the consumer, the less it's consumer priority.
1139 priority -= info.getBrokerPath().length + 1;
1140 }
1141 result.getLocalInfo().setPriority(priority);
1142 if (LOG.isDebugEnabled()) {
1143 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1144 }
1145 }
1146 configureDemandSubscription(info, result);
1147 return result;
1148 }
1149
1150 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1151 ConsumerInfo info = new ConsumerInfo();
1152 info.setDestination(destination);
1153 // the remote info held by the DemandSubscription holds the original
1154 // consumerId,
1155 // the local info get's overwritten
1156
1157 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1158 DemandSubscription result = null;
1159 try {
1160 result = createDemandSubscription(info);
1161 } catch (IOException e) {
1162 LOG.error("Failed to create DemandSubscription ", e);
1163 }
1164 if (result != null) {
1165 result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
1166 }
1167 return result;
1168 }
1169
1170 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1171 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1172 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1173 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1174 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1175
1176 if (!info.isDurable()) {
1177 // This works for now since we use a VM connection to the local broker.
1178 // may need to change if we ever subscribe to a remote broker.
1179 sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
1180 } else {
1181 // need to ack this message if it is ignored as it is durable so
1182 // we check before we send. see: suppressMessageDispatch()
1183 }
1184 }
1185
1186 protected void removeDemandSubscription(ConsumerId id) throws IOException {
1187 DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1188 if (LOG.isDebugEnabled()) {
1189 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1190 }
1191 if (sub != null) {
1192 removeSubscription(sub);
1193 if (LOG.isDebugEnabled()) {
1194 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
1195 }
1196 }
1197 }
1198
1199 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1200 boolean removeDone = false;
1201 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1202 if (sub != null) {
1203 try {
1204 removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1205 removeDone = true;
1206 } catch (IOException e) {
1207 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1208 }
1209 }
1210 return removeDone;
1211 }
1212
1213 protected void waitStarted() throws InterruptedException {
1214 startedLatch.await();
1215 localBrokerIdKnownLatch.await();
1216 }
1217
1218 protected void clearDownSubscriptions() {
1219 subscriptionMapByLocalId.clear();
1220 subscriptionMapByRemoteId.clear();
1221 }
1222
/**
 * Builds the filter this bridge applies to messages. Within this class the
 * result is installed as the additional predicate of non-durable local
 * subscriptions, and is evaluated directly when deciding whether to
 * suppress a dispatch for a durable subscription.
 *
 * @param info the consumer info the filter is created for; note that
 *            suppressMessageDispatch passes null
 * @return the filter to apply
 * @throws IOException declared for implementations that can fail
 */
protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;

/**
 * Invoked when a BrokerInfo command arrives from the local broker, after
 * localBrokerInfo has been updated with it.
 *
 * @param command the received BrokerInfo command
 * @throws InterruptedException declared for implementations that block
 */
protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;

/**
 * Invoked by addConsumerInfo on its copy of the consumer info, before a
 * demand subscription is created from that copy.
 *
 * @param info the copied consumer info to update
 * @throws IOException declared for implementations that can fail
 */
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;

/**
 * NOTE(review): presumably the remote-side counterpart of
 * serviceLocalBrokerInfo; its caller is not visible in this part of the
 * file - confirm.
 *
 * @param command the received command
 * @throws IOException declared for implementations that can fail
 */
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;

/**
 * NOTE(review): presumably the broker path identifying the remote side of
 * this bridge; usage is not visible in this part of the file - confirm.
 *
 * @return the remote broker path
 */
protected abstract BrokerId[] getRemoteBrokerPath();
1232
1233 public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1234 this.networkBridgeListener = listener;
1235 }
1236
1237 private void fireBridgeFailed() {
1238 NetworkBridgeListener l = this.networkBridgeListener;
1239 if (l != null) {
1240 l.bridgeFailed();
1241 }
1242 }
1243
1244 public String getRemoteAddress() {
1245 return remoteBroker.getRemoteAddress();
1246 }
1247
1248 public String getLocalAddress() {
1249 return localBroker.getRemoteAddress();
1250 }
1251
1252 public String getRemoteBrokerName() {
1253 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1254 }
1255
1256 public String getLocalBrokerName() {
1257 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1258 }
1259
1260 public long getDequeueCounter() {
1261 return dequeueCounter.get();
1262 }
1263
1264 public long getEnqueueCounter() {
1265 return enqueueCounter.get();
1266 }
1267
1268 protected boolean isDuplex() {
1269 return configuration.isDuplex() || createdByDuplex;
1270 }
1271
1272 public void setBrokerService(BrokerService brokerService) {
1273 this.brokerService = brokerService;
1274 }
1275 }