001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.atomic.AtomicLong;
022
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.command.ActiveMQDestination;
026 import org.apache.activemq.command.Message;
027 import org.apache.activemq.command.MessageAck;
028 import org.apache.activemq.command.MessageId;
029 import org.apache.activemq.store.AbstractMessageStore;
030 import org.apache.activemq.store.MessageRecoveryListener;
031 import org.apache.activemq.util.ByteSequence;
032 import org.apache.activemq.util.ByteSequenceData;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.wireformat.WireFormat;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 *
040 */
041 public class JDBCMessageStore extends AbstractMessageStore {
042
043 class Duration {
044 static final int LIMIT = 100;
045 final long start = System.currentTimeMillis();
046 final String name;
047
048 Duration(String name) {
049 this.name = name;
050 }
051 void end() {
052 end(null);
053 }
054 void end(Object o) {
055 long duration = System.currentTimeMillis() - start;
056
057 if (duration > LIMIT) {
058 System.err.println(name + " took a long time: " + duration + "ms " + o);
059 }
060 }
061 }
062 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063 protected final WireFormat wireFormat;
064 protected final JDBCAdapter adapter;
065 protected final JDBCPersistenceAdapter persistenceAdapter;
066 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068
069 protected ActiveMQMessageAudit audit;
070
071 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
072 super(destination);
073 this.persistenceAdapter = persistenceAdapter;
074 this.adapter = adapter;
075 this.wireFormat = wireFormat;
076 this.audit = audit;
077 }
078
079 public void addMessage(ConnectionContext context, Message message) throws IOException {
080 MessageId messageId = message.getMessageId();
081 if (audit != null && audit.isDuplicate(message)) {
082 if (LOG.isDebugEnabled()) {
083 LOG.debug(destination.getPhysicalName()
084 + " ignoring duplicated (add) message, already stored: "
085 + messageId);
086 }
087 return;
088 }
089
090 long sequenceId = persistenceAdapter.getNextSequenceId();
091
092 // Serialize the Message..
093 byte data[];
094 try {
095 ByteSequence packet = wireFormat.marshal(message);
096 data = ByteSequenceData.toByteArray(packet);
097 } catch (IOException e) {
098 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
099 }
100
101 // Get a connection and insert the message into the DB.
102 TransactionContext c = persistenceAdapter.getTransactionContext(context);
103 try {
104 adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
105 } catch (SQLException e) {
106 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
107 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
108 } finally {
109 c.close();
110 }
111 onAdd(sequenceId, message.getPriority());
112 }
113
114 protected void onAdd(long sequenceId, byte priority) {
115 }
116
117 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
118 // Get a connection and insert the message into the DB.
119 TransactionContext c = persistenceAdapter.getTransactionContext(context);
120 try {
121 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
122 } catch (SQLException e) {
123 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
124 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
125 } finally {
126 c.close();
127 }
128 }
129
130 public Message getMessage(MessageId messageId) throws IOException {
131 // Get a connection and pull the message out of the DB
132 TransactionContext c = persistenceAdapter.getTransactionContext();
133 try {
134 byte data[] = adapter.doGetMessage(c, messageId);
135 if (data == null) {
136 return null;
137 }
138
139 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
140 return answer;
141 } catch (IOException e) {
142 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
143 } catch (SQLException e) {
144 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
145 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
146 } finally {
147 c.close();
148 }
149 }
150
151 public String getMessageReference(MessageId messageId) throws IOException {
152 long id = messageId.getBrokerSequenceId();
153
154 // Get a connection and pull the message out of the DB
155 TransactionContext c = persistenceAdapter.getTransactionContext();
156 try {
157 return adapter.doGetMessageReference(c, id);
158 } catch (IOException e) {
159 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
160 } catch (SQLException e) {
161 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
162 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163 } finally {
164 c.close();
165 }
166 }
167
168 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
169
170 long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
171
172 // Get a connection and remove the message from the DB
173 TransactionContext c = persistenceAdapter.getTransactionContext(context);
174 try {
175 adapter.doRemoveMessage(c, seq);
176 } catch (SQLException e) {
177 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
178 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
179 } finally {
180 c.close();
181 }
182 }
183
184 public void recover(final MessageRecoveryListener listener) throws Exception {
185
186 // Get all the Message ids out of the database.
187 TransactionContext c = persistenceAdapter.getTransactionContext();
188 try {
189 c = persistenceAdapter.getTransactionContext();
190 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
191 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
192 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
193 msg.getMessageId().setBrokerSequenceId(sequenceId);
194 return listener.recoverMessage(msg);
195 }
196
197 public boolean recoverMessageReference(String reference) throws Exception {
198 return listener.recoverMessageReference(new MessageId(reference));
199 }
200 });
201 } catch (SQLException e) {
202 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
203 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
204 } finally {
205 c.close();
206 }
207 }
208
209 /**
210 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
211 */
212 public void removeAllMessages(ConnectionContext context) throws IOException {
213 // Get a connection and remove the message from the DB
214 TransactionContext c = persistenceAdapter.getTransactionContext(context);
215 try {
216 adapter.doRemoveAllMessages(c, destination);
217 } catch (SQLException e) {
218 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
219 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
220 } finally {
221 c.close();
222 }
223 }
224
225 public int getMessageCount() throws IOException {
226 int result = 0;
227 TransactionContext c = persistenceAdapter.getTransactionContext();
228 try {
229
230 result = adapter.doGetMessageCount(c, destination);
231
232 } catch (SQLException e) {
233 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
234 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
235 } finally {
236 c.close();
237 }
238 return result;
239 }
240
241 /**
242 * @param maxReturned
243 * @param listener
244 * @throws Exception
245 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
246 * org.apache.activemq.store.MessageRecoveryListener)
247 */
248 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
249 TransactionContext c = persistenceAdapter.getTransactionContext();
250 try {
251 adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
252 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
253
254 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
255 if (listener.hasSpace()) {
256 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
257 msg.getMessageId().setBrokerSequenceId(sequenceId);
258 listener.recoverMessage(msg);
259 lastRecoveredSequenceId.set(sequenceId);
260 lastRecoveredPriority.set(msg.getPriority());
261 return true;
262 }
263 return false;
264 }
265
266 public boolean recoverMessageReference(String reference) throws Exception {
267 if (listener.hasSpace()) {
268 listener.recoverMessageReference(new MessageId(reference));
269 return true;
270 }
271 return false;
272 }
273
274 });
275 } catch (SQLException e) {
276 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
277 } finally {
278 c.close();
279 }
280
281 }
282
283 /**
284 * @see org.apache.activemq.store.MessageStore#resetBatching()
285 */
286 public void resetBatching() {
287 if (LOG.isTraceEnabled()) {
288 LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
289 }
290 lastRecoveredSequenceId.set(-1);
291 lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
292
293 }
294
295 @Override
296 public void setBatch(MessageId messageId) {
297 try {
298 long[] storedValues = getStoreSequenceIdForMessageId(messageId);
299 lastRecoveredSequenceId.set(storedValues[0]);
300 lastRecoveredPriority.set(storedValues[1]);
301 } catch (IOException ignoredAsAlreadyLogged) {
302 lastRecoveredSequenceId.set(-1);
303 lastRecoveredPriority.set(Byte.MAX_VALUE -1);
304 }
305 if (LOG.isTraceEnabled()) {
306 LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
307 + ", priority: " + lastRecoveredPriority.get());
308 }
309 }
310
311 private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
312 long[] result = new long[]{-1, Byte.MAX_VALUE -1};
313 TransactionContext c = persistenceAdapter.getTransactionContext();
314 try {
315 result = adapter.getStoreSequenceId(c, destination, messageId);
316 } catch (SQLException e) {
317 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
319 } finally {
320 c.close();
321 }
322 return result;
323 }
324
325 public void setPrioritizedMessages(boolean prioritizedMessages) {
326 super.setPrioritizedMessages(prioritizedMessages);
327 }
328 }