001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Properties;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029 import java.util.concurrent.CountDownLatch;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicInteger;
033 import java.util.concurrent.atomic.AtomicReference;
034 import java.util.concurrent.locks.ReentrantReadWriteLock;
035
036 import javax.management.ObjectName;
037 import javax.transaction.xa.XAResource;
038
039 import org.apache.activemq.broker.ft.MasterBroker;
040 import org.apache.activemq.broker.region.ConnectionStatistics;
041 import org.apache.activemq.broker.region.RegionBroker;
042 import org.apache.activemq.command.BrokerId;
043 import org.apache.activemq.command.BrokerInfo;
044 import org.apache.activemq.command.Command;
045 import org.apache.activemq.command.CommandTypes;
046 import org.apache.activemq.command.ConnectionControl;
047 import org.apache.activemq.command.ConnectionError;
048 import org.apache.activemq.command.ConnectionId;
049 import org.apache.activemq.command.ConnectionInfo;
050 import org.apache.activemq.command.ConsumerControl;
051 import org.apache.activemq.command.ConsumerId;
052 import org.apache.activemq.command.ConsumerInfo;
053 import org.apache.activemq.command.ControlCommand;
054 import org.apache.activemq.command.DataArrayResponse;
055 import org.apache.activemq.command.DestinationInfo;
056 import org.apache.activemq.command.ExceptionResponse;
057 import org.apache.activemq.command.FlushCommand;
058 import org.apache.activemq.command.IntegerResponse;
059 import org.apache.activemq.command.KeepAliveInfo;
060 import org.apache.activemq.command.Message;
061 import org.apache.activemq.command.MessageAck;
062 import org.apache.activemq.command.MessageDispatch;
063 import org.apache.activemq.command.MessageDispatchNotification;
064 import org.apache.activemq.command.MessagePull;
065 import org.apache.activemq.command.ProducerAck;
066 import org.apache.activemq.command.ProducerId;
067 import org.apache.activemq.command.ProducerInfo;
068 import org.apache.activemq.command.RemoveSubscriptionInfo;
069 import org.apache.activemq.command.Response;
070 import org.apache.activemq.command.SessionId;
071 import org.apache.activemq.command.SessionInfo;
072 import org.apache.activemq.command.ShutdownInfo;
073 import org.apache.activemq.command.TransactionId;
074 import org.apache.activemq.command.TransactionInfo;
075 import org.apache.activemq.command.WireFormatInfo;
076 import org.apache.activemq.network.*;
077 import org.apache.activemq.security.MessageAuthorizationPolicy;
078 import org.apache.activemq.state.CommandVisitor;
079 import org.apache.activemq.state.ConnectionState;
080 import org.apache.activemq.state.ConsumerState;
081 import org.apache.activemq.state.ProducerState;
082 import org.apache.activemq.state.SessionState;
083 import org.apache.activemq.state.TransactionState;
084 import org.apache.activemq.thread.DefaultThreadPools;
085 import org.apache.activemq.thread.Task;
086 import org.apache.activemq.thread.TaskRunner;
087 import org.apache.activemq.thread.TaskRunnerFactory;
088 import org.apache.activemq.transaction.Transaction;
089 import org.apache.activemq.transport.DefaultTransportListener;
090 import org.apache.activemq.transport.ResponseCorrelator;
091 import org.apache.activemq.transport.Transport;
092 import org.apache.activemq.transport.TransportDisposedIOException;
093 import org.apache.activemq.transport.TransportFactory;
094 import org.apache.activemq.util.*;
095 import org.slf4j.Logger;
096 import org.slf4j.LoggerFactory;
097 import org.slf4j.MDC;
098
099 public class TransportConnection implements Connection, Task, CommandVisitor {
100 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
101 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
102 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
103 // Keeps track of the broker and connector that created this connection.
104 protected final Broker broker;
105 protected final TransportConnector connector;
106 // Keeps track of the state of the connections.
107 // protected final ConcurrentHashMap localConnectionStates=new
108 // ConcurrentHashMap();
109 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
110 // The broker and wireformat info that was exchanged.
111 protected BrokerInfo brokerInfo;
112 protected final List<Command> dispatchQueue = new LinkedList<Command>();
113 protected TaskRunner taskRunner;
114 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
115 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
116 private MasterBroker masterBroker;
117 private final Transport transport;
118 private MessageAuthorizationPolicy messageAuthorizationPolicy;
119 private WireFormatInfo wireFormatInfo;
120 // Used to do async dispatch.. this should perhaps be pushed down into the
121 // transport layer..
122 private boolean inServiceException;
123 private final ConnectionStatistics statistics = new ConnectionStatistics();
124 private boolean manageable;
125 private boolean slow;
126 private boolean markedCandidate;
127 private boolean blockedCandidate;
128 private boolean blocked;
129 private boolean connected;
130 private boolean active;
131 private boolean starting;
132 private boolean pendingStop;
133 private long timeStamp;
134 private final AtomicBoolean stopping = new AtomicBoolean(false);
135 private final CountDownLatch stopped = new CountDownLatch(1);
136 private final AtomicBoolean asyncException = new AtomicBoolean(false);
137 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
138 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
139 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
140 private ConnectionContext context;
141 private boolean networkConnection;
142 private boolean faultTolerantConnection;
143 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
144 private DemandForwardingBridge duplexBridge;
145 private final TaskRunnerFactory taskRunnerFactory;
146 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
147 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
148 private String duplexNetworkConnectorId;
149
/**
 * Creates a connection on top of the given transport and installs the
 * transport listener that routes inbound commands and transport failures
 * into this connection.
 *
 * @param connector
 *            the connector that created this connection. A null connector
 *            is tolerated by this constructor: in that case no parent
 *            statistics and no message authorization policy are inherited.
 * @param transport
 *            the transport used to exchange commands with the peer
 * @param broker
 *            the broker servicing the commands; its RegionBroker adaptor
 *            supplies the shared connection state map
 * @param taskRunnerFactory
 *            - can be null if you want direct dispatch to the transport
 *            else commands are sent async.
 */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
        TaskRunnerFactory taskRunnerFactory) {
    this.connector = connector;
    this.broker = broker;
    RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
    brokerConnectionStates = rb.getConnectionStates();
    // Everything inherited from the connector sits behind one null guard;
    // previously the policy lookup dereferenced the connector before the check.
    if (connector != null) {
        this.statistics.setParent(connector.getStatistics());
        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
    }
    this.taskRunnerFactory = taskRunnerFactory;
    this.transport = transport;
    this.transport.setTransportListener(new DefaultTransportListener() {
        @Override
        public void onCommand(Object o) {
            // Commands are serviced while holding the read side of serviceLock.
            serviceLock.readLock().lock();
            try {
                if (!(o instanceof Command)) {
                    // String concatenation is null-safe, so a null payload is
                    // reported as a protocol violation rather than as an NPE.
                    throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                }
                Command command = (Command) o;
                Response response = service(command);
                if (response != null) {
                    dispatchSync(response);
                }
            } finally {
                serviceLock.readLock().unlock();
            }
        }

        @Override
        public void onException(IOException exception) {
            serviceLock.readLock().lock();
            try {
                serviceTransportException(exception);
            } finally {
                serviceLock.readLock().unlock();
            }
        }
    });
    connected = true;
}
200
201 /**
202 * Returns the number of messages to be dispatched to this connection
203 *
204 * @return size of dispatch queue
205 */
206 public int getDispatchQueueSize() {
207 synchronized (dispatchQueue) {
208 return dispatchQueue.size();
209 }
210 }
211
212 public void serviceTransportException(IOException e) {
213 BrokerService bService = connector.getBrokerService();
214 if (bService.isShutdownOnSlaveFailure()) {
215 if (brokerInfo != null) {
216 if (brokerInfo.isSlaveBroker()) {
217 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
218 try {
219 doStop();
220 bService.stop();
221 } catch (Exception ex) {
222 LOG.warn("Failed to stop the master", ex);
223 }
224 }
225 }
226 }
227 if (!stopping.get()) {
228 transportException.set(e);
229 if (TRANSPORTLOG.isDebugEnabled()) {
230 TRANSPORTLOG.debug("Transport failed: " + e, e);
231 } else if (TRANSPORTLOG.isInfoEnabled()) {
232 TRANSPORTLOG.info("Transport failed: " + e);
233 }
234 stopAsync();
235 }
236 }
237
238 /**
239 * Calls the serviceException method in an async thread. Since handling a
240 * service exception closes a socket, we should not tie up broker threads
241 * since client sockets may hang or cause deadlocks.
242 *
243 * @param e
244 */
245 public void serviceExceptionAsync(final IOException e) {
246 if (asyncException.compareAndSet(false, true)) {
247 new Thread("Async Exception Handler") {
248 @Override
249 public void run() {
250 serviceException(e);
251 }
252 }.start();
253 }
254 }
255
256 /**
257 * Closes a clients connection due to a detected error. Errors are ignored
258 * if: the client is closing or broker is closing. Otherwise, the connection
259 * error transmitted to the client before stopping it's transport.
260 */
261 public void serviceException(Throwable e) {
262 // are we a transport exception such as not being able to dispatch
263 // synchronously to a transport
264 if (e instanceof IOException) {
265 serviceTransportException((IOException) e);
266 } else if (e.getClass() == BrokerStoppedException.class) {
267 // Handle the case where the broker is stopped
268 // But the client is still connected.
269 if (!stopping.get()) {
270 if (SERVICELOG.isDebugEnabled()) {
271 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
272 }
273 ConnectionError ce = new ConnectionError();
274 ce.setException(e);
275 dispatchSync(ce);
276 // Wait a little bit to try to get the output buffer to flush
277 // the exption notification to the client.
278 try {
279 Thread.sleep(500);
280 } catch (InterruptedException ie) {
281 Thread.currentThread().interrupt();
282 }
283 // Worst case is we just kill the connection before the
284 // notification gets to him.
285 stopAsync();
286 }
287 } else if (!stopping.get() && !inServiceException) {
288 inServiceException = true;
289 try {
290 SERVICELOG.warn("Async error occurred: " + e, e);
291 ConnectionError ce = new ConnectionError();
292 ce.setException(e);
293 dispatchAsync(ce);
294 } finally {
295 inServiceException = false;
296 }
297 }
298 }
299
300 public Response service(Command command) {
301 MDC.put("activemq.connector", connector.getUri().toString());
302 Response response = null;
303 boolean responseRequired = command.isResponseRequired();
304 int commandId = command.getCommandId();
305 try {
306 response = command.visit(this);
307 } catch (Throwable e) {
308 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
309 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
310 + " command: " + command + ", exception: " + e, e);
311 }
312 if (responseRequired) {
313 response = new ExceptionResponse(e);
314 if(e instanceof java.lang.SecurityException){
315 //still need to close this down - incase the peer of this transport doesn't play nice
316 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
317 }
318 } else {
319 serviceException(e);
320 }
321 }
322 if (responseRequired) {
323 if (response == null) {
324 response = new Response();
325 }
326 response.setCorrelationId(commandId);
327 }
328 // The context may have been flagged so that the response is not
329 // sent.
330 if (context != null) {
331 if (context.isDontSendReponse()) {
332 context.setDontSendReponse(false);
333 response = null;
334 }
335 context = null;
336 }
337 MDC.remove("activemq.connector");
338 return response;
339 }
340
341 public Response processKeepAlive(KeepAliveInfo info) throws Exception {
342 return null;
343 }
344
345 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
346 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
347 return null;
348 }
349
350 public Response processWireFormat(WireFormatInfo info) throws Exception {
351 wireFormatInfo = info;
352 protocolVersion.set(info.getVersion());
353 return null;
354 }
355
356 public Response processShutdown(ShutdownInfo info) throws Exception {
357 stopAsync();
358 return null;
359 }
360
361 public Response processFlush(FlushCommand command) throws Exception {
362 return null;
363 }
364
365 public Response processBeginTransaction(TransactionInfo info) throws Exception {
366 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
367 context = null;
368 if (cs != null) {
369 context = cs.getContext();
370 }
371 if (cs == null) {
372 throw new NullPointerException("Context is null");
373 }
374 // Avoid replaying dup commands
375 if (cs.getTransactionState(info.getTransactionId()) == null) {
376 cs.addTransactionState(info.getTransactionId());
377 broker.beginTransaction(context, info.getTransactionId());
378 }
379 return null;
380 }
381
382 public Response processEndTransaction(TransactionInfo info) throws Exception {
383 // No need to do anything. This packet is just sent by the client
384 // make sure he is synced with the server as commit command could
385 // come from a different connection.
386 return null;
387 }
388
389 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
390 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
391 context = null;
392 if (cs != null) {
393 context = cs.getContext();
394 }
395 if (cs == null) {
396 throw new NullPointerException("Context is null");
397 }
398 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
399 if (transactionState == null) {
400 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
401 + info.getTransactionId());
402 }
403 // Avoid dups.
404 if (!transactionState.isPrepared()) {
405 transactionState.setPrepared(true);
406 int result = broker.prepareTransaction(context, info.getTransactionId());
407 transactionState.setPreparedResult(result);
408 if (result == XAResource.XA_RDONLY) {
409 // we are done, no further rollback or commit from TM
410 cs.removeTransactionState(info.getTransactionId());
411 }
412 IntegerResponse response = new IntegerResponse(result);
413 return response;
414 } else {
415 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
416 return response;
417 }
418 }
419
420 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
421 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
422 context = cs.getContext();
423 cs.removeTransactionState(info.getTransactionId());
424 broker.commitTransaction(context, info.getTransactionId(), true);
425 return null;
426 }
427
428 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
429 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
430 context = cs.getContext();
431 cs.removeTransactionState(info.getTransactionId());
432 broker.commitTransaction(context, info.getTransactionId(), false);
433 return null;
434 }
435
436 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
437 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
438 context = cs.getContext();
439 cs.removeTransactionState(info.getTransactionId());
440 broker.rollbackTransaction(context, info.getTransactionId());
441 return null;
442 }
443
444 public Response processForgetTransaction(TransactionInfo info) throws Exception {
445 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
446 context = cs.getContext();
447 broker.forgetTransaction(context, info.getTransactionId());
448 return null;
449 }
450
451 public Response processRecoverTransactions(TransactionInfo info) throws Exception {
452 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
453 context = cs.getContext();
454 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
455 return new DataArrayResponse(preparedTransactions);
456 }
457
458 public Response processMessage(Message messageSend) throws Exception {
459 ProducerId producerId = messageSend.getProducerId();
460 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
461 if (producerExchange.canDispatch(messageSend)) {
462 broker.send(producerExchange, messageSend);
463 }
464 return null;
465 }
466
467 public Response processMessageAck(MessageAck ack) throws Exception {
468 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
469 broker.acknowledge(consumerExchange, ack);
470 return null;
471 }
472
473 public Response processMessagePull(MessagePull pull) throws Exception {
474 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
475 }
476
477 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
478 broker.processDispatchNotification(notification);
479 return null;
480 }
481
482 public Response processAddDestination(DestinationInfo info) throws Exception {
483 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
484 broker.addDestinationInfo(cs.getContext(), info);
485 if (info.getDestination().isTemporary()) {
486 cs.addTempDestination(info);
487 }
488 return null;
489 }
490
491 public Response processRemoveDestination(DestinationInfo info) throws Exception {
492 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
493 broker.removeDestinationInfo(cs.getContext(), info);
494 if (info.getDestination().isTemporary()) {
495 cs.removeTempDestination(info.getDestination());
496 }
497 return null;
498 }
499
500 public Response processAddProducer(ProducerInfo info) throws Exception {
501 SessionId sessionId = info.getProducerId().getParentId();
502 ConnectionId connectionId = sessionId.getParentId();
503 TransportConnectionState cs = lookupConnectionState(connectionId);
504 SessionState ss = cs.getSessionState(sessionId);
505 if (ss == null) {
506 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
507 + sessionId);
508 }
509 // Avoid replaying dup commands
510 if (!ss.getProducerIds().contains(info.getProducerId())) {
511 broker.addProducer(cs.getContext(), info);
512 try {
513 ss.addProducer(info);
514 } catch (IllegalStateException e) {
515 broker.removeProducer(cs.getContext(), info);
516 }
517 }
518 return null;
519 }
520
521 public Response processRemoveProducer(ProducerId id) throws Exception {
522 SessionId sessionId = id.getParentId();
523 ConnectionId connectionId = sessionId.getParentId();
524 TransportConnectionState cs = lookupConnectionState(connectionId);
525 SessionState ss = cs.getSessionState(sessionId);
526 if (ss == null) {
527 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
528 + sessionId);
529 }
530 ProducerState ps = ss.removeProducer(id);
531 if (ps == null) {
532 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
533 }
534 removeProducerBrokerExchange(id);
535 broker.removeProducer(cs.getContext(), ps.getInfo());
536 return null;
537 }
538
539 public Response processAddConsumer(ConsumerInfo info) throws Exception {
540 SessionId sessionId = info.getConsumerId().getParentId();
541 ConnectionId connectionId = sessionId.getParentId();
542 TransportConnectionState cs = lookupConnectionState(connectionId);
543 SessionState ss = cs.getSessionState(sessionId);
544 if (ss == null) {
545 throw new IllegalStateException(broker.getBrokerName()
546 + " Cannot add a consumer to a session that had not been registered: " + sessionId);
547 }
548 // Avoid replaying dup commands
549 if (!ss.getConsumerIds().contains(info.getConsumerId())) {
550 broker.addConsumer(cs.getContext(), info);
551 try {
552 ss.addConsumer(info);
553 } catch (IllegalStateException e) {
554 broker.removeConsumer(cs.getContext(), info);
555 }
556 }
557 return null;
558 }
559
560 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
561 SessionId sessionId = id.getParentId();
562 ConnectionId connectionId = sessionId.getParentId();
563 TransportConnectionState cs = lookupConnectionState(connectionId);
564 if (cs == null) {
565 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
566 + connectionId);
567 }
568 SessionState ss = cs.getSessionState(sessionId);
569 if (ss == null) {
570 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
571 + sessionId);
572 }
573 ConsumerState consumerState = ss.removeConsumer(id);
574 if (consumerState == null) {
575 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
576 }
577 ConsumerInfo info = consumerState.getInfo();
578 info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
579 broker.removeConsumer(cs.getContext(), consumerState.getInfo());
580 removeConsumerBrokerExchange(id);
581 return null;
582 }
583
584 public Response processAddSession(SessionInfo info) throws Exception {
585 ConnectionId connectionId = info.getSessionId().getParentId();
586 TransportConnectionState cs = lookupConnectionState(connectionId);
587 // Avoid replaying dup commands
588 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
589 broker.addSession(cs.getContext(), info);
590 try {
591 cs.addSession(info);
592 } catch (IllegalStateException e) {
593 e.printStackTrace();
594 broker.removeSession(cs.getContext(), info);
595 }
596 }
597 return null;
598 }
599
600 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
601 ConnectionId connectionId = id.getParentId();
602 TransportConnectionState cs = lookupConnectionState(connectionId);
603 if (cs == null) {
604 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
605 }
606 SessionState session = cs.getSessionState(id);
607 if (session == null) {
608 throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
609 }
610 // Don't let new consumers or producers get added while we are closing
611 // this down.
612 session.shutdown();
613 // Cascade the connection stop to the consumers and producers.
614 for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
615 ConsumerId consumerId = (ConsumerId) iter.next();
616 try {
617 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
618 } catch (Throwable e) {
619 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
620 }
621 }
622 for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
623 ProducerId producerId = (ProducerId) iter.next();
624 try {
625 processRemoveProducer(producerId);
626 } catch (Throwable e) {
627 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
628 }
629 }
630 cs.removeSession(id);
631 broker.removeSession(cs.getContext(), session.getInfo());
632 return null;
633 }
634
635 public Response processAddConnection(ConnectionInfo info) throws Exception {
636 // if the broker service has slave attached, wait for the slave to be
637 // attached to allow client connection. slave connection is fine
638 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
639 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
640 ServiceSupport.dispose(transport);
641 return new ExceptionResponse(new Exception("Master's slave not attached yet."));
642 }
643 // Older clients should have been defaulting this field to true.. but
644 // they were not.
645 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
646 info.setClientMaster(true);
647 }
648 TransportConnectionState state;
649 // Make sure 2 concurrent connections by the same ID only generate 1
650 // TransportConnectionState object.
651 synchronized (brokerConnectionStates) {
652 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
653 if (state == null) {
654 state = new TransportConnectionState(info, this);
655 brokerConnectionStates.put(info.getConnectionId(), state);
656 }
657 state.incrementReference();
658 }
659 // If there are 2 concurrent connections for the same connection id,
660 // then last one in wins, we need to sync here
661 // to figure out the winner.
662 synchronized (state.getConnectionMutex()) {
663 if (state.getConnection() != this) {
664 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
665 state.getConnection().stop();
666 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
667 + state.getConnection().getRemoteAddress());
668 state.setConnection(this);
669 state.reset(info);
670 }
671 }
672 registerConnectionState(info.getConnectionId(), state);
673 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
674 this.faultTolerantConnection=info.isFaultTolerant();
675 // Setup the context.
676 String clientId = info.getClientId();
677 context = new ConnectionContext();
678 context.setBroker(broker);
679 context.setClientId(clientId);
680 context.setClientMaster(info.isClientMaster());
681 context.setConnection(this);
682 context.setConnectionId(info.getConnectionId());
683 context.setConnector(connector);
684 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
685 context.setNetworkConnection(networkConnection);
686 context.setFaultTolerant(faultTolerantConnection);
687 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
688 context.setUserName(info.getUserName());
689 context.setWireFormatInfo(wireFormatInfo);
690 context.setReconnect(info.isFailoverReconnect());
691 this.manageable = info.isManageable();
692 state.setContext(context);
693 state.setConnection(this);
694
695 try {
696 broker.addConnection(context, info);
697 } catch (Exception e) {
698 synchronized (brokerConnectionStates) {
699 brokerConnectionStates.remove(info.getConnectionId());
700 }
701 unregisterConnectionState(info.getConnectionId());
702 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
703 if (LOG.isDebugEnabled()) {
704 LOG.debug("Exception detail:", e);
705 }
706 throw e;
707 }
708 if (info.isManageable()) {
709 // send ConnectionCommand
710 ConnectionControl command = this.connector.getConnectionControl();
711 command.setFaultTolerant(broker.isFaultTolerantConfiguration());
712 dispatchAsync(command);
713 }
714 return null;
715 }
716
717 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
718 throws InterruptedException {
719 LOG.debug("remove connection id: " + id);
720 TransportConnectionState cs = lookupConnectionState(id);
721 if (cs != null) {
722 // Don't allow things to be added to the connection state while we
723 // are
724 // shutting down.
725 cs.shutdown();
726 // Cascade the connection stop to the sessions.
727 for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
728 SessionId sessionId = (SessionId) iter.next();
729 try {
730 processRemoveSession(sessionId, lastDeliveredSequenceId);
731 } catch (Throwable e) {
732 SERVICELOG.warn("Failed to remove session " + sessionId, e);
733 }
734 }
735 // Cascade the connection stop to temp destinations.
736 for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
737 DestinationInfo di = (DestinationInfo) iter.next();
738 try {
739 broker.removeDestination(cs.getContext(), di.getDestination(), 0);
740 } catch (Throwable e) {
741 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
742 }
743 iter.remove();
744 }
745 try {
746 broker.removeConnection(cs.getContext(), cs.getInfo(), null);
747 } catch (Throwable e) {
748 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
749 if (LOG.isDebugEnabled()) {
750 SERVICELOG.debug("Exception detail:", e);
751 }
752 }
753 TransportConnectionState state = unregisterConnectionState(id);
754 if (state != null) {
755 synchronized (brokerConnectionStates) {
756 // If we are the last reference, we should remove the state
757 // from the broker.
758 if (state.decrementReference() == 0) {
759 brokerConnectionStates.remove(id);
760 }
761 }
762 }
763 }
764 return null;
765 }
766
767 public Response processProducerAck(ProducerAck ack) throws Exception {
768 // A broker should not get ProducerAck messages.
769 return null;
770 }
771
772 public Connector getConnector() {
773 return connector;
774 }
775
776 public void dispatchSync(Command message) {
777 // getStatistics().getEnqueues().increment();
778 try {
779 processDispatch(message);
780 } catch (IOException e) {
781 serviceExceptionAsync(e);
782 }
783 }
784
785 public void dispatchAsync(Command message) {
786 if (!stopping.get()) {
787 // getStatistics().getEnqueues().increment();
788 if (taskRunner == null) {
789 dispatchSync(message);
790 } else {
791 synchronized (dispatchQueue) {
792 dispatchQueue.add(message);
793 }
794 try {
795 taskRunner.wakeup();
796 } catch (InterruptedException e) {
797 Thread.currentThread().interrupt();
798 }
799 }
800 } else {
801 if (message.isMessageDispatch()) {
802 MessageDispatch md = (MessageDispatch) message;
803 Runnable sub = md.getTransmitCallback();
804 broker.postProcessDispatch(md);
805 if (sub != null) {
806 sub.run();
807 }
808 }
809 }
810 }
811
812 protected void processDispatch(Command command) throws IOException {
813 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
814 try {
815 if (!stopping.get()) {
816 if (messageDispatch != null) {
817 broker.preProcessDispatch(messageDispatch);
818 }
819 dispatch(command);
820 }
821 } finally {
822 if (messageDispatch != null) {
823 Runnable sub = messageDispatch.getTransmitCallback();
824 broker.postProcessDispatch(messageDispatch);
825 if (sub != null) {
826 sub.run();
827 }
828 }
829 // getStatistics().getDequeues().increment();
830 }
831 }
832
833 public boolean iterate() {
834 try {
835 if (stopping.get()) {
836 if (dispatchStopped.compareAndSet(false, true)) {
837 if (transportException.get() == null) {
838 try {
839 dispatch(new ShutdownInfo());
840 } catch (Throwable ignore) {
841 }
842 }
843 dispatchStoppedLatch.countDown();
844 }
845 return false;
846 }
847 if (!dispatchStopped.get()) {
848 Command command = null;
849 synchronized (dispatchQueue) {
850 if (dispatchQueue.isEmpty()) {
851 return false;
852 }
853 command = dispatchQueue.remove(0);
854 }
855 processDispatch(command);
856 return true;
857 }
858 return false;
859 } catch (IOException e) {
860 if (dispatchStopped.compareAndSet(false, true)) {
861 dispatchStoppedLatch.countDown();
862 }
863 serviceExceptionAsync(e);
864 return false;
865 }
866 }
867
868 /**
869 * Returns the statistics for this connection
870 */
871 public ConnectionStatistics getStatistics() {
872 return statistics;
873 }
874
875 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
876 return messageAuthorizationPolicy;
877 }
878
879 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
880 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
881 }
882
883 public boolean isManageable() {
884 return manageable;
885 }
886
887 public void start() throws Exception {
888 starting = true;
889 try {
890 synchronized (this) {
891 if (taskRunnerFactory != null) {
892 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
893 + getRemoteAddress());
894 } else {
895 taskRunner = null;
896 }
897 transport.start();
898 active = true;
899 BrokerInfo info = connector.getBrokerInfo().copy();
900 if (connector.isUpdateClusterClients()) {
901 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
902 } else {
903 info.setPeerBrokerInfos(null);
904 }
905 dispatchAsync(info);
906
907 connector.onStarted(this);
908 }
909 } catch (Exception e) {
910 // Force clean up on an error starting up.
911 stop();
912 throw e;
913 } finally {
914 // stop() can be called from within the above block,
915 // but we want to be sure start() completes before
916 // stop() runs, so queue the stop until right now:
917 starting = false;
918 if (pendingStop) {
919 LOG.debug("Calling the delayed stop()");
920 stop();
921 }
922 }
923 }
924
925 public void stop() throws Exception {
926 synchronized (this) {
927 pendingStop = true;
928 if (starting) {
929 LOG.debug("stop() called in the middle of start(). Delaying...");
930 return;
931 }
932 }
933 stopAsync();
934 while (!stopped.await(5, TimeUnit.SECONDS)) {
935 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
936 }
937 }
938
939 public void delayedStop(final int waitTime, final String reason) {
940 if (waitTime > 0) {
941 try {
942 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
943 public void run() {
944 try {
945 Thread.sleep(waitTime);
946 stopAsync();
947 LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
948 } catch (InterruptedException e) {
949 }
950 }
951 }, "delayedStop:" + transport.getRemoteAddress());
952 } catch (Throwable t) {
953 LOG.warn("cannot create stopAsync :", t);
954 }
955 }
956 }
957
958 public void stopAsync() {
959 // If we're in the middle of starting
960 // then go no further... for now.
961 if (stopping.compareAndSet(false, true)) {
962 // Let all the connection contexts know we are shutting down
963 // so that in progress operations can notice and unblock.
964 List<TransportConnectionState> connectionStates = listConnectionStates();
965 for (TransportConnectionState cs : connectionStates) {
966 cs.getContext().getStopping().set(true);
967 }
968 try {
969 final Map context = MDCHelper.getCopyOfContextMap();
970 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
971 public void run() {
972 serviceLock.writeLock().lock();
973 try {
974 MDCHelper.setContextMap(context);
975 doStop();
976 } catch (Throwable e) {
977 LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
978 + "': ", e);
979 } finally {
980 stopped.countDown();
981 serviceLock.writeLock().unlock();
982 }
983 }
984 }, "StopAsync:" + transport.getRemoteAddress());
985 } catch (Throwable t) {
986 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
987 stopped.countDown();
988 }
989 }
990 }
991
992 @Override
993 public String toString() {
994 return "Transport Connection to: " + transport.getRemoteAddress();
995 }
996
997 protected void doStop() throws Exception, InterruptedException {
998 LOG.debug("Stopping connection: " + transport.getRemoteAddress());
999 connector.onStopped(this);
1000 try {
1001 synchronized (this) {
1002 if (masterBroker != null) {
1003 masterBroker.stop();
1004 }
1005 if (duplexBridge != null) {
1006 duplexBridge.stop();
1007 }
1008 }
1009 } catch (Exception ignore) {
1010 LOG.trace("Exception caught stopping", ignore);
1011 }
1012 try {
1013 transport.stop();
1014 LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1015 } catch (Exception e) {
1016 LOG.debug("Could not stop transport: " + e, e);
1017 }
1018 if (taskRunner != null) {
1019 taskRunner.shutdown(1);
1020 }
1021 active = false;
1022 // Run the MessageDispatch callbacks so that message references get
1023 // cleaned up.
1024 synchronized (dispatchQueue) {
1025 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
1026 Command command = iter.next();
1027 if (command.isMessageDispatch()) {
1028 MessageDispatch md = (MessageDispatch) command;
1029 Runnable sub = md.getTransmitCallback();
1030 broker.postProcessDispatch(md);
1031 if (sub != null) {
1032 sub.run();
1033 }
1034 }
1035 }
1036 dispatchQueue.clear();
1037 }
1038 //
1039 // Remove all logical connection associated with this connection
1040 // from the broker.
1041 if (!broker.isStopped()) {
1042 List<TransportConnectionState> connectionStates = listConnectionStates();
1043 connectionStates = listConnectionStates();
1044 for (TransportConnectionState cs : connectionStates) {
1045 cs.getContext().getStopping().set(true);
1046 try {
1047 LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1048 processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1049 } catch (Throwable ignore) {
1050 ignore.printStackTrace();
1051 }
1052 }
1053 }
1054 LOG.debug("Connection Stopped: " + getRemoteAddress());
1055 }
1056
1057 /**
1058 * @return Returns the blockedCandidate.
1059 */
1060 public boolean isBlockedCandidate() {
1061 return blockedCandidate;
1062 }
1063
1064 /**
1065 * @param blockedCandidate
1066 * The blockedCandidate to set.
1067 */
1068 public void setBlockedCandidate(boolean blockedCandidate) {
1069 this.blockedCandidate = blockedCandidate;
1070 }
1071
1072 /**
1073 * @return Returns the markedCandidate.
1074 */
1075 public boolean isMarkedCandidate() {
1076 return markedCandidate;
1077 }
1078
1079 /**
1080 * @param markedCandidate
1081 * The markedCandidate to set.
1082 */
1083 public void setMarkedCandidate(boolean markedCandidate) {
1084 this.markedCandidate = markedCandidate;
1085 if (!markedCandidate) {
1086 timeStamp = 0;
1087 blockedCandidate = false;
1088 }
1089 }
1090
1091 /**
1092 * @param slow
1093 * The slow to set.
1094 */
1095 public void setSlow(boolean slow) {
1096 this.slow = slow;
1097 }
1098
1099 /**
1100 * @return true if the Connection is slow
1101 */
1102 public boolean isSlow() {
1103 return slow;
1104 }
1105
1106 /**
1107 * @return true if the Connection is potentially blocked
1108 */
1109 public boolean isMarkedBlockedCandidate() {
1110 return markedCandidate;
1111 }
1112
1113 /**
1114 * Mark the Connection, so we can deem if it's collectable on the next sweep
1115 */
1116 public void doMark() {
1117 if (timeStamp == 0) {
1118 timeStamp = System.currentTimeMillis();
1119 }
1120 }
1121
1122 /**
1123 * @return if after being marked, the Connection is still writing
1124 */
1125 public boolean isBlocked() {
1126 return blocked;
1127 }
1128
1129 /**
1130 * @return true if the Connection is connected
1131 */
1132 public boolean isConnected() {
1133 return connected;
1134 }
1135
1136 /**
1137 * @param blocked
1138 * The blocked to set.
1139 */
1140 public void setBlocked(boolean blocked) {
1141 this.blocked = blocked;
1142 }
1143
1144 /**
1145 * @param connected
1146 * The connected to set.
1147 */
1148 public void setConnected(boolean connected) {
1149 this.connected = connected;
1150 }
1151
1152 /**
1153 * @return true if the Connection is active
1154 */
1155 public boolean isActive() {
1156 return active;
1157 }
1158
1159 /**
1160 * @param active
1161 * The active to set.
1162 */
1163 public void setActive(boolean active) {
1164 this.active = active;
1165 }
1166
1167 /**
1168 * @return true if the Connection is starting
1169 */
1170 public synchronized boolean isStarting() {
1171 return starting;
1172 }
1173
1174 public synchronized boolean isNetworkConnection() {
1175 return networkConnection;
1176 }
1177
1178 public boolean isFaultTolerantConnection() {
1179 return this.faultTolerantConnection;
1180 }
1181
1182 protected synchronized void setStarting(boolean starting) {
1183 this.starting = starting;
1184 }
1185
1186 /**
1187 * @return true if the Connection needs to stop
1188 */
1189 public synchronized boolean isPendingStop() {
1190 return pendingStop;
1191 }
1192
1193 protected synchronized void setPendingStop(boolean pendingStop) {
1194 this.pendingStop = pendingStop;
1195 }
1196
/**
 * Handles a {@link BrokerInfo} command received from the peer. Three cases
 * are distinguished:
 * <ul>
 * <li><b>Slave broker</b>: unless the slave is passive, a
 * {@link MasterBroker} is created on this transport and started; the broker
 * service is told that a slave connection was established. Processing then
 * continues with the common tail below.</li>
 * <li><b>Duplex network connection</b>: this side becomes the responder end
 * of a duplex bridge. Any other connection of the same connector with the
 * same duplex network connector id is stopped first, then a local transport
 * and the bridge are created and started. This branch always returns early
 * (also on failure), so the common tail is not executed.</li>
 * <li><b>Anything else</b>: only the common tail runs.</li>
 * </ul>
 * Common tail: the info is stored in {@code brokerInfo} (a warning is
 * logged if one was already stored), the connection is flagged as a network
 * connection and every registered connection context is flagged likewise.
 *
 * @param info the broker info sent by the peer
 * @return always {@code null}
 */
