001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import java.util.Iterator;
020 import java.util.List;
021 import java.util.Map;
022 import java.util.concurrent.CopyOnWriteArrayList;
023 import java.util.concurrent.atomic.AtomicBoolean;
024
025 import javax.jms.Connection;
026 import javax.jms.Destination;
027 import javax.jms.JMSException;
028 import javax.naming.NamingException;
029
030 import org.apache.activemq.ActiveMQConnectionFactory;
031 import org.apache.activemq.Service;
032 import org.apache.activemq.broker.BrokerService;
033 import org.apache.activemq.util.LRUCache;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036 import org.springframework.jndi.JndiTemplate;
037
038 /**
039 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
040 * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
041 * JMS 1.0.2 compliant.
042 *
043 *
044 */
045 public abstract class JmsConnector implements Service {
046
047 private static int nextId;
048 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
049
050 protected JndiTemplate jndiLocalTemplate;
051 protected JndiTemplate jndiOutboundTemplate;
052 protected JmsMesageConvertor inboundMessageConvertor;
053 protected JmsMesageConvertor outboundMessageConvertor;
054 protected AtomicBoolean initialized = new AtomicBoolean(false);
055 protected AtomicBoolean started = new AtomicBoolean(false);
056 protected ActiveMQConnectionFactory embeddedConnectionFactory;
057 protected int replyToDestinationCacheSize = 10000;
058 protected String outboundUsername;
059 protected String outboundPassword;
060 protected String localUsername;
061 protected String localPassword;
062 protected String outboundClientId;
063 protected String localClientId;
064 protected LRUCache replyToBridges = createLRUCache();
065
066 private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
067 private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
068 private String name;
069
070
071 private static LRUCache createLRUCache() {
072 return new LRUCache() {
073 private static final long serialVersionUID = -7446792754185879286L;
074
075 protected boolean removeEldestEntry(Map.Entry enty) {
076 if (size() > maxCacheSize) {
077 Iterator iter = entrySet().iterator();
078 Map.Entry lru = (Map.Entry)iter.next();
079 remove(lru.getKey());
080 DestinationBridge bridge = (DestinationBridge)lru.getValue();
081 try {
082 bridge.stop();
083 LOG.info("Expired bridge: " + bridge);
084 } catch (Exception e) {
085 LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
086 }
087 }
088 return false;
089 }
090 };
091 }
092
093 /**
094 */
095 public boolean init() {
096 boolean result = initialized.compareAndSet(false, true);
097 if (result) {
098 if (jndiLocalTemplate == null) {
099 jndiLocalTemplate = new JndiTemplate();
100 }
101 if (jndiOutboundTemplate == null) {
102 jndiOutboundTemplate = new JndiTemplate();
103 }
104 if (inboundMessageConvertor == null) {
105 inboundMessageConvertor = new SimpleJmsMessageConvertor();
106 }
107 if (outboundMessageConvertor == null) {
108 outboundMessageConvertor = new SimpleJmsMessageConvertor();
109 }
110 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
111 }
112 return result;
113 }
114
115 public void start() throws Exception {
116 init();
117 if (started.compareAndSet(false, true)) {
118 for (int i = 0; i < inboundBridges.size(); i++) {
119 DestinationBridge bridge = inboundBridges.get(i);
120 bridge.start();
121 }
122 for (int i = 0; i < outboundBridges.size(); i++) {
123 DestinationBridge bridge = outboundBridges.get(i);
124 bridge.start();
125 }
126 LOG.info("JMS Connector " + getName() + " Started");
127 }
128 }
129
130 public void stop() throws Exception {
131 if (started.compareAndSet(true, false)) {
132 for (int i = 0; i < inboundBridges.size(); i++) {
133 DestinationBridge bridge = inboundBridges.get(i);
134 bridge.stop();
135 }
136 for (int i = 0; i < outboundBridges.size(); i++) {
137 DestinationBridge bridge = outboundBridges.get(i);
138 bridge.stop();
139 }
140 LOG.info("JMS Connector " + getName() + " Stopped");
141 }
142 }
143
144 public void clearBridges() {
145 inboundBridges.clear();
146 outboundBridges.clear();
147 }
148
149 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
150
151 /**
152 * One way to configure the local connection - this is called by The
153 * BrokerService when the Connector is embedded
154 *
155 * @param service
156 */
157 public void setBrokerService(BrokerService service) {
158 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
159 }
160
161 /**
162 * @return Returns the jndiTemplate.
163 */
164 public JndiTemplate getJndiLocalTemplate() {
165 return jndiLocalTemplate;
166 }
167
168 /**
169 * @param jndiTemplate The jndiTemplate to set.
170 */
171 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
172 this.jndiLocalTemplate = jndiTemplate;
173 }
174
175 /**
176 * @return Returns the jndiOutboundTemplate.
177 */
178 public JndiTemplate getJndiOutboundTemplate() {
179 return jndiOutboundTemplate;
180 }
181
182 /**
183 * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
184 */
185 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
186 this.jndiOutboundTemplate = jndiOutboundTemplate;
187 }
188
189 /**
190 * @return Returns the inboundMessageConvertor.
191 */
192 public JmsMesageConvertor getInboundMessageConvertor() {
193 return inboundMessageConvertor;
194 }
195
196 /**
197 * @param inboundMessageConvertor The inboundMessageConvertor to set.
198 */
199 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
200 this.inboundMessageConvertor = jmsMessageConvertor;
201 }
202
203 /**
204 * @return Returns the outboundMessageConvertor.
205 */
206 public JmsMesageConvertor getOutboundMessageConvertor() {
207 return outboundMessageConvertor;
208 }
209
210 /**
211 * @param outboundMessageConvertor The outboundMessageConvertor to set.
212 */
213 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
214 this.outboundMessageConvertor = outboundMessageConvertor;
215 }
216
217 /**
218 * @return Returns the replyToDestinationCacheSize.
219 */
220 public int getReplyToDestinationCacheSize() {
221 return replyToDestinationCacheSize;
222 }
223
224 /**
225 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to
226 * set.
227 */
228 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
229 this.replyToDestinationCacheSize = replyToDestinationCacheSize;
230 }
231
232 /**
233 * @return Returns the localPassword.
234 */
235 public String getLocalPassword() {
236 return localPassword;
237 }
238
239 /**
240 * @param localPassword The localPassword to set.
241 */
242 public void setLocalPassword(String localPassword) {
243 this.localPassword = localPassword;
244 }
245
246 /**
247 * @return Returns the localUsername.
248 */
249 public String getLocalUsername() {
250 return localUsername;
251 }
252
253 /**
254 * @param localUsername The localUsername to set.
255 */
256 public void setLocalUsername(String localUsername) {
257 this.localUsername = localUsername;
258 }
259
260 /**
261 * @return Returns the outboundPassword.
262 */
263 public String getOutboundPassword() {
264 return outboundPassword;
265 }
266
267 /**
268 * @param outboundPassword The outboundPassword to set.
269 */
270 public void setOutboundPassword(String outboundPassword) {
271 this.outboundPassword = outboundPassword;
272 }
273
274 /**
275 * @return Returns the outboundUsername.
276 */
277 public String getOutboundUsername() {
278 return outboundUsername;
279 }
280
281 /**
282 * @param outboundUsername The outboundUsername to set.
283 */
284 public void setOutboundUsername(String outboundUsername) {
285 this.outboundUsername = outboundUsername;
286 }
287
288 /**
289 * @return the outboundClientId
290 */
291 public String getOutboundClientId() {
292 return outboundClientId;
293 }
294
295 /**
296 * @param outboundClientId the outboundClientId to set
297 */
298 public void setOutboundClientId(String outboundClientId) {
299 this.outboundClientId = outboundClientId;
300 }
301
302 /**
303 * @return the localClientId
304 */
305 public String getLocalClientId() {
306 return localClientId;
307 }
308
309 /**
310 * @param localClientId the localClientId to set
311 */
312 public void setLocalClientId(String localClientId) {
313 this.localClientId = localClientId;
314 }
315
316
317 protected void addInboundBridge(DestinationBridge bridge) {
318 inboundBridges.add(bridge);
319 }
320
321 protected void addOutboundBridge(DestinationBridge bridge) {
322 outboundBridges.add(bridge);
323 }
324
325 protected void removeInboundBridge(DestinationBridge bridge) {
326 inboundBridges.remove(bridge);
327 }
328
329 protected void removeOutboundBridge(DestinationBridge bridge) {
330 outboundBridges.remove(bridge);
331 }
332
333 public String getName() {
334 if (name == null) {
335 name = "Connector:" + getNextId();
336 }
337 return name;
338 }
339
340 private static synchronized int getNextId() {
341 return nextId++;
342 }
343
344 public void setName(String name) {
345 this.name = name;
346 }
347
348 public abstract void restartProducerConnection() throws NamingException, JMSException;
349 }