001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import java.util.concurrent.atomic.AtomicBoolean;
020 import javax.jms.Connection;
021 import javax.jms.Destination;
022 import javax.jms.JMSException;
023 import javax.jms.Message;
024 import javax.jms.MessageConsumer;
025 import javax.jms.MessageListener;
026 import javax.jms.MessageProducer;
027 import javax.naming.NamingException;
028 import org.apache.activemq.Service;
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 /**
033 * A Destination bridge is used to bridge between to different JMS systems
034 *
035 *
036 */
037 public abstract class DestinationBridge implements Service, MessageListener {
038 private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039 protected MessageConsumer consumer;
040 protected AtomicBoolean started = new AtomicBoolean(false);
041 protected JmsMesageConvertor jmsMessageConvertor;
042 protected boolean doHandleReplyTo = true;
043 protected JmsConnector jmsConnector;
044 private int maximumRetries = 10;
045
046 /**
047 * @return Returns the consumer.
048 */
049 public MessageConsumer getConsumer() {
050 return consumer;
051 }
052
053 /**
054 * @param consumer The consumer to set.
055 */
056 public void setConsumer(MessageConsumer consumer) {
057 this.consumer = consumer;
058 }
059
060 /**
061 * @param connector
062 */
063 public void setJmsConnector(JmsConnector connector) {
064 this.jmsConnector = connector;
065 }
066
067 /**
068 * @return Returns the inboundMessageConvertor.
069 */
070 public JmsMesageConvertor getJmsMessageConvertor() {
071 return jmsMessageConvertor;
072 }
073
074 /**
075 * @param jmsMessageConvertor
076 */
077 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078 this.jmsMessageConvertor = jmsMessageConvertor;
079 }
080
081 public int getMaximumRetries() {
082 return maximumRetries;
083 }
084
085 /**
086 * Sets the maximum number of retries if a send fails before closing the
087 * bridge
088 */
089 public void setMaximumRetries(int maximumRetries) {
090 this.maximumRetries = maximumRetries;
091 }
092
093 protected Destination processReplyToDestination(Destination destination) {
094 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
095 }
096
097 public void start() throws Exception {
098 if (started.compareAndSet(false, true)) {
099 MessageConsumer consumer = createConsumer();
100 consumer.setMessageListener(this);
101 createProducer();
102 }
103 }
104
105 public void stop() throws Exception {
106 started.set(false);
107 }
108
109 public void onMessage(Message message) {
110 int attempt = 0;
111 while (started.get() && message != null) {
112
113 try {
114 if (attempt > 0) {
115 restartProducer();
116 }
117 Message converted;
118 if (doHandleReplyTo) {
119 Destination replyTo = message.getJMSReplyTo();
120 if (replyTo != null) {
121 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
122 } else {
123 converted = jmsMessageConvertor.convert(message);
124 }
125 } else {
126 message.setJMSReplyTo(null);
127 converted = jmsMessageConvertor.convert(message);
128 }
129 sendMessage(converted);
130 message.acknowledge();
131 return;
132 } catch (Exception e) {
133 LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
134 if (maximumRetries > 0 && attempt >= maximumRetries) {
135 try {
136 stop();
137 } catch (Exception e1) {
138 LOG.warn("Failed to stop cleanly", e1);
139 }
140 }
141 }
142 }
143 }
144
145 /**
146 * @return Returns the doHandleReplyTo.
147 */
148 protected boolean isDoHandleReplyTo() {
149 return doHandleReplyTo;
150 }
151
152 /**
153 * @param doHandleReplyTo The doHandleReplyTo to set.
154 */
155 protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
156 this.doHandleReplyTo = doHandleReplyTo;
157 }
158
159 protected abstract MessageConsumer createConsumer() throws JMSException;
160
161 protected abstract MessageProducer createProducer() throws JMSException;
162
163 protected abstract void sendMessage(Message message) throws JMSException;
164
165 protected abstract Connection getConnnectionForConsumer();
166
167 protected abstract Connection getConnectionForProducer();
168
169 protected void restartProducer() throws JMSException, NamingException {
170 try {
171 //don't reconnect immediately
172 Thread.sleep(1000);
173 getConnectionForProducer().close();
174 } catch (Exception e) {
175 LOG.debug("Ignoring failure to close producer connection: " + e, e);
176 }
177 jmsConnector.restartProducerConnection();
178 createProducer();
179 }
180 }