001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.discovery.simple;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.util.Map;
022 import java.util.concurrent.SynchronousQueue;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.ThreadPoolExecutor;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.atomic.AtomicBoolean;
027
028 import org.apache.activemq.command.DiscoveryEvent;
029 import org.apache.activemq.thread.DefaultThreadPools;
030 import org.apache.activemq.transport.discovery.DiscoveryAgent;
031 import org.apache.activemq.transport.discovery.DiscoveryListener;
032 import org.apache.activemq.util.MDCHelper;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * A simple DiscoveryAgent that allows static configuration of the discovered
038 * services.
039 *
040 *
041 */
042 public class SimpleDiscoveryAgent implements DiscoveryAgent {
043
044 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
045 private long initialReconnectDelay = 1000;
046 private long maxReconnectDelay = 1000 * 30;
047 private long backOffMultiplier = 2;
048 private boolean useExponentialBackOff=true;
049 private int maxReconnectAttempts;
050 private final Object sleepMutex = new Object();
051 private long minConnectTime = 5000;
052 private DiscoveryListener listener;
053 private String services[] = new String[] {};
054 private final AtomicBoolean running = new AtomicBoolean(false);
055
056 class SimpleDiscoveryEvent extends DiscoveryEvent {
057
058 private int connectFailures;
059 private long reconnectDelay = initialReconnectDelay;
060 private long connectTime = System.currentTimeMillis();
061 private AtomicBoolean failed = new AtomicBoolean(false);
062
063 public SimpleDiscoveryEvent(String service) {
064 super(service);
065 }
066
067 }
068
069 public void setDiscoveryListener(DiscoveryListener listener) {
070 this.listener = listener;
071 }
072
073 public void registerService(String name) throws IOException {
074 }
075
076 public void start() throws Exception {
077 running.set(true);
078 for (int i = 0; i < services.length; i++) {
079 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
080 }
081 }
082
083 public void stop() throws Exception {
084 running.set(false);
085 synchronized (sleepMutex) {
086 sleepMutex.notifyAll();
087 }
088 }
089
090 public String[] getServices() {
091 return services;
092 }
093
094 public void setServices(String services) {
095 this.services = services.split(",");
096 }
097
098 public void setServices(String services[]) {
099 this.services = services;
100 }
101
102 public void setServices(URI services[]) {
103 this.services = new String[services.length];
104 for (int i = 0; i < services.length; i++) {
105 this.services[i] = services[i].toString();
106 }
107 }
108
109 public void serviceFailed(DiscoveryEvent devent) throws IOException {
110
111 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
112 if (event.failed.compareAndSet(false, true)) {
113
114 listener.onServiceRemove(event);
115 final Map context = MDCHelper.getCopyOfContextMap();
116 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
117 public void run() {
118
119 MDCHelper.setContextMap(context);
120
121 // We detect a failed connection attempt because the service
122 // fails right
123 // away.
124 if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
125 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
126
127 event.connectFailures++;
128
129 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
130 LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
131 return;
132 }
133
134 synchronized (sleepMutex) {
135 try {
136 if (!running.get()) {
137 return;
138 }
139
140 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
141 sleepMutex.wait(event.reconnectDelay);
142 } catch (InterruptedException ie) {
143 Thread.currentThread().interrupt();
144 return;
145 }
146 }
147
148 if (!useExponentialBackOff) {
149 event.reconnectDelay = initialReconnectDelay;
150 } else {
151 // Exponential increment of reconnect delay.
152 event.reconnectDelay *= backOffMultiplier;
153 if (event.reconnectDelay > maxReconnectDelay) {
154 event.reconnectDelay = maxReconnectDelay;
155 }
156 }
157
158 } else {
159 event.connectFailures = 0;
160 event.reconnectDelay = initialReconnectDelay;
161 }
162
163 if (!running.get()) {
164 return;
165 }
166
167 event.connectTime = System.currentTimeMillis();
168 event.failed.set(false);
169 listener.onServiceAdd(event);
170 }
171 }, "Simple Discovery Agent");
172 }
173 }
174
175 public long getBackOffMultiplier() {
176 return backOffMultiplier;
177 }
178
179 public void setBackOffMultiplier(long backOffMultiplier) {
180 this.backOffMultiplier = backOffMultiplier;
181 }
182
183 public long getInitialReconnectDelay() {
184 return initialReconnectDelay;
185 }
186
187 public void setInitialReconnectDelay(long initialReconnectDelay) {
188 this.initialReconnectDelay = initialReconnectDelay;
189 }
190
191 public int getMaxReconnectAttempts() {
192 return maxReconnectAttempts;
193 }
194
195 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
196 this.maxReconnectAttempts = maxReconnectAttempts;
197 }
198
199 public long getMaxReconnectDelay() {
200 return maxReconnectDelay;
201 }
202
203 public void setMaxReconnectDelay(long maxReconnectDelay) {
204 this.maxReconnectDelay = maxReconnectDelay;
205 }
206
207 public long getMinConnectTime() {
208 return minConnectTime;
209 }
210
211 public void setMinConnectTime(long minConnectTime) {
212 this.minConnectTime = minConnectTime;
213 }
214
215 public boolean isUseExponentialBackOff() {
216 return useExponentialBackOff;
217 }
218
219 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
220 this.useExponentialBackOff = useExponentialBackOff;
221 }
222 }