001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import javax.jms.Connection;
020 import javax.jms.Destination;
021 import javax.jms.JMSException;
022 import javax.jms.Queue;
023 import javax.jms.QueueConnection;
024 import javax.jms.QueueConnectionFactory;
025 import javax.jms.QueueSession;
026 import javax.jms.Session;
027 import javax.naming.NamingException;
028
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 /**
033 * A Bridge to other JMS Queue providers
034 *
035 * @org.apache.xbean.XBean
036 *
037 *
038 */
039 public class JmsQueueConnector extends JmsConnector {
040 private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
041 private String outboundQueueConnectionFactoryName;
042 private String localConnectionFactoryName;
043 private QueueConnectionFactory outboundQueueConnectionFactory;
044 private QueueConnectionFactory localQueueConnectionFactory;
045 private QueueConnection outboundQueueConnection;
046 private QueueConnection localQueueConnection;
047 private InboundQueueBridge[] inboundQueueBridges;
048 private OutboundQueueBridge[] outboundQueueBridges;
049
050 public boolean init() {
051 boolean result = super.init();
052 if (result) {
053 try {
054 initializeForeignQueueConnection();
055 initializeLocalQueueConnection();
056 initializeInboundJmsMessageConvertor();
057 initializeOutboundJmsMessageConvertor();
058 initializeInboundQueueBridges();
059 initializeOutboundQueueBridges();
060 } catch (Exception e) {
061 LOG.error("Failed to initialize the JMSConnector", e);
062 }
063 }
064 return result;
065 }
066
067 /**
068 * @return Returns the inboundQueueBridges.
069 */
070 public InboundQueueBridge[] getInboundQueueBridges() {
071 return inboundQueueBridges;
072 }
073
074 /**
075 * @param inboundQueueBridges The inboundQueueBridges to set.
076 */
077 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
078 this.inboundQueueBridges = inboundQueueBridges;
079 }
080
081 /**
082 * @return Returns the outboundQueueBridges.
083 */
084 public OutboundQueueBridge[] getOutboundQueueBridges() {
085 return outboundQueueBridges;
086 }
087
088 /**
089 * @param outboundQueueBridges The outboundQueueBridges to set.
090 */
091 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
092 this.outboundQueueBridges = outboundQueueBridges;
093 }
094
095 /**
096 * @return Returns the localQueueConnectionFactory.
097 */
098 public QueueConnectionFactory getLocalQueueConnectionFactory() {
099 return localQueueConnectionFactory;
100 }
101
102 /**
103 * @param localQueueConnectionFactory The localQueueConnectionFactory to
104 * set.
105 */
106 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
107 this.localQueueConnectionFactory = localConnectionFactory;
108 }
109
110 /**
111 * @return Returns the outboundQueueConnectionFactory.
112 */
113 public QueueConnectionFactory getOutboundQueueConnectionFactory() {
114 return outboundQueueConnectionFactory;
115 }
116
117 /**
118 * @return Returns the outboundQueueConnectionFactoryName.
119 */
120 public String getOutboundQueueConnectionFactoryName() {
121 return outboundQueueConnectionFactoryName;
122 }
123
124 /**
125 * @param outboundQueueConnectionFactoryName The
126 * outboundQueueConnectionFactoryName to set.
127 */
128 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
129 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
130 }
131
132 /**
133 * @return Returns the localConnectionFactoryName.
134 */
135 public String getLocalConnectionFactoryName() {
136 return localConnectionFactoryName;
137 }
138
139 /**
140 * @param localConnectionFactoryName The localConnectionFactoryName to set.
141 */
142 public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143 this.localConnectionFactoryName = localConnectionFactoryName;
144 }
145
146 /**
147 * @return Returns the localQueueConnection.
148 */
149 public QueueConnection getLocalQueueConnection() {
150 return localQueueConnection;
151 }
152
153 /**
154 * @param localQueueConnection The localQueueConnection to set.
155 */
156 public void setLocalQueueConnection(QueueConnection localQueueConnection) {
157 this.localQueueConnection = localQueueConnection;
158 }
159
160 /**
161 * @return Returns the outboundQueueConnection.
162 */
163 public QueueConnection getOutboundQueueConnection() {
164 return outboundQueueConnection;
165 }
166
167 /**
168 * @param outboundQueueConnection The outboundQueueConnection to set.
169 */
170 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
171 this.outboundQueueConnection = foreignQueueConnection;
172 }
173
174 /**
175 * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
176 * to set.
177 */
178 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
179 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
180 }
181
182 public void restartProducerConnection() throws NamingException, JMSException {
183 outboundQueueConnection = null;
184 initializeForeignQueueConnection();
185
186 // the outboundQueueConnection was reestablished - publish the new connection to the bridges
187 if (inboundQueueBridges != null) {
188 for (int i = 0; i < inboundQueueBridges.length; i++) {
189 InboundQueueBridge bridge = inboundQueueBridges[i];
190 bridge.setConsumerConnection(outboundQueueConnection);
191 }
192 }
193 if (outboundQueueBridges != null) {
194 for (int i = 0; i < outboundQueueBridges.length; i++) {
195 OutboundQueueBridge bridge = outboundQueueBridges[i];
196 bridge.setProducerConnection(outboundQueueConnection);
197 }
198 }
199 }
200
201 protected void initializeForeignQueueConnection() throws NamingException, JMSException {
202 if (outboundQueueConnection == null) {
203 // get the connection factories
204 if (outboundQueueConnectionFactory == null) {
205 // look it up from JNDI
206 if (outboundQueueConnectionFactoryName != null) {
207 outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
208 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
209 if (outboundUsername != null) {
210 outboundQueueConnection = outboundQueueConnectionFactory
211 .createQueueConnection(outboundUsername, outboundPassword);
212 } else {
213 outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
214 }
215 } else {
216 throw new JMSException("Cannot create foreignConnection - no information");
217 }
218 } else {
219 if (outboundUsername != null) {
220 outboundQueueConnection = outboundQueueConnectionFactory
221 .createQueueConnection(outboundUsername, outboundPassword);
222 } else {
223 outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
224 }
225 }
226 }
227 if (localClientId != null && localClientId.length() > 0) {
228 outboundQueueConnection.setClientID(getOutboundClientId());
229 }
230 outboundQueueConnection.start();
231 }
232
233 protected void initializeLocalQueueConnection() throws NamingException, JMSException {
234 if (localQueueConnection == null) {
235 // get the connection factories
236 if (localQueueConnectionFactory == null) {
237 if (embeddedConnectionFactory == null) {
238 // look it up from JNDI
239 if (localConnectionFactoryName != null) {
240 localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
241 .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
242 if (localUsername != null) {
243 localQueueConnection = localQueueConnectionFactory
244 .createQueueConnection(localUsername, localPassword);
245 } else {
246 localQueueConnection = localQueueConnectionFactory.createQueueConnection();
247 }
248 } else {
249 throw new JMSException("Cannot create localConnection - no information");
250 }
251 } else {
252 localQueueConnection = embeddedConnectionFactory.createQueueConnection();
253 }
254 } else {
255 if (localUsername != null) {
256 localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername,
257 localPassword);
258 } else {
259 localQueueConnection = localQueueConnectionFactory.createQueueConnection();
260 }
261 }
262 }
263 if (localClientId != null && localClientId.length() > 0) {
264 localQueueConnection.setClientID(getLocalClientId());
265 }
266 localQueueConnection.start();
267 }
268
269 protected void initializeInboundJmsMessageConvertor() {
270 inboundMessageConvertor.setConnection(localQueueConnection);
271 }
272
273 protected void initializeOutboundJmsMessageConvertor() {
274 outboundMessageConvertor.setConnection(outboundQueueConnection);
275 }
276
277 protected void initializeInboundQueueBridges() throws JMSException {
278 if (inboundQueueBridges != null) {
279 QueueSession outboundSession = outboundQueueConnection
280 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
281 QueueSession localSession = localQueueConnection.createQueueSession(false,
282 Session.AUTO_ACKNOWLEDGE);
283 for (int i = 0; i < inboundQueueBridges.length; i++) {
284 InboundQueueBridge bridge = inboundQueueBridges[i];
285 String localQueueName = bridge.getLocalQueueName();
286 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
287 String queueName = bridge.getInboundQueueName();
288 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
289 bridge.setConsumerQueue(foreignQueue);
290 bridge.setProducerQueue(activemqQueue);
291 bridge.setProducerConnection(localQueueConnection);
292 bridge.setConsumerConnection(outboundQueueConnection);
293 if (bridge.getJmsMessageConvertor() == null) {
294 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
295 }
296 bridge.setJmsConnector(this);
297 addInboundBridge(bridge);
298 }
299 outboundSession.close();
300 localSession.close();
301 }
302 }
303
304 protected void initializeOutboundQueueBridges() throws JMSException {
305 if (outboundQueueBridges != null) {
306 QueueSession outboundSession = outboundQueueConnection
307 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
308 QueueSession localSession = localQueueConnection.createQueueSession(false,
309 Session.AUTO_ACKNOWLEDGE);
310 for (int i = 0; i < outboundQueueBridges.length; i++) {
311 OutboundQueueBridge bridge = outboundQueueBridges[i];
312 String localQueueName = bridge.getLocalQueueName();
313 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
314 String queueName = bridge.getOutboundQueueName();
315 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
316 bridge.setConsumerQueue(activemqQueue);
317 bridge.setProducerQueue(foreignQueue);
318 bridge.setProducerConnection(outboundQueueConnection);
319 bridge.setConsumerConnection(localQueueConnection);
320 if (bridge.getJmsMessageConvertor() == null) {
321 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
322 }
323 bridge.setJmsConnector(this);
324 addOutboundBridge(bridge);
325 }
326 outboundSession.close();
327 localSession.close();
328 }
329 }
330
331 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
332 Connection replyToConsumerConnection) {
333 Queue replyToProducerQueue = (Queue)destination;
334 boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
335
336 if (isInbound) {
337 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
338 if (bridge == null) {
339 bridge = new InboundQueueBridge() {
340 protected Destination processReplyToDestination(Destination destination) {
341 return null;
342 }
343 };
344 try {
345 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
346 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
347 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
348 replyToConsumerSession.close();
349 bridge.setConsumerQueue(replyToConsumerQueue);
350 bridge.setProducerQueue(replyToProducerQueue);
351 bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
352 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
353 bridge.setDoHandleReplyTo(false);
354 if (bridge.getJmsMessageConvertor() == null) {
355 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
356 }
357 bridge.setJmsConnector(this);
358 bridge.start();
359 LOG.info("Created replyTo bridge for " + replyToProducerQueue);
360 } catch (Exception e) {
361 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
362 return null;
363 }
364 replyToBridges.put(replyToProducerQueue, bridge);
365 }
366 return bridge.getConsumerQueue();
367 } else {
368 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
369 if (bridge == null) {
370 bridge = new OutboundQueueBridge() {
371 protected Destination processReplyToDestination(Destination destination) {
372 return null;
373 }
374 };
375 try {
376 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
377 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
378 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
379 replyToConsumerSession.close();
380 bridge.setConsumerQueue(replyToConsumerQueue);
381 bridge.setProducerQueue(replyToProducerQueue);
382 bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
383 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
384 bridge.setDoHandleReplyTo(false);
385 if (bridge.getJmsMessageConvertor() == null) {
386 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
387 }
388 bridge.setJmsConnector(this);
389 bridge.start();
390 LOG.info("Created replyTo bridge for " + replyToProducerQueue);
391 } catch (Exception e) {
392 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
393 return null;
394 }
395 replyToBridges.put(replyToProducerQueue, bridge);
396 }
397 return bridge.getConsumerQueue();
398 }
399 }
400
401 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
402 return session.createQueue(queueName);
403 }
404
405 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
406 Queue result = null;
407 try {
408 result = session.createQueue(queueName);
409 } catch (JMSException e) {
410 // look-up the Queue
411 try {
412 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
413 } catch (NamingException e1) {
414 String errStr = "Failed to look-up Queue for name: " + queueName;
415 LOG.error(errStr, e);
416 JMSException jmsEx = new JMSException(errStr);
417 jmsEx.setLinkedException(e1);
418 throw jmsEx;
419 }
420 }
421 return result;
422 }
423
424 }