001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc.adapter;
018
019 import java.io.IOException;
020 import java.sql.PreparedStatement;
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.sql.Statement;
024 import java.util.ArrayList;
025 import java.util.HashSet;
026 import java.util.LinkedList;
027 import java.util.Set;
028 import java.util.concurrent.locks.ReadWriteLock;
029 import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.MessageId;
033 import org.apache.activemq.command.ProducerId;
034 import org.apache.activemq.command.SubscriptionInfo;
035 import org.apache.activemq.store.jdbc.JDBCAdapter;
036 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
037 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
038 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
039 import org.apache.activemq.store.jdbc.Statements;
040 import org.apache.activemq.store.jdbc.TransactionContext;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 /**
045 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
046 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
047 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
048 * The databases/JDBC drivers that use this adapter are:
049 * <ul>
050 * <li></li>
051 * </ul>
052 *
053 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
054 *
055 *
056 */
057 public class DefaultJDBCAdapter implements JDBCAdapter {
058 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
059 public static final int MAX_ROWS = 10000;
060 protected Statements statements;
061 protected boolean batchStatments = true;
062 protected boolean prioritizedMessages;
063 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
064 // needs to be min twice the prefetch for a durable sub and large enough for selector range
065 protected int maxRows = MAX_ROWS;
066
067 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
068 s.setBytes(index, data);
069 }
070
071 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
072 return rs.getBytes(index);
073 }
074
075 public void doCreateTables(TransactionContext c) throws SQLException, IOException {
076 Statement s = null;
077 cleanupExclusiveLock.writeLock().lock();
078 try {
079 // Check to see if the table already exists. If it does, then don't
080 // log warnings during startup.
081 // Need to run the scripts anyways since they may contain ALTER
082 // statements that upgrade a previous version
083 // of the table
084 boolean alreadyExists = false;
085 ResultSet rs = null;
086 try {
087 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
088 new String[] { "TABLE" });
089 alreadyExists = rs.next();
090 } catch (Throwable ignore) {
091 } finally {
092 close(rs);
093 }
094 s = c.getConnection().createStatement();
095 String[] createStatments = this.statements.getCreateSchemaStatements();
096 for (int i = 0; i < createStatments.length; i++) {
097 // This will fail usually since the tables will be
098 // created already.
099 try {
100 LOG.debug("Executing SQL: " + createStatments[i]);
101 s.execute(createStatments[i]);
102 } catch (SQLException e) {
103 if (alreadyExists) {
104 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
105 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
106 + " Vendor code: " + e.getErrorCode());
107 } else {
108 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
109 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110 + " Vendor code: " + e.getErrorCode());
111 JDBCPersistenceAdapter.log("Failure details: ", e);
112 }
113 }
114 }
115 c.getConnection().commit();
116 } finally {
117 cleanupExclusiveLock.writeLock().unlock();
118 try {
119 s.close();
120 } catch (Throwable e) {
121 }
122 }
123 }
124
125 public void doDropTables(TransactionContext c) throws SQLException, IOException {
126 Statement s = null;
127 cleanupExclusiveLock.writeLock().lock();
128 try {
129 s = c.getConnection().createStatement();
130 String[] dropStatments = this.statements.getDropSchemaStatements();
131 for (int i = 0; i < dropStatments.length; i++) {
132 // This will fail usually since the tables will be
133 // created already.
134 try {
135 LOG.debug("Executing SQL: " + dropStatments[i]);
136 s.execute(dropStatments[i]);
137 } catch (SQLException e) {
138 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
139 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
140 + e.getErrorCode());
141 JDBCPersistenceAdapter.log("Failure details: ", e);
142 }
143 }
144 c.getConnection().commit();
145 } finally {
146 cleanupExclusiveLock.writeLock().unlock();
147 try {
148 s.close();
149 } catch (Throwable e) {
150 }
151 }
152 }
153
154 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
155 PreparedStatement s = null;
156 ResultSet rs = null;
157 cleanupExclusiveLock.readLock().lock();
158 try {
159 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
160 rs = s.executeQuery();
161 long seq1 = 0;
162 if (rs.next()) {
163 seq1 = rs.getLong(1);
164 }
165 rs.close();
166 s.close();
167 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
168 rs = s.executeQuery();
169 long seq2 = 0;
170 if (rs.next()) {
171 seq2 = rs.getLong(1);
172 }
173 long seq = Math.max(seq1, seq2);
174 return seq;
175 } finally {
176 cleanupExclusiveLock.readLock().unlock();
177 close(rs);
178 close(s);
179 }
180 }
181
182 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
183 PreparedStatement s = null;
184 ResultSet rs = null;
185 cleanupExclusiveLock.readLock().lock();
186 try {
187 s = c.getConnection().prepareStatement(
188 this.statements.getFindMessageByIdStatement());
189 s.setLong(1, storeSequenceId);
190 rs = s.executeQuery();
191 if (!rs.next()) {
192 return null;
193 }
194 return getBinaryData(rs, 1);
195 } finally {
196 cleanupExclusiveLock.readLock().unlock();
197 close(rs);
198 close(s);
199 }
200 }
201
202
203 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
204 long expiration, byte priority) throws SQLException, IOException {
205 PreparedStatement s = c.getAddMessageStatement();
206 cleanupExclusiveLock.readLock().lock();
207 try {
208 if (s == null) {
209 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
210 if (this.batchStatments) {
211 c.setAddMessageStatement(s);
212 }
213 }
214 s.setLong(1, sequence);
215 s.setString(2, messageID.getProducerId().toString());
216 s.setLong(3, messageID.getProducerSequenceId());
217 s.setString(4, destination.getQualifiedName());
218 s.setLong(5, expiration);
219 s.setLong(6, priority);
220 setBinaryData(s, 7, data);
221 if (this.batchStatments) {
222 s.addBatch();
223 } else if (s.executeUpdate() != 1) {
224 throw new SQLException("Failed add a message");
225 }
226 } finally {
227 cleanupExclusiveLock.readLock().unlock();
228 if (!this.batchStatments) {
229 if (s != null) {
230 s.close();
231 }
232 }
233 }
234 }
235
236 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
237 long expirationTime, String messageRef) throws SQLException, IOException {
238 PreparedStatement s = c.getAddMessageStatement();
239 cleanupExclusiveLock.readLock().lock();
240 try {
241 if (s == null) {
242 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
243 if (this.batchStatments) {
244 c.setAddMessageStatement(s);
245 }
246 }
247 s.setLong(1, messageID.getBrokerSequenceId());
248 s.setString(2, messageID.getProducerId().toString());
249 s.setLong(3, messageID.getProducerSequenceId());
250 s.setString(4, destination.getQualifiedName());
251 s.setLong(5, expirationTime);
252 s.setString(6, messageRef);
253 if (this.batchStatments) {
254 s.addBatch();
255 } else if (s.executeUpdate() != 1) {
256 throw new SQLException("Failed add a message");
257 }
258 } finally {
259 cleanupExclusiveLock.readLock().unlock();
260 if (!this.batchStatments) {
261 s.close();
262 }
263 }
264 }
265
266 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
267 PreparedStatement s = null;
268 ResultSet rs = null;
269 cleanupExclusiveLock.readLock().lock();
270 try {
271 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
272 s.setString(1, messageID.getProducerId().toString());
273 s.setLong(2, messageID.getProducerSequenceId());
274 s.setString(3, destination.getQualifiedName());
275 rs = s.executeQuery();
276 if (!rs.next()) {
277 return new long[]{0,0};
278 }
279 return new long[]{rs.getLong(1), rs.getLong(2)};
280 } finally {
281 cleanupExclusiveLock.readLock().unlock();
282 close(rs);
283 close(s);
284 }
285 }
286
287 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
288 PreparedStatement s = null;
289 ResultSet rs = null;
290 cleanupExclusiveLock.readLock().lock();
291 try {
292 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
293 s.setString(1, id.getProducerId().toString());
294 s.setLong(2, id.getProducerSequenceId());
295 rs = s.executeQuery();
296 if (!rs.next()) {
297 return null;
298 }
299 return getBinaryData(rs, 1);
300 } finally {
301 cleanupExclusiveLock.readLock().unlock();
302 close(rs);
303 close(s);
304 }
305 }
306
307 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
308 PreparedStatement s = null;
309 ResultSet rs = null;
310 cleanupExclusiveLock.readLock().lock();
311 try {
312 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
313 s.setLong(1, seq);
314 rs = s.executeQuery();
315 if (!rs.next()) {
316 return null;
317 }
318 return rs.getString(1);
319 } finally {
320 cleanupExclusiveLock.readLock().unlock();
321 close(rs);
322 close(s);
323 }
324 }
325
326 public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
327 PreparedStatement s = c.getRemovedMessageStatement();
328 cleanupExclusiveLock.readLock().lock();
329 try {
330 if (s == null) {
331 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
332 if (this.batchStatments) {
333 c.setRemovedMessageStatement(s);
334 }
335 }
336 s.setLong(1, seq);
337 if (this.batchStatments) {
338 s.addBatch();
339 } else if (s.executeUpdate() != 1) {
340 throw new SQLException("Failed to remove message");
341 }
342 } finally {
343 cleanupExclusiveLock.readLock().unlock();
344 if (!this.batchStatments && s != null) {
345 s.close();
346 }
347 }
348 }
349
350 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
351 throws Exception {
352 PreparedStatement s = null;
353 ResultSet rs = null;
354 cleanupExclusiveLock.readLock().lock();
355 try {
356 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
357 s.setString(1, destination.getQualifiedName());
358 rs = s.executeQuery();
359 if (this.statements.isUseExternalMessageReferences()) {
360 while (rs.next()) {
361 if (!listener.recoverMessageReference(rs.getString(2))) {
362 break;
363 }
364 }
365 } else {
366 while (rs.next()) {
367 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
368 break;
369 }
370 }
371 }
372 } finally {
373 cleanupExclusiveLock.readLock().unlock();
374 close(rs);
375 close(s);
376 }
377 }
378
379 public void doMessageIdScan(TransactionContext c, int limit,
380 JDBCMessageIdScanListener listener) throws SQLException, IOException {
381 PreparedStatement s = null;
382 ResultSet rs = null;
383 cleanupExclusiveLock.readLock().lock();
384 try {
385 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
386 s.setMaxRows(limit);
387 rs = s.executeQuery();
388 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
389 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
390 while (rs.next()) {
391 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
392 }
393 if (LOG.isDebugEnabled()) {
394 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
395 }
396 for (MessageId id : reverseOrderIds) {
397 listener.messageId(id);
398 }
399 } finally {
400 cleanupExclusiveLock.readLock().unlock();
401 close(rs);
402 close(s);
403 }
404 }
405
406 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
407 String subscriptionName, long seq, long prio) throws SQLException, IOException {
408 PreparedStatement s = c.getUpdateLastAckStatement();
409 cleanupExclusiveLock.readLock().lock();
410 try {
411 if (s == null) {
412 s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
413 if (this.batchStatments) {
414 c.setUpdateLastAckStatement(s);
415 }
416 }
417 s.setLong(1, seq);
418 s.setString(2, destination.getQualifiedName());
419 s.setString(3, clientId);
420 s.setString(4, subscriptionName);
421 s.setLong(5, prio);
422 if (this.batchStatments) {
423 s.addBatch();
424 } else if (s.executeUpdate() != 1) {
425 throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
426 }
427 } finally {
428 cleanupExclusiveLock.readLock().unlock();
429 if (!this.batchStatments) {
430 close(s);
431 }
432 }
433 }
434
435
436 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
437 String subscriptionName, long seq, long priority) throws SQLException, IOException {
438 PreparedStatement s = c.getUpdateLastAckStatement();
439 cleanupExclusiveLock.readLock().lock();
440 try {
441 if (s == null) {
442 s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
443 if (this.batchStatments) {
444 c.setUpdateLastAckStatement(s);
445 }
446 }
447 s.setLong(1, seq);
448 s.setString(2, destination.getQualifiedName());
449 s.setString(3, clientId);
450 s.setString(4, subscriptionName);
451
452 if (this.batchStatments) {
453 s.addBatch();
454 } else if (s.executeUpdate() != 1) {
455 throw new IOException("Could not update last ack seq : "
456 + seq + ", for sub: " + subscriptionName);
457 }
458 } finally {
459 cleanupExclusiveLock.readLock().unlock();
460 if (!this.batchStatments) {
461 close(s);
462 }
463 }
464 }
465
466 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
467 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
468 // dumpTables(c,
469 // destination.getQualifiedName(),clientId,subscriptionName);
470 PreparedStatement s = null;
471 ResultSet rs = null;
472 cleanupExclusiveLock.readLock().lock();
473 try {
474 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
475 s.setString(1, destination.getQualifiedName());
476 s.setString(2, clientId);
477 s.setString(3, subscriptionName);
478 rs = s.executeQuery();
479 if (this.statements.isUseExternalMessageReferences()) {
480 while (rs.next()) {
481 if (!listener.recoverMessageReference(rs.getString(2))) {
482 break;
483 }
484 }
485 } else {
486 while (rs.next()) {
487 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
488 break;
489 }
490 }
491 }
492 } finally {
493 cleanupExclusiveLock.readLock().unlock();
494 close(rs);
495 close(s);
496 }
497 }
498
499 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
500 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
501
502 PreparedStatement s = null;
503 ResultSet rs = null;
504 cleanupExclusiveLock.readLock().lock();
505 try {
506 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
507 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
508 s.setString(1, destination.getQualifiedName());
509 s.setString(2, clientId);
510 s.setString(3, subscriptionName);
511 s.setLong(4, seq);
512 rs = s.executeQuery();
513 int count = 0;
514 if (this.statements.isUseExternalMessageReferences()) {
515 while (rs.next() && count < maxReturned) {
516 if (listener.recoverMessageReference(rs.getString(1))) {
517 count++;
518 }
519 }
520 } else {
521 while (rs.next() && count < maxReturned) {
522 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
523 count++;
524 }
525 }
526 }
527 } finally {
528 cleanupExclusiveLock.readLock().unlock();
529 close(rs);
530 close(s);
531 }
532 }
533
534 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
535 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
536
537 PreparedStatement s = null;
538 ResultSet rs = null;
539 cleanupExclusiveLock.readLock().lock();
540 try {
541 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
542 s.setMaxRows(maxRows);
543 s.setString(1, destination.getQualifiedName());
544 s.setString(2, clientId);
545 s.setString(3, subscriptionName);
546 s.setLong(4, seq);
547 s.setLong(5, priority);
548 rs = s.executeQuery();
549 int count = 0;
550 if (this.statements.isUseExternalMessageReferences()) {
551 while (rs.next() && count < maxReturned) {
552 if (listener.recoverMessageReference(rs.getString(1))) {
553 count++;
554 }
555 }
556 } else {
557 while (rs.next() && count < maxReturned) {
558 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
559 count++;
560 }
561 }
562 }
563 } finally {
564 cleanupExclusiveLock.readLock().unlock();
565 close(rs);
566 close(s);
567 }
568 }
569
570 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
571 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
572 PreparedStatement s = null;
573 ResultSet rs = null;
574 int result = 0;
575 cleanupExclusiveLock.readLock().lock();
576 try {
577 if (isPrioritizedMessages) {
578 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
579 } else {
580 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
581 }
582 s.setString(1, destination.getQualifiedName());
583 s.setString(2, clientId);
584 s.setString(3, subscriptionName);
585 rs = s.executeQuery();
586 if (rs.next()) {
587 result = rs.getInt(1);
588 }
589 } finally {
590 cleanupExclusiveLock.readLock().unlock();
591 close(rs);
592 close(s);
593 }
594 return result;
595 }
596
597 /**
598 * @param c
599 * @param info
600 * @param retroactive
601 * @throws SQLException
602 * @throws IOException
603 */
604 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
605 throws SQLException, IOException {
606 // dumpTables(c, destination.getQualifiedName(), clientId,
607 // subscriptionName);
608 PreparedStatement s = null;
609 cleanupExclusiveLock.readLock().lock();
610 try {
611 long lastMessageId = -1;
612 if (!retroactive) {
613 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
614 ResultSet rs = null;
615 try {
616 rs = s.executeQuery();
617 if (rs.next()) {
618 lastMessageId = rs.getLong(1);
619 }
620 } finally {
621 close(rs);
622 close(s);
623 }
624 }
625 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
626 int maxPriority = 1;
627 if (isPrioritizedMessages) {
628 maxPriority = 10;
629 }
630
631 for (int priority = 0; priority < maxPriority; priority++) {
632 s.setString(1, info.getDestination().getQualifiedName());
633 s.setString(2, info.getClientId());
634 s.setString(3, info.getSubscriptionName());
635 s.setString(4, info.getSelector());
636 s.setLong(5, lastMessageId);
637 s.setString(6, info.getSubscribedDestination().getQualifiedName());
638 s.setLong(7, priority);
639
640 if (s.executeUpdate() != 1) {
641 throw new IOException("Could not create durable subscription for: " + info.getClientId());
642 }
643 }
644
645 } finally {
646 cleanupExclusiveLock.readLock().unlock();
647 close(s);
648 }
649 }
650
651 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
652 String clientId, String subscriptionName) throws SQLException, IOException {
653 PreparedStatement s = null;
654 ResultSet rs = null;
655 cleanupExclusiveLock.readLock().lock();
656 try {
657 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
658 s.setString(1, destination.getQualifiedName());
659 s.setString(2, clientId);
660 s.setString(3, subscriptionName);
661 rs = s.executeQuery();
662 if (!rs.next()) {
663 return null;
664 }
665 SubscriptionInfo subscription = new SubscriptionInfo();
666 subscription.setDestination(destination);
667 subscription.setClientId(clientId);
668 subscription.setSubscriptionName(subscriptionName);
669 subscription.setSelector(rs.getString(1));
670 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
671 ActiveMQDestination.QUEUE_TYPE));
672 return subscription;
673 } finally {
674 cleanupExclusiveLock.readLock().unlock();
675 close(rs);
676 close(s);
677 }
678 }
679
680 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
681 throws SQLException, IOException {
682 PreparedStatement s = null;
683 ResultSet rs = null;
684 cleanupExclusiveLock.readLock().lock();
685 try {
686 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
687 s.setString(1, destination.getQualifiedName());
688 rs = s.executeQuery();
689 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
690 while (rs.next()) {
691 SubscriptionInfo subscription = new SubscriptionInfo();
692 subscription.setDestination(destination);
693 subscription.setSelector(rs.getString(1));
694 subscription.setSubscriptionName(rs.getString(2));
695 subscription.setClientId(rs.getString(3));
696 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
697 ActiveMQDestination.QUEUE_TYPE));
698 rc.add(subscription);
699 }
700 return rc.toArray(new SubscriptionInfo[rc.size()]);
701 } finally {
702 cleanupExclusiveLock.readLock().unlock();
703 close(rs);
704 close(s);
705 }
706 }
707
708 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
709 IOException {
710 PreparedStatement s = null;
711 cleanupExclusiveLock.readLock().lock();
712 try {
713 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
714 s.setString(1, destinationName.getQualifiedName());
715 s.executeUpdate();
716 s.close();
717 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
718 s.setString(1, destinationName.getQualifiedName());
719 s.executeUpdate();
720 } finally {
721 cleanupExclusiveLock.readLock().unlock();
722 close(s);
723 }
724 }
725
726 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
727 String subscriptionName) throws SQLException, IOException {
728 PreparedStatement s = null;
729 cleanupExclusiveLock.readLock().lock();
730 try {
731 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
732 s.setString(1, destination.getQualifiedName());
733 s.setString(2, clientId);
734 s.setString(3, subscriptionName);
735 s.executeUpdate();
736 } finally {
737 cleanupExclusiveLock.readLock().unlock();
738 close(s);
739 }
740 }
741
742 public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
743 PreparedStatement s = null;
744 cleanupExclusiveLock.writeLock().lock();
745 try {
746 if (isPrioritizedMessages) {
747 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
748 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
749 } else {
750 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
751 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
752 }
753 s.setLong(1, System.currentTimeMillis());
754 int i = s.executeUpdate();
755 LOG.debug("Deleted " + i + " old message(s).");
756 } finally {
757 cleanupExclusiveLock.writeLock().unlock();
758 close(s);
759 }
760 }
761
762 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
763 String clientId, String subscriberName) throws SQLException, IOException {
764 PreparedStatement s = null;
765 ResultSet rs = null;
766 long result = -1;
767 cleanupExclusiveLock.readLock().lock();
768 try {
769 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
770 s.setString(1, destination.getQualifiedName());
771 s.setString(2, clientId);
772 s.setString(3, subscriberName);
773 rs = s.executeQuery();
774 if (rs.next()) {
775 result = rs.getLong(1);
776 }
777 } finally {
778 cleanupExclusiveLock.readLock().unlock();
779 close(rs);
780 close(s);
781 }
782 return result;
783 }
784
785 private static void close(PreparedStatement s) {
786 try {
787 s.close();
788 } catch (Throwable e) {
789 }
790 }
791
792 private static void close(ResultSet rs) {
793 try {
794 rs.close();
795 } catch (Throwable e) {
796 }
797 }
798
799 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
800 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
801 PreparedStatement s = null;
802 ResultSet rs = null;
803 cleanupExclusiveLock.readLock().lock();
804 try {
805 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
806 rs = s.executeQuery();
807 while (rs.next()) {
808 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
809 }
810 } finally {
811 cleanupExclusiveLock.readLock().unlock();
812 close(rs);
813 close(s);
814 }
815 return rc;
816 }
817
818 /**
819 * @return true if batchStements
820 */
821 public boolean isBatchStatments() {
822 return this.batchStatments;
823 }
824
825 /**
826 * @param batchStatments
827 */
828 public void setBatchStatments(boolean batchStatments) {
829 this.batchStatments = batchStatments;
830 }
831
832 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
833 this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
834 }
835
836 /**
837 * @return the statements
838 */
839 public Statements getStatements() {
840 return this.statements;
841 }
842
843 public void setStatements(Statements statements) {
844 this.statements = statements;
845 }
846
847 public int getMaxRows() {
848 return maxRows;
849 }
850
851 public void setMaxRows(int maxRows) {
852 this.maxRows = maxRows;
853 }
854
855 /**
856 * @param c
857 * @param destination
858 * @param clientId
859 * @param subscriberName
860 * @return
861 * @throws SQLException
862 * @throws IOException
863 */
864 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
865 String clientId, String subscriberName) throws SQLException, IOException {
866 PreparedStatement s = null;
867 ResultSet rs = null;
868 cleanupExclusiveLock.readLock().lock();
869 try {
870 s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
871 s.setString(1, destination.getQualifiedName());
872 s.setString(2, clientId);
873 s.setString(3, subscriberName);
874 rs = s.executeQuery();
875 if (!rs.next()) {
876 return null;
877 }
878 return getBinaryData(rs, 1);
879 } finally {
880 close(rs);
881 cleanupExclusiveLock.readLock().unlock();
882 close(s);
883 }
884 }
885
886 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
887 IOException {
888 PreparedStatement s = null;
889 ResultSet rs = null;
890 int result = 0;
891 cleanupExclusiveLock.readLock().lock();
892 try {
893 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
894 s.setString(1, destination.getQualifiedName());
895 rs = s.executeQuery();
896 if (rs.next()) {
897 result = rs.getInt(1);
898 }
899 } finally {
900 cleanupExclusiveLock.readLock().unlock();
901 close(rs);
902 close(s);
903 }
904 return result;
905 }
906
907 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
908 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
909 PreparedStatement s = null;
910 ResultSet rs = null;
911 cleanupExclusiveLock.readLock().lock();
912 try {
913 if (isPrioritizedMessages) {
914 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
915 } else {
916 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
917 }
918 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
919 s.setString(1, destination.getQualifiedName());
920 s.setLong(2, nextSeq);
921 if (isPrioritizedMessages) {
922 s.setLong(3, priority);
923 s.setLong(4, priority);
924 }
925 rs = s.executeQuery();
926 int count = 0;
927 if (this.statements.isUseExternalMessageReferences()) {
928 while (rs.next() && count < maxReturned) {
929 if (listener.recoverMessageReference(rs.getString(1))) {
930 count++;
931 } else {
932 LOG.debug("Stopped recover next messages");
933 break;
934 }
935 }
936 } else {
937 while (rs.next() && count < maxReturned) {
938 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
939 count++;
940 } else {
941 LOG.debug("Stopped recover next messages");
942 break;
943 }
944 }
945 }
946 } catch (Exception e) {
947 e.printStackTrace();
948 } finally {
949 cleanupExclusiveLock.readLock().unlock();
950 close(rs);
951 close(s);
952 }
953 }
954
955 /* public void dumpTables(Connection c, String destinationName, String clientId, String
956 subscriptionName) throws SQLException {
957 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
958 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
959 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
960 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
961 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
962 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
963 + " ORDER BY M.ID");
964 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
965 printQuery(s,System.out); }
966
967 public void dumpTables(Connection c) throws SQLException {
968 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
969 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
970 }
971
972 private void printQuery(Connection c, String query, PrintStream out)
973 throws SQLException {
974 printQuery(c.prepareStatement(query), out);
975 }
976
977 private void printQuery(PreparedStatement s, PrintStream out)
978 throws SQLException {
979
980 ResultSet set = null;
981 try {
982 set = s.executeQuery();
983 ResultSetMetaData metaData = set.getMetaData();
984 for (int i = 1; i <= metaData.getColumnCount(); i++) {
985 if (i == 1)
986 out.print("||");
987 out.print(metaData.getColumnName(i) + "||");
988 }
989 out.println();
990 while (set.next()) {
991 for (int i = 1; i <= metaData.getColumnCount(); i++) {
992 if (i == 1)
993 out.print("|");
994 out.print(set.getString(i) + "|");
995 }
996 out.println();
997 }
998 } finally {
999 try {
1000 set.close();
1001 } catch (Throwable ignore) {
1002 }
1003 try {
1004 s.close();
1005 } catch (Throwable ignore) {
1006 }
1007 }
1008 } */
1009
1010 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1011 throws SQLException, IOException {
1012 PreparedStatement s = null;
1013 ResultSet rs = null;
1014 cleanupExclusiveLock.readLock().lock();
1015 try {
1016 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1017 s.setString(1, id.toString());
1018 rs = s.executeQuery();
1019 long seq = -1;
1020 if (rs.next()) {
1021 seq = rs.getLong(1);
1022 }
1023 return seq;
1024 } finally {
1025 cleanupExclusiveLock.readLock().unlock();
1026 close(rs);
1027 close(s);
1028 }
1029 }
1030
1031 }