001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.memory;
018
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import javax.transaction.xa.XAException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
039
/**
 * Provides a TransactionStore implementation that can create
 * transaction-aware MessageStore objects from non-transaction-aware
 * MessageStore objects.
 */
046 public class MemoryTransactionStore implements TransactionStore {
047
048 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049 ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050 final PersistenceAdapter persistenceAdapter;
051
052 private boolean doingRecover;
053
054 public class Tx {
055 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056
057 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058
059 public void add(AddMessageCommand msg) {
060 messages.add(msg);
061 }
062
063 public void add(RemoveMessageCommand ack) {
064 acks.add(ack);
065 }
066
067 public Message[] getMessages() {
068 Message rc[] = new Message[messages.size()];
069 int count = 0;
070 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071 AddMessageCommand cmd = iter.next();
072 rc[count++] = cmd.getMessage();
073 }
074 return rc;
075 }
076
077 public MessageAck[] getAcks() {
078 MessageAck rc[] = new MessageAck[acks.size()];
079 int count = 0;
080 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081 RemoveMessageCommand cmd = iter.next();
082 rc[count++] = cmd.getMessageAck();
083 }
084 return rc;
085 }
086
087 /**
088 * @throws IOException
089 */
090 public void commit() throws IOException {
091 ConnectionContext ctx = new ConnectionContext();
092 persistenceAdapter.beginTransaction(ctx);
093 try {
094
095 // Do all the message adds.
096 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097 AddMessageCommand cmd = iter.next();
098 cmd.run(ctx);
099 }
100 // And removes..
101 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102 RemoveMessageCommand cmd = iter.next();
103 cmd.run(ctx);
104 }
105
106 } catch ( IOException e ) {
107 persistenceAdapter.rollbackTransaction(ctx);
108 throw e;
109 }
110 persistenceAdapter.commitTransaction(ctx);
111 }
112 }
113
114 public interface AddMessageCommand {
115 Message getMessage();
116
117 void run(ConnectionContext context) throws IOException;
118 }
119
120 public interface RemoveMessageCommand {
121 MessageAck getMessageAck();
122
123 void run(ConnectionContext context) throws IOException;
124 }
125
126 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127 this.persistenceAdapter=persistenceAdapter;
128 }
129
130 public MessageStore proxy(MessageStore messageStore) {
131 return new ProxyMessageStore(messageStore) {
132 @Override
133 public void addMessage(ConnectionContext context, final Message send) throws IOException {
134 MemoryTransactionStore.this.addMessage(getDelegate(), send);
135 }
136
137 @Override
138 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
139 MemoryTransactionStore.this.addMessage(getDelegate(), message);
140 return AbstractMessageStore.FUTURE;
141 }
142
143 @Override
144 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
145 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
146 }
147
148 @Override
149 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
150 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
151 }
152 };
153 }
154
155 public TopicMessageStore proxy(TopicMessageStore messageStore) {
156 return new ProxyTopicMessageStore(messageStore) {
157 @Override
158 public void addMessage(ConnectionContext context, final Message send) throws IOException {
159 MemoryTransactionStore.this.addMessage(getDelegate(), send);
160 }
161
162 @Override
163 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
164 MemoryTransactionStore.this.addMessage(getDelegate(), message);
165 return AbstractMessageStore.FUTURE;
166 }
167
168 @Override
169 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
170 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
171 }
172
173 @Override
174 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
175 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
176 }
177
178 @Override
179 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
180 MessageId messageId, MessageAck ack) throws IOException {
181 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
182 subscriptionName, messageId, ack);
183 }
184 };
185 }
186
187 /**
188 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
189 */
190 public void prepare(TransactionId txid) {
191 Tx tx = inflightTransactions.remove(txid);
192 if (tx == null) {
193 return;
194 }
195 preparedTransactions.put(txid, tx);
196 }
197
198 public Tx getTx(Object txid) {
199 Tx tx = inflightTransactions.get(txid);
200 if (tx == null) {
201 tx = new Tx();
202 inflightTransactions.put(txid, tx);
203 }
204 return tx;
205 }
206
207 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
208 if (preCommit != null) {
209 preCommit.run();
210 }
211 Tx tx;
212 if (wasPrepared) {
213 tx = preparedTransactions.remove(txid);
214 } else {
215 tx = inflightTransactions.remove(txid);
216 }
217
218 if (tx == null) {
219 if (postCommit != null) {
220 postCommit.run();
221 }
222 return;
223 }
224 // ensure message order w.r.t to cursor and store for setBatch()
225 synchronized (this) {
226 tx.commit();
227 if (postCommit != null) {
228 postCommit.run();
229 }
230 }
231 }
232
233 /**
234 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
235 */
236 public void rollback(TransactionId txid) {
237 preparedTransactions.remove(txid);
238 inflightTransactions.remove(txid);
239 }
240
241 public void start() throws Exception {
242 }
243
244 public void stop() throws Exception {
245 }
246
247 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
248 // All the inflight transactions get rolled back..
249 inflightTransactions.clear();
250 this.doingRecover = true;
251 try {
252 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
253 Object txid = iter.next();
254 Tx tx = preparedTransactions.get(txid);
255 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
256 }
257 } finally {
258 this.doingRecover = false;
259 }
260 }
261
262 /**
263 * @param message
264 * @throws IOException
265 */
266 void addMessage(final MessageStore destination, final Message message) throws IOException {
267
268 if (doingRecover) {
269 return;
270 }
271
272 if (message.getTransactionId() != null) {
273 Tx tx = getTx(message.getTransactionId());
274 tx.add(new AddMessageCommand() {
275 public Message getMessage() {
276 return message;
277 }
278
279 public void run(ConnectionContext ctx) throws IOException {
280 destination.addMessage(ctx, message);
281 }
282
283 });
284 } else {
285 destination.addMessage(null, message);
286 }
287 }
288
289 /**
290 * @param ack
291 * @throws IOException
292 */
293 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
294 if (doingRecover) {
295 return;
296 }
297
298 if (ack.isInTransaction()) {
299 Tx tx = getTx(ack.getTransactionId());
300 tx.add(new RemoveMessageCommand() {
301 public MessageAck getMessageAck() {
302 return ack;
303 }
304
305 public void run(ConnectionContext ctx) throws IOException {
306 destination.removeMessage(ctx, ack);
307 }
308 });
309 } else {
310 destination.removeMessage(null, ack);
311 }
312 }
313
314 final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
315 final MessageId messageId, final MessageAck ack) throws IOException {
316 if (doingRecover) {
317 return;
318 }
319
320 if (ack.isInTransaction()) {
321 Tx tx = getTx(ack.getTransactionId());
322 tx.add(new RemoveMessageCommand() {
323 public MessageAck getMessageAck() {
324 return ack;
325 }
326
327 public void run(ConnectionContext ctx) throws IOException {
328 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
329 }
330 });
331 } else {
332 destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
333 }
334 }
335
336
337 public void delete() {
338 inflightTransactions.clear();
339 preparedTransactions.clear();
340 doingRecover = false;
341 }
342
343 }