001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019
020 import java.util.*;
021 import java.util.concurrent.ConcurrentHashMap;
022
023 import javax.jms.JMSException;
024 import javax.transaction.xa.XAException;
025
026 import org.apache.activemq.ActiveMQMessageAudit;
027 import org.apache.activemq.command.ConnectionInfo;
028 import org.apache.activemq.command.LocalTransactionId;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.MessageAck;
031 import org.apache.activemq.command.ProducerInfo;
032 import org.apache.activemq.command.TransactionId;
033 import org.apache.activemq.command.XATransactionId;
034 import org.apache.activemq.state.ProducerState;
035 import org.apache.activemq.store.TransactionRecoveryListener;
036 import org.apache.activemq.store.TransactionStore;
037 import org.apache.activemq.transaction.LocalTransaction;
038 import org.apache.activemq.transaction.Synchronization;
039 import org.apache.activemq.transaction.Transaction;
040 import org.apache.activemq.transaction.XATransaction;
041 import org.apache.activemq.util.IOExceptionSupport;
042 import org.apache.activemq.util.WrappedException;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 * This broker filter handles the transaction related operations in the Broker
048 * interface.
049 *
050 *
051 */
052 public class TransactionBroker extends BrokerFilter {
053
054 private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
055
056 // The prepared XA transactions.
057 private TransactionStore transactionStore;
058 private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
059 private ActiveMQMessageAudit audit;
060
061 public TransactionBroker(Broker next, TransactionStore transactionStore) {
062 super(next);
063 this.transactionStore = transactionStore;
064 }
065
066 // ////////////////////////////////////////////////////////////////////////////
067 //
068 // Life cycle Methods
069 //
070 // ////////////////////////////////////////////////////////////////////////////
071
072 /**
073 * Recovers any prepared transactions.
074 */
075 public void start() throws Exception {
076 transactionStore.start();
077 try {
078 final ConnectionContext context = new ConnectionContext();
079 context.setBroker(this);
080 context.setInRecoveryMode(true);
081 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
082 context.setProducerFlowControl(false);
083 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
084 producerExchange.setMutable(true);
085 producerExchange.setConnectionContext(context);
086 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
087 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
088 consumerExchange.setConnectionContext(context);
089 transactionStore.recover(new TransactionRecoveryListener() {
090 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
091 try {
092 beginTransaction(context, xid);
093 for (int i = 0; i < addedMessages.length; i++) {
094 send(producerExchange, addedMessages[i]);
095 }
096 for (int i = 0; i < aks.length; i++) {
097 acknowledge(consumerExchange, aks[i]);
098 }
099 prepareTransaction(context, xid);
100 } catch (Throwable e) {
101 throw new WrappedException(e);
102 }
103 }
104 });
105 } catch (WrappedException e) {
106 Throwable cause = e.getCause();
107 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
108 }
109 next.start();
110 }
111
112 public void stop() throws Exception {
113 transactionStore.stop();
114 next.stop();
115 }
116
117 // ////////////////////////////////////////////////////////////////////////////
118 //
119 // BrokerFilter overrides
120 //
121 // ////////////////////////////////////////////////////////////////////////////
122 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
123 List<TransactionId> txs = new ArrayList<TransactionId>();
124 synchronized (xaTransactions) {
125 for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
126 Transaction tx = iter.next();
127 if (tx.isPrepared()) {
128 if (LOG.isDebugEnabled()) {
129 LOG.debug("prepared transaction: " + tx.getTransactionId());
130 }
131 txs.add(tx.getTransactionId());
132 }
133 }
134 }
135 XATransactionId rc[] = new XATransactionId[txs.size()];
136 txs.toArray(rc);
137 if (LOG.isDebugEnabled()) {
138 LOG.debug("prepared transacton list size: " + rc.length);
139 }
140 return rc;
141 }
142
143 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
144 // the transaction may have already been started.
145 if (xid.isXATransaction()) {
146 XATransaction transaction = null;
147 synchronized (xaTransactions) {
148 transaction = xaTransactions.get(xid);
149 if (transaction != null) {
150 return;
151 }
152 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
153 xaTransactions.put(xid, transaction);
154 }
155 } else {
156 Map<TransactionId, Transaction> transactionMap = context.getTransactions();
157 Transaction transaction = transactionMap.get(xid);
158 if (transaction != null) {
159 throw new JMSException("Transaction '" + xid + "' has already been started.");
160 }
161 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
162 transactionMap.put(xid, transaction);
163 }
164 }
165
166 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
167 Transaction transaction = getTransaction(context, xid, false);
168 return transaction.prepare();
169 }
170
171 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
172 Transaction transaction = getTransaction(context, xid, true);
173 transaction.commit(onePhase);
174 }
175
176 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
177 Transaction transaction = getTransaction(context, xid, true);
178 transaction.rollback();
179 }
180
181 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
182 Transaction transaction = getTransaction(context, xid, true);
183 transaction.rollback();
184 }
185
186 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
187 // This method may be invoked recursively.
188 // Track original tx so that it can be restored.
189 final ConnectionContext context = consumerExchange.getConnectionContext();
190 Transaction originalTx = context.getTransaction();
191 Transaction transaction = null;
192 if (ack.isInTransaction()) {
193 transaction = getTransaction(context, ack.getTransactionId(), false);
194 }
195 context.setTransaction(transaction);
196 try {
197 next.acknowledge(consumerExchange, ack);
198 } finally {
199 context.setTransaction(originalTx);
200 }
201 }
202
203 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
204 // This method may be invoked recursively.
205 // Track original tx so that it can be restored.
206 final ConnectionContext context = producerExchange.getConnectionContext();
207 Transaction originalTx = context.getTransaction();
208 Transaction transaction = null;
209 Synchronization sync = null;
210 if (message.getTransactionId() != null) {
211 transaction = getTransaction(context, message.getTransactionId(), false);
212 if (transaction != null) {
213 sync = new Synchronization() {
214
215 public void afterRollback() {
216 if (audit != null) {
217 audit.rollback(message);
218 }
219 }
220 };
221 transaction.addSynchronization(sync);
222 }
223 }
224 if (audit == null || !audit.isDuplicate(message)) {
225 context.setTransaction(transaction);
226 try {
227 next.send(producerExchange, message);
228 } finally {
229 context.setTransaction(originalTx);
230 }
231 } else {
232 if (sync != null && transaction != null) {
233 transaction.removeSynchronization(sync);
234 }
235 if (LOG.isDebugEnabled()) {
236 LOG.debug("IGNORING duplicate message " + message);
237 }
238 }
239 }
240
241 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
242 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
243 try {
244 Transaction transaction = iter.next();
245 transaction.rollback();
246 } catch (Exception e) {
247 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
248 }
249 iter.remove();
250 }
251
252 synchronized (xaTransactions) {
253 // first find all txs that belongs to the connection
254 ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
255 for (XATransaction tx : xaTransactions.values()) {
256 if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
257 txs.add(tx);
258 }
259 }
260
261 // then remove them
262 // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
263 for (XATransaction tx : txs) {
264 try {
265 tx.rollback();
266 } catch (Exception e) {
267 LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
268 }
269 }
270
271 }
272 next.removeConnection(context, info, error);
273 }
274
275 // ////////////////////////////////////////////////////////////////////////////
276 //
277 // Implementation help methods.
278 //
279 // ////////////////////////////////////////////////////////////////////////////
280 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
281 Map transactionMap = null;
282 synchronized (xaTransactions) {
283 transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
284 }
285 Transaction transaction = (Transaction)transactionMap.get(xid);
286 if (transaction != null) {
287 return transaction;
288 }
289 if (xid.isXATransaction()) {
290 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
291 e.errorCode = XAException.XAER_NOTA;
292 throw e;
293 } else {
294 throw new JMSException("Transaction '" + xid + "' has not been started.");
295 }
296 }
297
298 public void removeTransaction(XATransactionId xid) {
299 synchronized (xaTransactions) {
300 xaTransactions.remove(xid);
301 }
302 }
303
304 public synchronized void brokerServiceStarted() {
305 super.brokerServiceStarted();
306 if (getBrokerService().isSupportFailOver() && audit == null) {
307 audit = new ActiveMQMessageAudit();
308 }
309 }
310
311 }