public Response processBrokerInfo(BrokerInfo info) {
    if (info.isSlaveBroker()) {
        BrokerService bService = connector.getBrokerService();
        // Do we only support passive slaves - or does the slave want to be
        // passive ?
        boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
        if (passive == false) {

            // stream messages from this broker (the master) to
            // the slave
            MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
            masterBroker = new MasterBroker(parent, transport);
            masterBroker.startProcessing();
        }
        LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
        bService.slaveConnectionEstablished();
    } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
        // so this TransportConnection is the rear end of a network bridge
        // We have been requested to create a two way pipe ...
        try {
            // Rebuild the bridge configuration from the properties string
            // carried in the broker info.
            Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
            Map<String, String> props = createMap(properties);
            NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
            IntrospectionSupport.setProperties(config, props, "");
            config.setBrokerName(broker.getBrokerName());

            // check for existing duplex connection hanging about

            // We first look if existing network connection already exists for the same broker Id and network connector name
            // It's possible in case of brief network fault to have this transport connector side of the connection always active
            // and the duplex network connector side wanting to open a new one
            // In this case, the old connection must be broken
            String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
            CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
            synchronized (connections) {
                for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
                    TransportConnection c = iter.next();
                    if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                        LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
                        c.stopAsync();
                        // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                        // NOTE(review): an InterruptedException from await() ends up in the
                        // generic catch (Exception) below, which does not restore the
                        // thread's interrupt status - confirm this is intended.
                        c.getStopped().await(1, TimeUnit.SECONDS);
                    }
                }
                // Publish our own id only after the scan, while still holding the lock.
                setDuplexNetworkConnectorId(duplexNetworkConnectorId);
            }
            // Derive the local transport URI from the broker's VM connector
            // URI, forcing the "network" and "async" parameters.
            URI uri = broker.getVmConnectorURI();
            HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
            Transport localTransport = TransportFactory.connect(uri);
            Transport remoteBridgeTransport = new ResponseCorrelator(transport);
            // Use the part of the local transport's description starting at
            // the last '#' (when present) as the name for the MBean object name.
            String duplexName = localTransport.toString();
            if (duplexName.contains("#")) {
                duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
            }
            MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
            listener.setCreatedByDuplex(true);
            duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
            duplexBridge.setBrokerService(broker.getBrokerService());
            // now turn duplex off this side
            info.setDuplexConnection(false);
            duplexBridge.setCreatedByDuplex(true);
            duplexBridge.duplexStart(this, brokerInfo, info);
            LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
            return null;
        } catch (TransportDisposedIOException e) {
            // NOTE(review): the local duplexNetworkConnectorId declared inside the try
            // block is out of scope here, so this name presumably resolves to the
            // field written by setDuplexNetworkConnectorId(); it may not be set yet if
            // the failure happened early - confirm.
            LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
            return null;
        } catch (Exception e) {
            LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
            return null;
        }
    }
    // We only expect to get one broker info command per connection
    if (this.brokerInfo != null) {
        LOG.warn("Unexpected extra broker info command received: " + info);
    }
    this.brokerInfo = info;
    networkConnection = true;
    // Propagate the network-connection flag to every registered context.
    List<TransportConnectionState> connectionStates = listConnectionStates();
    for (TransportConnectionState cs : connectionStates) {
        cs.getContext().setNetworkConnection(true);
    }
    return null;
}
1284
1285 @SuppressWarnings("unchecked")
1286 private HashMap<String, String> createMap(Properties properties) {
1287 return new HashMap(properties);
1288 }
1289
1290 protected void dispatch(Command command) throws IOException {
1291 try {
1292 setMarkedCandidate(true);
1293 transport.oneway(command);
1294 } finally {
1295 setMarkedCandidate(false);
1296 }
1297 }
1298
1299 public String getRemoteAddress() {
1300 return transport.getRemoteAddress();
1301 }
1302
1303 public String getConnectionId() {
1304 List<TransportConnectionState> connectionStates = listConnectionStates();
1305 for (TransportConnectionState cs : connectionStates) {
1306 if (cs.getInfo().getClientId() != null) {
1307 return cs.getInfo().getClientId();
1308 }
1309 return cs.getInfo().getConnectionId().toString();
1310 }
1311 return null;
1312 }
1313
1314 public void updateClient(ConnectionControl control) {
1315 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1316 && this.wireFormatInfo.getVersion() >= 6) {
1317 dispatchAsync(control);
1318 }
1319 }
1320
1321 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1322 ProducerBrokerExchange result = producerExchanges.get(id);
1323 if (result == null) {
1324 synchronized (producerExchanges) {
1325 result = new ProducerBrokerExchange();
1326 TransportConnectionState state = lookupConnectionState(id);
1327 context = state.getContext();
1328 if (context.isReconnect()) {
1329 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1330 }
1331 result.setConnectionContext(context);
1332 SessionState ss = state.getSessionState(id.getParentId());
1333 if (ss != null) {
1334 result.setProducerState(ss.getProducerState(id));
1335 ProducerState producerState = ss.getProducerState(id);
1336 if (producerState != null && producerState.getInfo() != null) {
1337 ProducerInfo info = producerState.getInfo();
1338 result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1339 }
1340 }
1341 producerExchanges.put(id, result);
1342 }
1343 } else {
1344 context = result.getConnectionContext();
1345 }
1346 return result;
1347 }
1348
1349 private void removeProducerBrokerExchange(ProducerId id) {
1350 synchronized (producerExchanges) {
1351 producerExchanges.remove(id);
1352 }
1353 }
1354
1355 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1356 ConsumerBrokerExchange result = consumerExchanges.get(id);
1357 if (result == null) {
1358 synchronized (consumerExchanges) {
1359 result = new ConsumerBrokerExchange();
1360 TransportConnectionState state = lookupConnectionState(id);
1361 context = state.getContext();
1362 result.setConnectionContext(context);
1363 SessionState ss = state.getSessionState(id.getParentId());
1364 if (ss != null) {
1365 ConsumerState cs = ss.getConsumerState(id);
1366 if (cs != null) {
1367 ConsumerInfo info = cs.getInfo();
1368 if (info != null) {
1369 if (info.getDestination() != null && info.getDestination().isPattern()) {
1370 result.setWildcard(true);
1371 }
1372 }
1373 }
1374 }
1375 consumerExchanges.put(id, result);
1376 }
1377 }
1378 return result;
1379 }
1380
1381 private void removeConsumerBrokerExchange(ConsumerId id) {
1382 synchronized (consumerExchanges) {
1383 consumerExchanges.remove(id);
1384 }
1385 }
1386
1387 public int getProtocolVersion() {
1388 return protocolVersion.get();
1389 }
1390
1391 public Response processControlCommand(ControlCommand command) throws Exception {
1392 String control = command.getCommand();
1393 if (control != null && control.equals("shutdown")) {
1394 System.exit(0);
1395 }
1396 return null;
1397 }
1398
1399 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1400 return null;
1401 }
1402
1403 public Response processConnectionControl(ConnectionControl control) throws Exception {
1404 if (control != null) {
1405 faultTolerantConnection = control.isFaultTolerant();
1406 }
1407 return null;
1408 }
1409
1410 public Response processConnectionError(ConnectionError error) throws Exception {
1411 return null;
1412 }
1413
1414 public Response processConsumerControl(ConsumerControl control) throws Exception {
1415 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1416 broker.processConsumerControl(consumerExchange, control);
1417 return null;
1418 }
1419
1420 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1421 TransportConnectionState state) {
1422 TransportConnectionState cs = null;
1423 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1424 // swap implementations
1425 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1426 newRegister.intialize(connectionStateRegister);
1427 connectionStateRegister = newRegister;
1428 }
1429 cs = connectionStateRegister.registerConnectionState(connectionId, state);
1430 return cs;
1431 }
1432
1433 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1434 return connectionStateRegister.unregisterConnectionState(connectionId);
1435 }
1436
1437 protected synchronized List<TransportConnectionState> listConnectionStates() {
1438 return connectionStateRegister.listConnectionStates();
1439 }
1440
1441 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1442 return connectionStateRegister.lookupConnectionState(connectionId);
1443 }
1444
1445 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1446 return connectionStateRegister.lookupConnectionState(id);
1447 }
1448
1449 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1450 return connectionStateRegister.lookupConnectionState(id);
1451 }
1452
1453 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1454 return connectionStateRegister.lookupConnectionState(id);
1455 }
1456
1457 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1458 return connectionStateRegister.lookupConnectionState(connectionId);
1459 }
1460
1461 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1462 this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1463 }
1464
1465 protected synchronized String getDuplexNetworkConnectorId() {
1466 return this.duplexNetworkConnectorId;
1467 }
1468
1469 protected CountDownLatch getStopped() {
1470 return stopped;
1471 }
1472 }