001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import javax.jms.Connection;
020 import javax.jms.Destination;
021 import javax.jms.JMSException;
022 import javax.jms.Session;
023 import javax.jms.Topic;
024 import javax.jms.TopicConnection;
025 import javax.jms.TopicConnectionFactory;
026 import javax.jms.TopicSession;
027 import javax.naming.NamingException;
028
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 /**
033 * A Bridge to other JMS Topic providers
034 *
035 * @org.apache.xbean.XBean
036 *
037 *
038 */
039 public class JmsTopicConnector extends JmsConnector {
040 private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
041 private String outboundTopicConnectionFactoryName;
042 private String localConnectionFactoryName;
043 private TopicConnectionFactory outboundTopicConnectionFactory;
044 private TopicConnectionFactory localTopicConnectionFactory;
045 private TopicConnection outboundTopicConnection;
046 private TopicConnection localTopicConnection;
047 private InboundTopicBridge[] inboundTopicBridges;
048 private OutboundTopicBridge[] outboundTopicBridges;
049
050 public boolean init() {
051 boolean result = super.init();
052 if (result) {
053 try {
054 initializeForeignTopicConnection();
055 initializeLocalTopicConnection();
056 initializeInboundJmsMessageConvertor();
057 initializeOutboundJmsMessageConvertor();
058 initializeInboundTopicBridges();
059 initializeOutboundTopicBridges();
060 } catch (Exception e) {
061 LOG.error("Failed to initialize the JMSConnector", e);
062 }
063 }
064 return result;
065 }
066
067 /**
068 * @return Returns the inboundTopicBridges.
069 */
070 public InboundTopicBridge[] getInboundTopicBridges() {
071 return inboundTopicBridges;
072 }
073
074 /**
075 * @param inboundTopicBridges The inboundTopicBridges to set.
076 */
077 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
078 this.inboundTopicBridges = inboundTopicBridges;
079 }
080
081 /**
082 * @return Returns the outboundTopicBridges.
083 */
084 public OutboundTopicBridge[] getOutboundTopicBridges() {
085 return outboundTopicBridges;
086 }
087
088 /**
089 * @param outboundTopicBridges The outboundTopicBridges to set.
090 */
091 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
092 this.outboundTopicBridges = outboundTopicBridges;
093 }
094
095 /**
096 * @return Returns the localTopicConnectionFactory.
097 */
098 public TopicConnectionFactory getLocalTopicConnectionFactory() {
099 return localTopicConnectionFactory;
100 }
101
102 /**
103 * @param localTopicConnectionFactory The localTopicConnectionFactory to
104 * set.
105 */
106 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
107 this.localTopicConnectionFactory = localConnectionFactory;
108 }
109
110 /**
111 * @return Returns the outboundTopicConnectionFactory.
112 */
113 public TopicConnectionFactory getOutboundTopicConnectionFactory() {
114 return outboundTopicConnectionFactory;
115 }
116
117 /**
118 * @return Returns the outboundTopicConnectionFactoryName.
119 */
120 public String getOutboundTopicConnectionFactoryName() {
121 return outboundTopicConnectionFactoryName;
122 }
123
124 /**
125 * @param outboundTopicConnectionFactoryName The
126 * outboundTopicConnectionFactoryName to set.
127 */
128 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
129 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
130 }
131
132 /**
133 * @return Returns the localConnectionFactoryName.
134 */
135 public String getLocalConnectionFactoryName() {
136 return localConnectionFactoryName;
137 }
138
139 /**
140 * @param localConnectionFactoryName The localConnectionFactoryName to set.
141 */
142 public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143 this.localConnectionFactoryName = localConnectionFactoryName;
144 }
145
146 /**
147 * @return Returns the localTopicConnection.
148 */
149 public TopicConnection getLocalTopicConnection() {
150 return localTopicConnection;
151 }
152
153 /**
154 * @param localTopicConnection The localTopicConnection to set.
155 */
156 public void setLocalTopicConnection(TopicConnection localTopicConnection) {
157 this.localTopicConnection = localTopicConnection;
158 }
159
160 /**
161 * @return Returns the outboundTopicConnection.
162 */
163 public TopicConnection getOutboundTopicConnection() {
164 return outboundTopicConnection;
165 }
166
167 /**
168 * @param outboundTopicConnection The outboundTopicConnection to set.
169 */
170 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
171 this.outboundTopicConnection = foreignTopicConnection;
172 }
173
174 /**
175 * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory
176 * to set.
177 */
178 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
179 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
180 }
181
182 public void restartProducerConnection() throws NamingException, JMSException {
183 outboundTopicConnection = null;
184 initializeForeignTopicConnection();
185 }
186
187 protected void initializeForeignTopicConnection() throws NamingException, JMSException {
188 if (outboundTopicConnection == null) {
189 // get the connection factories
190 if (outboundTopicConnectionFactory == null) {
191 // look it up from JNDI
192 if (outboundTopicConnectionFactoryName != null) {
193 outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
194 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
195 if (outboundUsername != null) {
196 outboundTopicConnection = outboundTopicConnectionFactory
197 .createTopicConnection(outboundUsername, outboundPassword);
198 } else {
199 outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
200 }
201 } else {
202 throw new JMSException("Cannot create localConnection - no information");
203 }
204 } else {
205 if (outboundUsername != null) {
206 outboundTopicConnection = outboundTopicConnectionFactory
207 .createTopicConnection(outboundUsername, outboundPassword);
208 } else {
209 outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
210 }
211 }
212 }
213 if (localClientId != null && localClientId.length() > 0) {
214 outboundTopicConnection.setClientID(getOutboundClientId());
215 }
216 outboundTopicConnection.start();
217 }
218
219 protected void initializeLocalTopicConnection() throws NamingException, JMSException {
220 if (localTopicConnection == null) {
221 // get the connection factories
222 if (localTopicConnectionFactory == null) {
223 if (embeddedConnectionFactory == null) {
224 // look it up from JNDI
225 if (localConnectionFactoryName != null) {
226 localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
227 .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228 if (localUsername != null) {
229 localTopicConnection = localTopicConnectionFactory
230 .createTopicConnection(localUsername, localPassword);
231 } else {
232 localTopicConnection = localTopicConnectionFactory.createTopicConnection();
233 }
234 } else {
235 throw new JMSException("Cannot create localConnection - no information");
236 }
237 } else {
238 localTopicConnection = embeddedConnectionFactory.createTopicConnection();
239 }
240 } else {
241 if (localUsername != null) {
242 localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername,
243 localPassword);
244 } else {
245 localTopicConnection = localTopicConnectionFactory.createTopicConnection();
246 }
247 }
248 }
249 if (localClientId != null && localClientId.length() > 0) {
250 localTopicConnection.setClientID(getLocalClientId());
251 }
252 localTopicConnection.start();
253 }
254
255 protected void initializeInboundJmsMessageConvertor() {
256 inboundMessageConvertor.setConnection(localTopicConnection);
257 }
258
259 protected void initializeOutboundJmsMessageConvertor() {
260 outboundMessageConvertor.setConnection(outboundTopicConnection);
261 }
262
263 protected void initializeInboundTopicBridges() throws JMSException {
264 if (inboundTopicBridges != null) {
265 TopicSession outboundSession = outboundTopicConnection
266 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
267 TopicSession localSession = localTopicConnection.createTopicSession(false,
268 Session.AUTO_ACKNOWLEDGE);
269 for (int i = 0; i < inboundTopicBridges.length; i++) {
270 InboundTopicBridge bridge = inboundTopicBridges[i];
271 String localTopicName = bridge.getLocalTopicName();
272 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
273 String topicName = bridge.getInboundTopicName();
274 Topic foreignTopic = createForeignTopic(outboundSession, topicName);
275 bridge.setConsumerTopic(foreignTopic);
276 bridge.setProducerTopic(activemqTopic);
277 bridge.setProducerConnection(localTopicConnection);
278 bridge.setConsumerConnection(outboundTopicConnection);
279 if (bridge.getJmsMessageConvertor() == null) {
280 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
281 }
282 bridge.setJmsConnector(this);
283 addInboundBridge(bridge);
284 }
285 outboundSession.close();
286 localSession.close();
287 }
288 }
289
290 protected void initializeOutboundTopicBridges() throws JMSException {
291 if (outboundTopicBridges != null) {
292 TopicSession outboundSession = outboundTopicConnection
293 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
294 TopicSession localSession = localTopicConnection.createTopicSession(false,
295 Session.AUTO_ACKNOWLEDGE);
296 for (int i = 0; i < outboundTopicBridges.length; i++) {
297 OutboundTopicBridge bridge = outboundTopicBridges[i];
298 String localTopicName = bridge.getLocalTopicName();
299 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
300 String topicName = bridge.getOutboundTopicName();
301 Topic foreignTopic = createForeignTopic(outboundSession, topicName);
302 bridge.setConsumerTopic(activemqTopic);
303 bridge.setProducerTopic(foreignTopic);
304 bridge.setProducerConnection(outboundTopicConnection);
305 bridge.setConsumerConnection(localTopicConnection);
306 if (bridge.getJmsMessageConvertor() == null) {
307 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
308 }
309 bridge.setJmsConnector(this);
310 addOutboundBridge(bridge);
311 }
312 outboundSession.close();
313 localSession.close();
314 }
315 }
316
317 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
318 Connection replyToConsumerConnection) {
319 Topic replyToProducerTopic = (Topic)destination;
320 boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
321
322 if (isInbound) {
323 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
324 if (bridge == null) {
325 bridge = new InboundTopicBridge() {
326 protected Destination processReplyToDestination(Destination destination) {
327 return null;
328 }
329 };
330 try {
331 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
332 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
333 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
334 replyToConsumerSession.close();
335 bridge.setConsumerTopic(replyToConsumerTopic);
336 bridge.setProducerTopic(replyToProducerTopic);
337 bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
338 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
339 bridge.setDoHandleReplyTo(false);
340 if (bridge.getJmsMessageConvertor() == null) {
341 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
342 }
343 bridge.setJmsConnector(this);
344 bridge.start();
345 LOG.info("Created replyTo bridge for " + replyToProducerTopic);
346 } catch (Exception e) {
347 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
348 return null;
349 }
350 replyToBridges.put(replyToProducerTopic, bridge);
351 }
352 return bridge.getConsumerTopic();
353 } else {
354 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
355 if (bridge == null) {
356 bridge = new OutboundTopicBridge() {
357 protected Destination processReplyToDestination(Destination destination) {
358 return null;
359 }
360 };
361 try {
362 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
363 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
364 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
365 replyToConsumerSession.close();
366 bridge.setConsumerTopic(replyToConsumerTopic);
367 bridge.setProducerTopic(replyToProducerTopic);
368 bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
369 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
370 bridge.setDoHandleReplyTo(false);
371 if (bridge.getJmsMessageConvertor() == null) {
372 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
373 }
374 bridge.setJmsConnector(this);
375 bridge.start();
376 LOG.info("Created replyTo bridge for " + replyToProducerTopic);
377 } catch (Exception e) {
378 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
379 return null;
380 }
381 replyToBridges.put(replyToProducerTopic, bridge);
382 }
383 return bridge.getConsumerTopic();
384 }
385 }
386
387 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
388 return session.createTopic(topicName);
389 }
390
391 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
392 Topic result = null;
393 try {
394 result = session.createTopic(topicName);
395 } catch (JMSException e) {
396 // look-up the Topic
397 try {
398 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
399 } catch (NamingException e1) {
400 String errStr = "Failed to look-up Topic for name: " + topicName;
401 LOG.error(errStr, e);
402 JMSException jmsEx = new JMSException(errStr);
403 jmsEx.setLinkedException(e1);
404 throw jmsEx;
405 }
406 }
407 return result;
408 }
409
410 }