001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.failover;
019
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
062
063
064 /**
065 * A Transport that is made reliable by being able to fail over to another
066 * transport when a transport failure is detected.
067 *
068 *
069 */
070 public class FailoverTransport implements CompositeTransport {
071
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
// Initial value of both initialReconnectDelay and reconnectDelay; the delay is
// passed to Object.wait(long) in doReconnect(), i.e. it is in milliseconds.
private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
// Listener of the layer above this transport; may be null until setTransportListener is called.
private TransportListener transportListener;
// Set to true by stop(); never reset.
private boolean disposed;
private boolean connected;
// Candidate broker URIs used by getConnectList().
private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
// URIs recorded by updateURIs() so that a later update can tell which ones went away.
private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

// Guards the connection state and is the monitor oneway() waits on for a transport.
private final Object reconnectMutex = new Object();
// Guards the backups list and serializes reconnect attempts with backup building.
private final Object backupMutex = new Object();
// Monitor used only for the interruptible delay between reconnect attempts.
private final Object sleepMutex = new Object();
// Monitor used to wait for / signal that transportListener has been set.
private final Object listenerMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
// Commands awaiting a response, keyed by command id; always accessed under synchronized (requestMap).
private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

private URI connectedTransportURI;
// URI of the last transport that failed; getConnectList() moves it to the end of the list.
private URI failedConnectTransportURI;
private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
private final TaskRunner reconnectTask;
private boolean started;
// Set once a command has been received on the current transport; cleared on transport failure.
private boolean initialized;
private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private long maxReconnectDelay = 1000 * 30;
private double backOffMultiplier = 2d;
// Maximum time (ms) oneway() waits for a transport; values <= 0 disable the timeout.
private long timeout = -1;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
// 0 means no limit on reconnect attempts (see doReconnect()).
private int maxReconnectAttempts;
// Limit applied only while firstConnection is true; 0 falls back to maxReconnectAttempts.
private int startupMaxReconnectAttempts;
private int connectFailures;
private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
// Set once the attempt limit is reached; oneway() then fails with this exception.
private Exception connectionFailure;
private boolean firstConnection = true;
// optionally always have a backup created
private boolean backup = false;
private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize = 1;
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
// No-op listener installed on transports that are being disposed.
private final TransportListener disposedListener = new DefaultTransportListener() {
};
//private boolean connectionInterruptProcessingComplete;

// Listener installed on the active underlying transport.
private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported=true;
private boolean reconnectSupported=true;
// remember for reconnect thread
private SslContext brokerSslContext;
// Optional URL or file path that doReconnect() reads for additional broker URIs.
private String updateURIsURL = null;
private boolean rebalanceUpdateURIs=true;
// Request flag: the next reconnect pass should re-evaluate which broker to use.
private boolean doRebalance = false;
124
/**
 * Creates the transport and the background task that performs reconnects
 * and builds backup transports.
 *
 * The SSL context current on the constructing thread is remembered so that
 * the reconnect task can reinstall it before connecting.
 *
 * @throws InterruptedIOException declared for callers; not thrown by the
 *         statements in this constructor
 */
public FailoverTransport() throws InterruptedIOException {
    brokerSslContext = SslContext.getCurrentSslContext();
    stateTracker.setTrackTransactions(true);
    // Set up the task that is used to reconnect a connection asynchronously.
    reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
        public boolean iterate() {
            boolean result = false;
            boolean buildBackup = true;
            synchronized (backupMutex) {
                if ((connectedTransport.get() == null || doRebalance) && !disposed) {
                    result = doReconnect();
                    buildBackup = false;
                }
            }
            if (buildBackup) {
                buildBackups();
            } else {
                // A reconnect was attempted on this pass; wake the task again
                // so that backups are built on the next iteration.
                try {
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
            }
            return result;
        }

    }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
156
157 TransportListener createTransportListener() {
158 return new TransportListener() {
159 public void onCommand(Object o) {
160 Command command = (Command) o;
161 if (command == null) {
162 return;
163 }
164 if (command.isResponse()) {
165 Object object = null;
166 synchronized (requestMap) {
167 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
168 }
169 if (object != null && object.getClass() == Tracked.class) {
170 ((Tracked) object).onResponses(command);
171 }
172 }
173 if (!initialized) {
174 initialized = true;
175 }
176
177 if(command.isConnectionControl()) {
178 handleConnectionControl((ConnectionControl) command);
179 }
180 if (transportListener != null) {
181 transportListener.onCommand(command);
182 }
183 }
184
185 public void onException(IOException error) {
186 try {
187 handleTransportFailure(error);
188 } catch (InterruptedException e) {
189 Thread.currentThread().interrupt();
190 transportListener.onException(new InterruptedIOException());
191 }
192 }
193
194 public void transportInterupted() {
195 if (transportListener != null) {
196 transportListener.transportInterupted();
197 }
198 }
199
200 public void transportResumed() {
201 if (transportListener != null) {
202 transportListener.transportResumed();
203 }
204 }
205 };
206 }
207
208 public final void disposeTransport(Transport transport) {
209 transport.setTransportListener(disposedListener);
210 ServiceSupport.dispose(transport);
211 }
212
213 public final void handleTransportFailure(IOException e) throws InterruptedException {
214 if (LOG.isTraceEnabled()) {
215 LOG.trace(this + " handleTransportFailure: " + e);
216 }
217 Transport transport = connectedTransport.getAndSet(null);
218 if (transport == null) {
219 // sync with possible in progress reconnect
220 synchronized (reconnectMutex) {
221 transport = connectedTransport.getAndSet(null);
222 }
223 }
224 if (transport != null) {
225
226 disposeTransport(transport);
227
228 boolean reconnectOk = false;
229 synchronized (reconnectMutex) {
230 if (started) {
231 LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
232 + " , attempting to automatically reconnect due to: " + e);
233 LOG.debug("Transport failed with the following exception:", e);
234 reconnectOk = true;
235 }
236 initialized = false;
237 failedConnectTransportURI = connectedTransportURI;
238 connectedTransportURI = null;
239 connected = false;
240
241 // notify before any reconnect attempt so ack state can be
242 // whacked
243 if (transportListener != null) {
244 transportListener.transportInterupted();
245 }
246
247 if (reconnectOk) {
248 reconnectTask.wakeup();
249 }
250 }
251 }
252 }
253
254 public final void handleConnectionControl(ConnectionControl control) {
255 String reconnectStr = control.getReconnectTo();
256 if (reconnectStr != null) {
257 reconnectStr = reconnectStr.trim();
258 if (reconnectStr.length() > 0) {
259 try {
260 URI uri = new URI(reconnectStr);
261 if (isReconnectSupported()) {
262 reconnect(uri);
263 LOG.info("Reconnected to: " + uri);
264 }
265 } catch (Exception e) {
266 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
267 }
268 }
269 }
270 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
271 }
272
273 private final void processNewTransports(boolean rebalance, String newTransports) {
274 if (newTransports != null) {
275 newTransports = newTransports.trim();
276 if (newTransports.length() > 0 && isUpdateURIsSupported()) {
277 List<URI> list = new ArrayList<URI>();
278 StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
279 while (tokenizer.hasMoreTokens()) {
280 String str = tokenizer.nextToken();
281 try {
282 URI uri = new URI(str);
283 list.add(uri);
284 } catch (Exception e) {
285 LOG.error("Failed to parse broker address: " + str, e);
286 }
287 }
288 if (list.isEmpty() == false) {
289 try {
290 updateURIs(rebalance, list.toArray(new URI[list.size()]));
291 } catch (IOException e) {
292 LOG.error("Failed to update transport URI's from: " + newTransports, e);
293 }
294 }
295
296 }
297 }
298 }
299
300 public void start() throws Exception {
301 synchronized (reconnectMutex) {
302 LOG.debug("Started.");
303 if (started) {
304 return;
305 }
306 started = true;
307 stateTracker.setMaxCacheSize(getMaxCacheSize());
308 stateTracker.setTrackMessages(isTrackMessages());
309 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
310 if (connectedTransport.get() != null) {
311 stateTracker.restore(connectedTransport.get());
312 } else {
313 reconnect(false);
314 }
315 }
316 }
317
318 public void stop() throws Exception {
319 Transport transportToStop = null;
320 synchronized (reconnectMutex) {
321 LOG.debug("Stopped.");
322 if (!started) {
323 return;
324 }
325 started = false;
326 disposed = true;
327 connected = false;
328 for (BackupTransport t : backups) {
329 t.setDisposed(true);
330 }
331 backups.clear();
332
333 if (connectedTransport.get() != null) {
334 transportToStop = connectedTransport.getAndSet(null);
335 }
336 reconnectMutex.notifyAll();
337 }
338 synchronized (sleepMutex) {
339 sleepMutex.notifyAll();
340 }
341 reconnectTask.shutdown();
342 if (transportToStop != null) {
343 transportToStop.stop();
344 }
345 }
346
347 public long getInitialReconnectDelay() {
348 return initialReconnectDelay;
349 }
350
351 public void setInitialReconnectDelay(long initialReconnectDelay) {
352 this.initialReconnectDelay = initialReconnectDelay;
353 }
354
355 public long getMaxReconnectDelay() {
356 return maxReconnectDelay;
357 }
358
359 public void setMaxReconnectDelay(long maxReconnectDelay) {
360 this.maxReconnectDelay = maxReconnectDelay;
361 }
362
363 public long getReconnectDelay() {
364 return reconnectDelay;
365 }
366
367 public void setReconnectDelay(long reconnectDelay) {
368 this.reconnectDelay = reconnectDelay;
369 }
370
371 public double getReconnectDelayExponent() {
372 return backOffMultiplier;
373 }
374
375 public void setReconnectDelayExponent(double reconnectDelayExponent) {
376 this.backOffMultiplier = reconnectDelayExponent;
377 }
378
379 public Transport getConnectedTransport() {
380 return connectedTransport.get();
381 }
382
383 public URI getConnectedTransportURI() {
384 return connectedTransportURI;
385 }
386
387 public int getMaxReconnectAttempts() {
388 return maxReconnectAttempts;
389 }
390
391 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
392 this.maxReconnectAttempts = maxReconnectAttempts;
393 }
394
395 public int getStartupMaxReconnectAttempts() {
396 return this.startupMaxReconnectAttempts;
397 }
398
399 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
400 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
401 }
402
403 public long getTimeout() {
404 return timeout;
405 }
406
407 public void setTimeout(long timeout) {
408 this.timeout = timeout;
409 }
410
411 /**
412 * @return Returns the randomize.
413 */
414 public boolean isRandomize() {
415 return randomize;
416 }
417
418 /**
419 * @param randomize
420 * The randomize to set.
421 */
422 public void setRandomize(boolean randomize) {
423 this.randomize = randomize;
424 }
425
426 public boolean isBackup() {
427 return backup;
428 }
429
430 public void setBackup(boolean backup) {
431 this.backup = backup;
432 }
433
434 public int getBackupPoolSize() {
435 return backupPoolSize;
436 }
437
438 public void setBackupPoolSize(int backupPoolSize) {
439 this.backupPoolSize = backupPoolSize;
440 }
441
442 public boolean isTrackMessages() {
443 return trackMessages;
444 }
445
446 public void setTrackMessages(boolean trackMessages) {
447 this.trackMessages = trackMessages;
448 }
449
450 public boolean isTrackTransactionProducers() {
451 return this.trackTransactionProducers;
452 }
453
454 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
455 this.trackTransactionProducers = trackTransactionProducers;
456 }
457
458 public int getMaxCacheSize() {
459 return maxCacheSize;
460 }
461
462 public void setMaxCacheSize(int maxCacheSize) {
463 this.maxCacheSize = maxCacheSize;
464 }
465
466 /**
467 * @return Returns true if the command is one sent when a connection is
468 * being closed.
469 */
470 private boolean isShutdownCommand(Command command) {
471 return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
472 }
473
474 public void oneway(Object o) throws IOException {
475
476 Command command = (Command) o;
477 Exception error = null;
478 try {
479
480 synchronized (reconnectMutex) {
481
482 if (isShutdownCommand(command) && connectedTransport.get() == null) {
483 if (command.isShutdownInfo()) {
484 // Skipping send of ShutdownInfo command when not
485 // connected.
486 return;
487 }
488 if (command instanceof RemoveInfo || command.isMessageAck()) {
489 // Simulate response to RemoveInfo command or ack (as it
490 // will be stale)
491 stateTracker.track(command);
492 Response response = new Response();
493 response.setCorrelationId(command.getCommandId());
494 myTransportListener.onCommand(response);
495 return;
496 }
497 }
498 // Keep trying until the message is sent.
499 for (int i = 0; !disposed; i++) {
500 try {
501
502 // Wait for transport to be connected.
503 Transport transport = connectedTransport.get();
504 long start = System.currentTimeMillis();
505 boolean timedout = false;
506 while (transport == null && !disposed && connectionFailure == null
507 && !Thread.currentThread().isInterrupted()) {
508 LOG.trace("Waiting for transport to reconnect..: " + command);
509 long end = System.currentTimeMillis();
510 if (timeout > 0 && (end - start > timeout)) {
511 timedout = true;
512 LOG.info("Failover timed out after " + (end - start) + "ms");
513 break;
514 }
515 try {
516 reconnectMutex.wait(100);
517 } catch (InterruptedException e) {
518 Thread.currentThread().interrupt();
519 LOG.debug("Interupted: " + e, e);
520 }
521 transport = connectedTransport.get();
522 }
523
524 if (transport == null) {
525 // Previous loop may have exited due to use being
526 // disposed.
527 if (disposed) {
528 error = new IOException("Transport disposed.");
529 } else if (connectionFailure != null) {
530 error = connectionFailure;
531 } else if (timedout == true) {
532 error = new IOException("Failover timeout of " + timeout + " ms reached.");
533 } else {
534 error = new IOException("Unexpected failure.");
535 }
536 break;
537 }
538
539 // If it was a request and it was not being tracked by
540 // the state tracker,
541 // then hold it in the requestMap so that we can replay
542 // it later.
543 Tracked tracked = stateTracker.track(command);
544 synchronized (requestMap) {
545 if (tracked != null && tracked.isWaitingForResponse()) {
546 requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
547 } else if (tracked == null && command.isResponseRequired()) {
548 requestMap.put(Integer.valueOf(command.getCommandId()), command);
549 }
550 }
551
552 // Send the message.
553 try {
554 transport.oneway(command);
555 stateTracker.trackBack(command);
556 } catch (IOException e) {
557
558 // If the command was not tracked.. we will retry in
559 // this method
560 if (tracked == null) {
561
562 // since we will retry in this method.. take it
563 // out of the request
564 // map so that it is not sent 2 times on
565 // recovery
566 if (command.isResponseRequired()) {
567 requestMap.remove(Integer.valueOf(command.getCommandId()));
568 }
569
570 // Rethrow the exception so it will handled by
571 // the outer catch
572 throw e;
573 }
574
575 }
576
577 return;
578
579 } catch (IOException e) {
580 if (LOG.isDebugEnabled()) {
581 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
582 }
583 handleTransportFailure(e);
584 }
585 }
586 }
587 } catch (InterruptedException e) {
588 // Some one may be trying to stop our thread.
589 Thread.currentThread().interrupt();
590 throw new InterruptedIOException();
591 }
592 if (!disposed) {
593 if (error != null) {
594 if (error instanceof IOException) {
595 throw (IOException) error;
596 }
597 throw IOExceptionSupport.create(error);
598 }
599 }
600 }
601
602 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
603 throw new AssertionError("Unsupported Method");
604 }
605
606 public Object request(Object command) throws IOException {
607 throw new AssertionError("Unsupported Method");
608 }
609
610 public Object request(Object command, int timeout) throws IOException {
611 throw new AssertionError("Unsupported Method");
612 }
613
614 public void add(boolean rebalance, URI u[]) {
615 boolean newURI = false;
616 for (int i = 0; i < u.length; i++) {
617 if (!contains(u[i])) {
618 uris.add(u[i]);
619 newURI = true;
620 }
621 }
622 if (newURI) {
623 reconnect(rebalance);
624 }
625 }
626
627 public void remove(boolean rebalance, URI u[]) {
628 for (int i = 0; i < u.length; i++) {
629 uris.remove(u[i]);
630 }
631 // rebalance is automatic if any connected to removed/stopped broker
632 }
633
634 public void add(boolean rebalance, String u) {
635 try {
636 URI newURI = new URI(u);
637 if (contains(newURI)==false) {
638 uris.add(newURI);
639 reconnect(rebalance);
640 }
641
642 } catch (Exception e) {
643 LOG.error("Failed to parse URI: " + u);
644 }
645 }
646
647 public void reconnect(boolean rebalance) {
648 synchronized (reconnectMutex) {
649 if (started) {
650 if (rebalance) {
651 doRebalance = true;
652 }
653 LOG.debug("Waking up reconnect task");
654 try {
655 reconnectTask.wakeup();
656 } catch (InterruptedException e) {
657 Thread.currentThread().interrupt();
658 }
659 } else {
660 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
661 }
662 }
663 }
664
665 private List<URI> getConnectList() {
666 ArrayList<URI> l = new ArrayList<URI>(uris);
667 boolean removed = false;
668 if (failedConnectTransportURI != null) {
669 removed = l.remove(failedConnectTransportURI);
670 }
671 if (randomize) {
672 // Randomly, reorder the list by random swapping
673 for (int i = 0; i < l.size(); i++) {
674 int p = (int) (Math.random() * 100 % l.size());
675 URI t = l.get(p);
676 l.set(p, l.get(i));
677 l.set(i, t);
678 }
679 }
680 if (removed) {
681 l.add(failedConnectTransportURI);
682 }
683 LOG.debug("urlList connectionList:" + l + ", from: " + uris);
684 return l;
685 }
686
687 public TransportListener getTransportListener() {
688 return transportListener;
689 }
690
691 public void setTransportListener(TransportListener commandListener) {
692 synchronized (listenerMutex) {
693 this.transportListener = commandListener;
694 listenerMutex.notifyAll();
695 }
696 }
697
698 public <T> T narrow(Class<T> target) {
699
700 if (target.isAssignableFrom(getClass())) {
701 return target.cast(this);
702 }
703 Transport transport = connectedTransport.get();
704 if (transport != null) {
705 return transport.narrow(target);
706 }
707 return null;
708
709 }
710
711 protected void restoreTransport(Transport t) throws Exception, IOException {
712 t.start();
713 // send information to the broker - informing it we are an ft client
714 ConnectionControl cc = new ConnectionControl();
715 cc.setFaultTolerant(true);
716 t.oneway(cc);
717 stateTracker.restore(t);
718 Map tmpMap = null;
719 synchronized (requestMap) {
720 tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
721 }
722 for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
723 Command command = iter2.next();
724 if (LOG.isTraceEnabled()) {
725 LOG.trace("restore requestMap, replay: " + command);
726 }
727 t.oneway(command);
728 }
729 }
730
731 public boolean isUseExponentialBackOff() {
732 return useExponentialBackOff;
733 }
734
735 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
736 this.useExponentialBackOff = useExponentialBackOff;
737 }
738
739 @Override
740 public String toString() {
741 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
742 }
743
744 public String getRemoteAddress() {
745 Transport transport = connectedTransport.get();
746 if (transport != null) {
747 return transport.getRemoteAddress();
748 }
749 return null;
750 }
751
752 public boolean isFaultTolerant() {
753 return true;
754 }
755
756 final boolean doReconnect() {
757 Exception failure = null;
758 synchronized (reconnectMutex) {
759
760 // If updateURIsURL is specified, read the file and add any new
761 // transport URI's to this FailOverTransport.
762 // Note: Could track file timestamp to avoid unnecessary reading.
763 String fileURL = getUpdateURIsURL();
764 if (fileURL != null) {
765 BufferedReader in = null;
766 String newUris = null;
767 StringBuffer buffer = new StringBuffer();
768
769 try {
770 in = new BufferedReader(getURLStream(fileURL));
771 while (true) {
772 String line = in.readLine();
773 if (line == null) {
774 break;
775 }
776 buffer.append(line);
777 }
778 newUris = buffer.toString();
779 } catch (IOException ioe) {
780 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
781 } finally {
782 if (in != null) {
783 try {
784 in.close();
785 } catch (IOException ioe) {
786 // ignore
787 }
788 }
789 }
790
791 processNewTransports(isRebalanceUpdateURIs(), newUris);
792 }
793
794 if (disposed || connectionFailure != null) {
795 reconnectMutex.notifyAll();
796 }
797
798 if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
799 return false;
800 } else {
801 List<URI> connectList = getConnectList();
802 if (connectList.isEmpty()) {
803 failure = new IOException("No uris available to connect to.");
804 } else {
805 if (doRebalance) {
806 if (connectList.get(0).equals(connectedTransportURI)) {
807 // already connected to first in the list, no need to rebalance
808 doRebalance = false;
809 return false;
810 } else {
811 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
812 try {
813 Transport transport = this.connectedTransport.getAndSet(null);
814 if (transport != null) {
815 disposeTransport(transport);
816 }
817 } catch (Exception e) {
818 LOG.debug("Caught an exception stopping existing transport for rebalance", e);
819 }
820 }
821 doRebalance = false;
822 }
823 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
824 reconnectDelay = initialReconnectDelay;
825 }
826 synchronized (backupMutex) {
827 if (backup && !backups.isEmpty()) {
828 BackupTransport bt = backups.remove(0);
829 Transport t = bt.getTransport();
830 URI uri = bt.getUri();
831 t.setTransportListener(myTransportListener);
832 try {
833 if (started) {
834 restoreTransport(t);
835 }
836 reconnectDelay = initialReconnectDelay;
837 failedConnectTransportURI = null;
838 connectedTransportURI = uri;
839 connectedTransport.set(t);
840 reconnectMutex.notifyAll();
841 connectFailures = 0;
842 LOG.info("Successfully reconnected to backup " + uri);
843 return false;
844 } catch (Exception e) {
845 LOG.debug("Backup transport failed", e);
846 }
847 }
848 }
849
850 Iterator<URI> iter = connectList.iterator();
851 while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
852 URI uri = iter.next();
853 Transport t = null;
854 try {
855 LOG.debug("Attempting connect to: " + uri);
856 SslContext.setCurrentSslContext(brokerSslContext);
857 t = TransportFactory.compositeConnect(uri);
858 t.setTransportListener(myTransportListener);
859 t.start();
860
861 if (started) {
862 restoreTransport(t);
863 }
864
865 LOG.debug("Connection established");
866 reconnectDelay = initialReconnectDelay;
867 connectedTransportURI = uri;
868 connectedTransport.set(t);
869 reconnectMutex.notifyAll();
870 connectFailures = 0;
871 // Make sure on initial startup, that the
872 // transportListener
873 // has been initialized for this instance.
874 synchronized (listenerMutex) {
875 if (transportListener == null) {
876 try {
877 // if it isn't set after 2secs - it
878 // probably never will be
879 listenerMutex.wait(2000);
880 } catch (InterruptedException ex) {
881 }
882 }
883 }
884 if (transportListener != null) {
885 transportListener.transportResumed();
886 } else {
887 LOG.debug("transport resumed by transport listener not set");
888 }
889 if (firstConnection) {
890 firstConnection = false;
891 LOG.info("Successfully connected to " + uri);
892 } else {
893 LOG.info("Successfully reconnected to " + uri);
894 }
895 connected = true;
896 return false;
897 } catch (Exception e) {
898 failure = e;
899 LOG.debug("Connect fail to: " + uri + ", reason: " + e);
900 if (t != null) {
901 try {
902 t.stop();
903 } catch (Exception ee) {
904 LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
905 }
906 }
907 } finally {
908 SslContext.setCurrentSslContext(null);
909 }
910 }
911 }
912 }
913 int reconnectAttempts = 0;
914 if (firstConnection) {
915 if (this.startupMaxReconnectAttempts != 0) {
916 reconnectAttempts = this.startupMaxReconnectAttempts;
917 }
918 }
919 if (reconnectAttempts == 0) {
920 reconnectAttempts = this.maxReconnectAttempts;
921 }
922 if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
923 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
924 connectionFailure = failure;
925
926 // Make sure on initial startup, that the transportListener has
927 // been initialized
928 // for this instance.
929 synchronized (listenerMutex) {
930 if (transportListener == null) {
931 try {
932 listenerMutex.wait(2000);
933 } catch (InterruptedException ex) {
934 }
935 }
936 }
937
938 if (transportListener != null) {
939 if (connectionFailure instanceof IOException) {
940 transportListener.onException((IOException) connectionFailure);
941 } else {
942 transportListener.onException(IOExceptionSupport.create(connectionFailure));
943 }
944 }
945 reconnectMutex.notifyAll();
946 return false;
947 }
948 }
949 if (!disposed) {
950
951 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
952 synchronized (sleepMutex) {
953 try {
954 sleepMutex.wait(reconnectDelay);
955 } catch (InterruptedException e) {
956 Thread.currentThread().interrupt();
957 }
958 }
959
960 if (useExponentialBackOff) {
961 // Exponential increment of reconnect delay.
962 reconnectDelay *= backOffMultiplier;
963 if (reconnectDelay > maxReconnectDelay) {
964 reconnectDelay = maxReconnectDelay;
965 }
966 }
967 }
968 return !disposed;
969 }
970
971 final boolean buildBackups() {
972 synchronized (backupMutex) {
973 if (!disposed && backup && backups.size() < backupPoolSize) {
974 List<URI> connectList = getConnectList();
975 // removed disposed backups
976 List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
977 for (BackupTransport bt : backups) {
978 if (bt.isDisposed()) {
979 disposedList.add(bt);
980 }
981 }
982 backups.removeAll(disposedList);
983 disposedList.clear();
984 for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
985 URI uri = iter.next();
986 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
987 try {
988 SslContext.setCurrentSslContext(brokerSslContext);
989 BackupTransport bt = new BackupTransport(this);
990 bt.setUri(uri);
991 if (!backups.contains(bt)) {
992 Transport t = TransportFactory.compositeConnect(uri);
993 t.setTransportListener(bt);
994 t.start();
995 bt.setTransport(t);
996 backups.add(bt);
997 }
998 } catch (Exception e) {
999 LOG.debug("Failed to build backup ", e);
1000 } finally {
1001 SslContext.setCurrentSslContext(null);
1002 }
1003 }
1004 }
1005 }
1006 }
1007 return false;
1008 }
1009
1010 public boolean isDisposed() {
1011 return disposed;
1012 }
1013
1014 public boolean isConnected() {
1015 return connected;
1016 }
1017
1018 public void reconnect(URI uri) throws IOException {
1019 add(true, new URI[] { uri });
1020 }
1021
1022 public boolean isReconnectSupported() {
1023 return this.reconnectSupported;
1024 }
1025
1026 public void setReconnectSupported(boolean value) {
1027 this.reconnectSupported=value;
1028 }
1029
1030 public boolean isUpdateURIsSupported() {
1031 return this.updateURIsSupported;
1032 }
1033
1034 public void setUpdateURIsSupported(boolean value) {
1035 this.updateURIsSupported=value;
1036 }
1037
1038 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1039 if (isUpdateURIsSupported()) {
1040 List<URI> copy = new ArrayList<URI>(this.updated);
1041 List<URI> add = new ArrayList<URI>();
1042 if (updatedURIs != null && updatedURIs.length > 0) {
1043 Set<URI> set = new HashSet<URI>();
1044 for (int i = 0; i < updatedURIs.length; i++) {
1045 URI uri = updatedURIs[i];
1046 if (uri != null) {
1047 set.add(uri);
1048 }
1049 }
1050 for (URI uri : set) {
1051 if (copy.remove(uri) == false) {
1052 add.add(uri);
1053 }
1054 }
1055 synchronized (reconnectMutex) {
1056 this.updated.clear();
1057 this.updated.addAll(add);
1058 for (URI uri : copy) {
1059 this.uris.remove(uri);
1060 }
1061 add(rebalance, add.toArray(new URI[add.size()]));
1062 }
1063 }
1064 }
1065 }
1066
1067 /**
1068 * @return the updateURIsURL
1069 */
1070 public String getUpdateURIsURL() {
1071 return this.updateURIsURL;
1072 }
1073
1074 /**
1075 * @param updateURIsURL the updateURIsURL to set
1076 */
1077 public void setUpdateURIsURL(String updateURIsURL) {
1078 this.updateURIsURL = updateURIsURL;
1079 }
1080
1081 /**
1082 * @return the rebalanceUpdateURIs
1083 */
1084 public boolean isRebalanceUpdateURIs() {
1085 return this.rebalanceUpdateURIs;
1086 }
1087
1088 /**
1089 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1090 */
1091 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1092 this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1093 }
1094
1095 public int getReceiveCounter() {
1096 Transport transport = connectedTransport.get();
1097 if (transport == null) {
1098 return 0;
1099 }
1100 return transport.getReceiveCounter();
1101 }
1102
1103 public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1104 synchronized (reconnectMutex) {
1105 stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1106 }
1107 }
1108
1109 public ConnectionStateTracker getStateTracker() {
1110 return stateTracker;
1111 }
1112
1113 private boolean contains(URI newURI) {
1114
1115 boolean result = false;
1116 try {
1117 for (URI uri:uris) {
1118 if (newURI.getPort()==uri.getPort()) {
1119 InetAddress newAddr = InetAddress.getByName(newURI.getHost());
1120 InetAddress addr = InetAddress.getByName(uri.getHost());
1121 if (addr.equals(newAddr)) {
1122 result = true;
1123 break;
1124 }
1125 }
1126 }
1127 }catch(IOException e) {
1128 result = true;
1129 LOG.error("Failed to verify URI " + newURI + " already known: " + e);
1130 }
1131 return result;
1132 }
1133
1134 private InputStreamReader getURLStream(String path) throws IOException {
1135 InputStreamReader result = null;
1136 URL url = null;
1137 try {
1138 url = new URL(path);
1139 result = new InputStreamReader(url.openStream());
1140 } catch (MalformedURLException e) {
1141 // ignore - it could be a path to a a local file
1142 }
1143 if (result == null) {
1144 result = new FileReader(path);
1145 }
1146 return result;
1147 }
1148 }