001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.EOFException;
024 import java.io.File;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.ObjectInputStream;
028 import java.io.ObjectOutputStream;
029 import java.io.OutputStream;
030 import java.util.*;
031 import java.util.Map.Entry;
032 import java.util.concurrent.atomic.AtomicBoolean;
033 import java.util.concurrent.atomic.AtomicLong;
034 import java.util.concurrent.locks.ReentrantReadWriteLock;
035
036 import org.apache.activemq.ActiveMQMessageAuditNoSync;
037 import org.apache.activemq.broker.BrokerService;
038 import org.apache.activemq.broker.BrokerServiceAware;
039 import org.apache.activemq.command.ConnectionId;
040 import org.apache.activemq.command.LocalTransactionId;
041 import org.apache.activemq.command.MessageId;
042 import org.apache.activemq.command.SubscriptionInfo;
043 import org.apache.activemq.command.TransactionId;
044 import org.apache.activemq.command.XATransactionId;
045 import org.apache.activemq.protobuf.Buffer;
046 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
047 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048 import org.apache.activemq.store.kahadb.data.KahaDestination;
049 import org.apache.activemq.store.kahadb.data.KahaEntryType;
050 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
051 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
053 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
054 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
055 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
056 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
057 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
058 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
059 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
060 import org.apache.activemq.util.Callback;
061 import org.apache.activemq.util.IOHelper;
062 import org.apache.activemq.util.ServiceStopper;
063 import org.apache.activemq.util.ServiceSupport;
064 import org.slf4j.Logger;
065 import org.slf4j.LoggerFactory;
066 import org.apache.kahadb.index.BTreeIndex;
067 import org.apache.kahadb.index.BTreeVisitor;
068 import org.apache.kahadb.journal.DataFile;
069 import org.apache.kahadb.journal.Journal;
070 import org.apache.kahadb.journal.Location;
071 import org.apache.kahadb.page.Page;
072 import org.apache.kahadb.page.PageFile;
073 import org.apache.kahadb.page.Transaction;
074 import org.apache.kahadb.util.ByteSequence;
075 import org.apache.kahadb.util.DataByteArrayInputStream;
076 import org.apache.kahadb.util.DataByteArrayOutputStream;
077 import org.apache.kahadb.util.LockFile;
078 import org.apache.kahadb.util.LongMarshaller;
079 import org.apache.kahadb.util.Marshaller;
080 import org.apache.kahadb.util.Sequence;
081 import org.apache.kahadb.util.SequenceSet;
082 import org.apache.kahadb.util.StringMarshaller;
083 import org.apache.kahadb.util.VariableMarshaller;
084
085 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
086
087 protected BrokerService brokerService;
088
089 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
090 public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
091
092 protected static final Buffer UNMATCHED;
093 static {
094 UNMATCHED = new Buffer(new byte[]{});
095 }
096 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
097 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
098
099 static final int CLOSED_STATE = 1;
100 static final int OPEN_STATE = 2;
101 static final long NOT_ACKED = -1;
102 static final long UNMATCHED_SEQ = -2;
103
104 static final int VERSION = 3;
105
106
107 protected class Metadata {
108 protected Page<Metadata> page;
109 protected int state;
110 protected BTreeIndex<String, StoredDestination> destinations;
111 protected Location lastUpdate;
112 protected Location firstInProgressTransactionLocation;
113 protected Location producerSequenceIdTrackerLocation = null;
114 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
115 protected int version = VERSION;
116 public void read(DataInput is) throws IOException {
117 state = is.readInt();
118 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
119 if (is.readBoolean()) {
120 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
121 } else {
122 lastUpdate = null;
123 }
124 if (is.readBoolean()) {
125 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
126 } else {
127 firstInProgressTransactionLocation = null;
128 }
129 try {
130 if (is.readBoolean()) {
131 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
132 } else {
133 producerSequenceIdTrackerLocation = null;
134 }
135 } catch (EOFException expectedOnUpgrade) {
136 }
137 try {
138 version = is.readInt();
139 }catch (EOFException expectedOnUpgrade) {
140 version=1;
141 }
142 LOG.info("KahaDB is version " + version);
143 }
144
145 public void write(DataOutput os) throws IOException {
146 os.writeInt(state);
147 os.writeLong(destinations.getPageId());
148
149 if (lastUpdate != null) {
150 os.writeBoolean(true);
151 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
152 } else {
153 os.writeBoolean(false);
154 }
155
156 if (firstInProgressTransactionLocation != null) {
157 os.writeBoolean(true);
158 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
159 } else {
160 os.writeBoolean(false);
161 }
162
163 if (producerSequenceIdTrackerLocation != null) {
164 os.writeBoolean(true);
165 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
166 } else {
167 os.writeBoolean(false);
168 }
169 os.writeInt(VERSION);
170 }
171 }
172
173 class MetadataMarshaller extends VariableMarshaller<Metadata> {
174 public Metadata readPayload(DataInput dataIn) throws IOException {
175 Metadata rc = new Metadata();
176 rc.read(dataIn);
177 return rc;
178 }
179
180 public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
181 object.write(dataOut);
182 }
183 }
184
185 protected PageFile pageFile;
186 protected Journal journal;
187 protected Metadata metadata = new Metadata();
188
189 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
190
191 protected boolean failIfDatabaseIsLocked;
192
193 protected boolean deleteAllMessages;
194 protected File directory = new File("KahaDB");
195 protected Thread checkpointThread;
196 protected boolean enableJournalDiskSyncs=true;
197 protected boolean archiveDataLogs;
198 protected File directoryArchive;
199 protected AtomicLong storeSize = new AtomicLong(0);
200 long checkpointInterval = 5*1000;
201 long cleanupInterval = 30*1000;
202 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
203 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
204 boolean enableIndexWriteAsync = false;
205 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
206
207
208 protected AtomicBoolean opened = new AtomicBoolean();
209 private LockFile lockFile;
210 private boolean ignoreMissingJournalfiles = false;
211 private int indexCacheSize = 10000;
212 private boolean checkForCorruptJournalFiles = false;
213 private boolean checksumJournalFiles = false;
214 private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
215 protected boolean forceRecoverIndex = false;
216 private final Object checkpointThreadLock = new Object();
217
218 public MessageDatabase() {
219 }
220
221 @Override
222 public void doStart() throws Exception {
223 load();
224 }
225
226 @Override
227 public void doStop(ServiceStopper stopper) throws Exception {
228 unload();
229 }
230
231 private void loadPageFile() throws IOException {
232 this.indexLock.writeLock().lock();
233 try {
234 final PageFile pageFile = getPageFile();
235 pageFile.load();
236 pageFile.tx().execute(new Transaction.Closure<IOException>() {
237 public void execute(Transaction tx) throws IOException {
238 if (pageFile.getPageCount() == 0) {
239 // First time this is created.. Initialize the metadata
240 Page<Metadata> page = tx.allocate();
241 assert page.getPageId() == 0;
242 page.set(metadata);
243 metadata.page = page;
244 metadata.state = CLOSED_STATE;
245 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
246
247 tx.store(metadata.page, metadataMarshaller, true);
248 } else {
249 Page<Metadata> page = tx.load(0, metadataMarshaller);
250 metadata = page.get();
251 metadata.page = page;
252 }
253 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
254 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
255 metadata.destinations.load(tx);
256 }
257 });
258 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
259 // Perhaps we should just keep an index of file
260 storedDestinations.clear();
261 pageFile.tx().execute(new Transaction.Closure<IOException>() {
262 public void execute(Transaction tx) throws IOException {
263 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
264 Entry<String, StoredDestination> entry = iterator.next();
265 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
266 storedDestinations.put(entry.getKey(), sd);
267 }
268 }
269 });
270 pageFile.flush();
271 }finally {
272 this.indexLock.writeLock().unlock();
273 }
274 }
275
276 private void startCheckpoint() {
277 synchronized (checkpointThreadLock) {
278 boolean start = false;
279 if (checkpointThread == null) {
280 start = true;
281 } else if (!checkpointThread.isAlive()) {
282 start = true;
283 LOG.info("KahaDB: Recovering checkpoint thread after death");
284 }
285 if (start) {
286 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
287 @Override
288 public void run() {
289 try {
290 long lastCleanup = System.currentTimeMillis();
291 long lastCheckpoint = System.currentTimeMillis();
292 // Sleep for a short time so we can periodically check
293 // to see if we need to exit this thread.
294 long sleepTime = Math.min(checkpointInterval, 500);
295 while (opened.get()) {
296 Thread.sleep(sleepTime);
297 long now = System.currentTimeMillis();
298 if( now - lastCleanup >= cleanupInterval ) {
299 checkpointCleanup(true);
300 lastCleanup = now;
301 lastCheckpoint = now;
302 } else if( now - lastCheckpoint >= checkpointInterval ) {
303 checkpointCleanup(false);
304 lastCheckpoint = now;
305 }
306 }
307 } catch (InterruptedException e) {
308 // Looks like someone really wants us to exit this thread...
309 } catch (IOException ioe) {
310 LOG.error("Checkpoint failed", ioe);
311 brokerService.handleIOException(ioe);
312 }
313 }
314 };
315
316 checkpointThread.setDaemon(true);
317 checkpointThread.start();
318 }
319 }
320 }
321
322 public void open() throws IOException {
323 if( opened.compareAndSet(false, true) ) {
324 getJournal().start();
325 loadPageFile();
326 startCheckpoint();
327 recover();
328 }
329 }
330
331 private void lock() throws IOException {
332 if( lockFile == null ) {
333 File lockFileName = new File(directory, "lock");
334 lockFile = new LockFile(lockFileName, true);
335 if (failIfDatabaseIsLocked) {
336 lockFile.lock();
337 } else {
338 while (true) {
339 try {
340 lockFile.lock();
341 break;
342 } catch (IOException e) {
343 LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e);
344 try {
345 Thread.sleep(getDatabaseLockedWaitDelay());
346 } catch (InterruptedException e1) {
347 }
348 }
349 }
350 }
351 }
352 }
353
354 // for testing
355 public LockFile getLockFile() {
356 return lockFile;
357 }
358
359 public void load() throws IOException {
360
361 this.indexLock.writeLock().lock();
362 try {
363 lock();
364 if (deleteAllMessages) {
365 getJournal().start();
366 getJournal().delete();
367 getJournal().close();
368 journal = null;
369 getPageFile().delete();
370 LOG.info("Persistence store purged.");
371 deleteAllMessages = false;
372 }
373
374 open();
375 store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
376 }finally {
377 this.indexLock.writeLock().unlock();
378 }
379
380 }
381
382
383 public void close() throws IOException, InterruptedException {
384 if( opened.compareAndSet(true, false)) {
385 this.indexLock.writeLock().lock();
386 try {
387 pageFile.tx().execute(new Transaction.Closure<IOException>() {
388 public void execute(Transaction tx) throws IOException {
389 checkpointUpdate(tx, true);
390 }
391 });
392 pageFile.unload();
393 metadata = new Metadata();
394 }finally {
395 this.indexLock.writeLock().unlock();
396 }
397 journal.close();
398 synchronized (checkpointThreadLock) {
399 checkpointThread.join();
400 }
401 lockFile.unlock();
402 lockFile=null;
403 }
404 }
405
406 public void unload() throws IOException, InterruptedException {
407 this.indexLock.writeLock().lock();
408 try {
409 if( pageFile != null && pageFile.isLoaded() ) {
410 metadata.state = CLOSED_STATE;
411 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
412
413 pageFile.tx().execute(new Transaction.Closure<IOException>() {
414 public void execute(Transaction tx) throws IOException {
415 tx.store(metadata.page, metadataMarshaller, true);
416 }
417 });
418 }
419 }finally {
420 this.indexLock.writeLock().unlock();
421 }
422 close();
423 }
424
425 // public for testing
426 public Location getFirstInProgressTxLocation() {
427 Location l = null;
428 synchronized (inflightTransactions) {
429 if (!inflightTransactions.isEmpty()) {
430 l = inflightTransactions.values().iterator().next().get(0).getLocation();
431 }
432 if (!preparedTransactions.isEmpty()) {
433 Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
434 if (l==null || t.compareTo(l) <= 0) {
435 l = t;
436 }
437 }
438 }
439 return l;
440 }
441
442 /**
443 * Move all the messages that were in the journal into long term storage. We
444 * just replay and do a checkpoint.
445 *
446 * @throws IOException
447 * @throws IOException
448 * @throws IllegalStateException
449 */
450 private void recover() throws IllegalStateException, IOException {
451 this.indexLock.writeLock().lock();
452 try {
453
454 long start = System.currentTimeMillis();
455 Location producerAuditPosition = recoverProducerAudit();
456 Location lastIndoubtPosition = getRecoveryPosition();
457
458 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
459
460 if (recoveryPosition != null) {
461 int redoCounter = 0;
462 LOG.info("Recovering from the journal ...");
463 while (recoveryPosition != null) {
464 JournalCommand<?> message = load(recoveryPosition);
465 metadata.lastUpdate = recoveryPosition;
466 process(message, recoveryPosition, lastIndoubtPosition);
467 redoCounter++;
468 recoveryPosition = journal.getNextLocation(recoveryPosition);
469 }
470 long end = System.currentTimeMillis();
471 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
472 }
473
474 // We may have to undo some index updates.
475 pageFile.tx().execute(new Transaction.Closure<IOException>() {
476 public void execute(Transaction tx) throws IOException {
477 recoverIndex(tx);
478 }
479 });
480
481 // rollback any recovered inflight local transactions
482 Set<TransactionId> toRollback = new HashSet<TransactionId>();
483 synchronized (inflightTransactions) {
484 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
485 TransactionId id = it.next();
486 if (id.isLocalTransaction()) {
487 toRollback.add(id);
488 }
489 }
490 for (TransactionId tx: toRollback) {
491 LOG.debug("rolling back recovered indoubt local transaction " + tx);
492 store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
493 }
494 }
495 }finally {
496 this.indexLock.writeLock().unlock();
497 }
498 }
499
500 private Location minimum(Location producerAuditPosition,
501 Location lastIndoubtPosition) {
502 Location min = null;
503 if (producerAuditPosition != null) {
504 min = producerAuditPosition;
505 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
506 min = lastIndoubtPosition;
507 }
508 } else {
509 min = lastIndoubtPosition;
510 }
511 return min;
512 }
513
514 private Location recoverProducerAudit() throws IOException {
515 if (metadata.producerSequenceIdTrackerLocation != null) {
516 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
517 try {
518 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
519 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
520 } catch (ClassNotFoundException cfe) {
521 IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
522 ioe.initCause(cfe);
523 throw ioe;
524 }
525 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
526 } else {
527 // got no audit stored so got to recreate via replay from start of the journal
528 return journal.getNextLocation(null);
529 }
530 }
531
532 protected void recoverIndex(Transaction tx) throws IOException {
533 long start = System.currentTimeMillis();
534 // It is possible index updates got applied before the journal updates..
535 // in that case we need to removed references to messages that are not in the journal
536 final Location lastAppendLocation = journal.getLastAppendLocation();
537 long undoCounter=0;
538
539 // Go through all the destinations to see if they have messages past the lastAppendLocation
540 for (StoredDestination sd : storedDestinations.values()) {
541
542 final ArrayList<Long> matches = new ArrayList<Long>();
543 // Find all the Locations that are >= than the last Append Location.
544 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
545 @Override
546 protected void matched(Location key, Long value) {
547 matches.add(value);
548 }
549 });
550
551
552 for (Long sequenceId : matches) {
553 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
554 sd.locationIndex.remove(tx, keys.location);
555 sd.messageIdIndex.remove(tx, keys.messageId);
556 metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
557 undoCounter++;
558 // TODO: do we need to modify the ack positions for the pub sub case?
559 }
560 }
561
562 long end = System.currentTimeMillis();
563 if( undoCounter > 0 ) {
564 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
565 // should do sync writes to the journal.
566 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
567 }
568
569 undoCounter = 0;
570 start = System.currentTimeMillis();
571
572 // Lets be extra paranoid here and verify that all the datafiles being referenced
573 // by the indexes still exists.
574
575 final SequenceSet ss = new SequenceSet();
576 for (StoredDestination sd : storedDestinations.values()) {
577 // Use a visitor to cut down the number of pages that we load
578 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
579 int last=-1;
580
581 public boolean isInterestedInKeysBetween(Location first, Location second) {
582 if( first==null ) {
583 return !ss.contains(0, second.getDataFileId());
584 } else if( second==null ) {
585 return true;
586 } else {
587 return !ss.contains(first.getDataFileId(), second.getDataFileId());
588 }
589 }
590
591 public void visit(List<Location> keys, List<Long> values) {
592 for (Location l : keys) {
593 int fileId = l.getDataFileId();
594 if( last != fileId ) {
595 ss.add(fileId);
596 last = fileId;
597 }
598 }
599 }
600
601 });
602 }
603 HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
604 while( !ss.isEmpty() ) {
605 missingJournalFiles.add( (int)ss.removeFirst() );
606 }
607 missingJournalFiles.removeAll( journal.getFileMap().keySet() );
608
609 if( !missingJournalFiles.isEmpty() ) {
610 LOG.info("Some journal files are missing: "+missingJournalFiles);
611 }
612
613 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
614 for (Integer missing : missingJournalFiles) {
615 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
616 }
617
618 if ( checkForCorruptJournalFiles ) {
619 Collection<DataFile> dataFiles = journal.getFileMap().values();
620 for (DataFile dataFile : dataFiles) {
621 int id = dataFile.getDataFileId();
622 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
623 Sequence seq = dataFile.getCorruptedBlocks().getHead();
624 while( seq!=null ) {
625 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
626 seq = seq.getNext();
627 }
628 }
629 }
630
631 if( !missingPredicates.isEmpty() ) {
632 for (StoredDestination sd : storedDestinations.values()) {
633
634 final ArrayList<Long> matches = new ArrayList<Long>();
635 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
636 @Override
637 protected void matched(Location key, Long value) {
638 matches.add(value);
639 }
640 });
641
642 // If somes message references are affected by the missing data files...
643 if( !matches.isEmpty() ) {
644
645 // We either 'gracefully' recover dropping the missing messages or
646 // we error out.
647 if( ignoreMissingJournalfiles ) {
648 // Update the index to remove the references to the missing data
649 for (Long sequenceId : matches) {
650 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
651 sd.locationIndex.remove(tx, keys.location);
652 sd.messageIdIndex.remove(tx, keys.messageId);
653 undoCounter++;
654 // TODO: do we need to modify the ack positions for the pub sub case?
655 }
656
657 } else {
658 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
659 }
660 }
661 }
662 }
663
664 end = System.currentTimeMillis();
665 if( undoCounter > 0 ) {
666 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
667 // should do sync writes to the journal.
668 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
669 }
670 }
671
672 private Location nextRecoveryPosition;
673 private Location lastRecoveryPosition;
674
675 public void incrementalRecover() throws IOException {
676 this.indexLock.writeLock().lock();
677 try {
678 if( nextRecoveryPosition == null ) {
679 if( lastRecoveryPosition==null ) {
680 nextRecoveryPosition = getRecoveryPosition();
681 } else {
682 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
683 }
684 }
685 while (nextRecoveryPosition != null) {
686 lastRecoveryPosition = nextRecoveryPosition;
687 metadata.lastUpdate = lastRecoveryPosition;
688 JournalCommand<?> message = load(lastRecoveryPosition);
689 process(message, lastRecoveryPosition);
690 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
691 }
692 }finally {
693 this.indexLock.writeLock().unlock();
694 }
695 }
696
697 public Location getLastUpdatePosition() throws IOException {
698 return metadata.lastUpdate;
699 }
700
701 private Location getRecoveryPosition() throws IOException {
702
703 if (!this.forceRecoverIndex) {
704
705 // If we need to recover the transactions..
706 if (metadata.firstInProgressTransactionLocation != null) {
707 return metadata.firstInProgressTransactionLocation;
708 }
709
710 // Perhaps there were no transactions...
711 if( metadata.lastUpdate!=null) {
712 // Start replay at the record after the last one recorded in the index file.
713 return journal.getNextLocation(metadata.lastUpdate);
714 }
715 }
716 // This loads the first position.
717 return journal.getNextLocation(null);
718 }
719
720 protected void checkpointCleanup(final boolean cleanup) throws IOException {
721 long start;
722 this.indexLock.writeLock().lock();
723 try {
724 start = System.currentTimeMillis();
725 if( !opened.get() ) {
726 return;
727 }
728 pageFile.tx().execute(new Transaction.Closure<IOException>() {
729 public void execute(Transaction tx) throws IOException {
730 checkpointUpdate(tx, cleanup);
731 }
732 });
733 }finally {
734 this.indexLock.writeLock().unlock();
735 }
736 long end = System.currentTimeMillis();
737 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
738 LOG.info("Slow KahaDB access: cleanup took "+(end-start));
739 }
740 }
741
742
743 public void checkpoint(Callback closure) throws Exception {
744 this.indexLock.writeLock().lock();
745 try {
746 pageFile.tx().execute(new Transaction.Closure<IOException>() {
747 public void execute(Transaction tx) throws IOException {
748 checkpointUpdate(tx, false);
749 }
750 });
751 closure.execute();
752 }finally {
753 this.indexLock.writeLock().unlock();
754 }
755 }
756
757 // /////////////////////////////////////////////////////////////////
758 // Methods call by the broker to update and query the store.
759 // /////////////////////////////////////////////////////////////////
760 public Location store(JournalCommand<?> data) throws IOException {
761 return store(data, false, null,null);
762 }
763
764 /**
765 * All updated are are funneled through this method. The updates are converted
766 * to a JournalMessage which is logged to the journal and then the data from
767 * the JournalMessage is used to update the index just like it would be done
768 * during a recovery process.
769 */
770 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
771 if (before != null) {
772 before.run();
773 }
774 try {
775 int size = data.serializedSizeFramed();
776 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
777 os.writeByte(data.type().getNumber());
778 data.writeFramed(os);
779
780 long start = System.currentTimeMillis();
781 Location location = journal.write(os.toByteSequence(), sync);
782 long start2 = System.currentTimeMillis();
783 process(data, location);
784 long end = System.currentTimeMillis();
785 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
786 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
787 }
788
789 this.indexLock.writeLock().lock();
790 try {
791 metadata.lastUpdate = location;
792 }finally {
793 this.indexLock.writeLock().unlock();
794 }
795 if (!checkpointThread.isAlive()) {
796 startCheckpoint();
797 }
798 if (after != null) {
799 after.run();
800 }
801 return location;
802 } catch (IOException ioe) {
803 LOG.error("KahaDB failed to store to Journal", ioe);
804 brokerService.handleIOException(ioe);
805 throw ioe;
806 }
807 }
808
809 /**
810 * Loads a previously stored JournalMessage
811 *
812 * @param location
813 * @return
814 * @throws IOException
815 */
816 public JournalCommand<?> load(Location location) throws IOException {
817 ByteSequence data = journal.read(location);
818 DataByteArrayInputStream is = new DataByteArrayInputStream(data);
819 byte readByte = is.readByte();
820 KahaEntryType type = KahaEntryType.valueOf(readByte);
821 if( type == null ) {
822 throw new IOException("Could not load journal record. Invalid location: "+location);
823 }
824 JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
825 message.mergeFramed(is);
826 return message;
827 }
828
829 /**
830 * do minimal recovery till we reach the last inDoubtLocation
831 * @param data
832 * @param location
833 * @param inDoubtlocation
834 * @throws IOException
835 */
836 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
837 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
838 process(data, location);
839 } else {
840 // just recover producer audit
841 data.visit(new Visitor() {
842 public void visit(KahaAddMessageCommand command) throws IOException {
843 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
844 }
845 });
846 }
847 }
848
849 // /////////////////////////////////////////////////////////////////
850 // Journaled record processing methods. Once the record is journaled,
851 // these methods handle applying the index updates. These may be called
852 // from the recovery method too so they need to be idempotent
853 // /////////////////////////////////////////////////////////////////
854
855 void process(JournalCommand<?> data, final Location location) throws IOException {
856 data.visit(new Visitor() {
857 @Override
858 public void visit(KahaAddMessageCommand command) throws IOException {
859 process(command, location);
860 }
861
862 @Override
863 public void visit(KahaRemoveMessageCommand command) throws IOException {
864 process(command, location);
865 }
866
867 @Override
868 public void visit(KahaPrepareCommand command) throws IOException {
869 process(command, location);
870 }
871
872 @Override
873 public void visit(KahaCommitCommand command) throws IOException {
874 process(command, location);
875 }
876
877 @Override
878 public void visit(KahaRollbackCommand command) throws IOException {
879 process(command, location);
880 }
881
882 @Override
883 public void visit(KahaRemoveDestinationCommand command) throws IOException {
884 process(command, location);
885 }
886
887 @Override
888 public void visit(KahaSubscriptionCommand command) throws IOException {
889 process(command, location);
890 }
891 });
892 }
893
894 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
895 if (command.hasTransactionInfo()) {
896 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
897 inflightTx.add(new AddOpperation(command, location));
898 } else {
899 this.indexLock.writeLock().lock();
900 try {
901 pageFile.tx().execute(new Transaction.Closure<IOException>() {
902 public void execute(Transaction tx) throws IOException {
903 upadateIndex(tx, command, location);
904 }
905 });
906 }finally {
907 this.indexLock.writeLock().unlock();
908 }
909 }
910 }
911
912 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
913 if (command.hasTransactionInfo()) {
914 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
915 inflightTx.add(new RemoveOpperation(command, location));
916 } else {
917 this.indexLock.writeLock().lock();
918 try {
919 pageFile.tx().execute(new Transaction.Closure<IOException>() {
920 public void execute(Transaction tx) throws IOException {
921 updateIndex(tx, command, location);
922 }
923 });
924 }finally {
925 this.indexLock.writeLock().unlock();
926 }
927 }
928
929 }
930
931 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
932 this.indexLock.writeLock().lock();
933 try {
934 pageFile.tx().execute(new Transaction.Closure<IOException>() {
935 public void execute(Transaction tx) throws IOException {
936 updateIndex(tx, command, location);
937 }
938 });
939 }finally {
940 this.indexLock.writeLock().unlock();
941 }
942 }
943
944 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
945 this.indexLock.writeLock().lock();
946 try {
947 pageFile.tx().execute(new Transaction.Closure<IOException>() {
948 public void execute(Transaction tx) throws IOException {
949 updateIndex(tx, command, location);
950 }
951 });
952 }finally {
953 this.indexLock.writeLock().unlock();
954 }
955 }
956
957 protected void process(KahaCommitCommand command, Location location) throws IOException {
958 TransactionId key = key(command.getTransactionInfo());
959 List<Operation> inflightTx;
960 synchronized (inflightTransactions) {
961 inflightTx = inflightTransactions.remove(key);
962 if (inflightTx == null) {
963 inflightTx = preparedTransactions.remove(key);
964 }
965 }
966 if (inflightTx == null) {
967 return;
968 }
969
970 final List<Operation> messagingTx = inflightTx;
971 this.indexLock.writeLock().lock();
972 try {
973 pageFile.tx().execute(new Transaction.Closure<IOException>() {
974 public void execute(Transaction tx) throws IOException {
975 for (Operation op : messagingTx) {
976 op.execute(tx);
977 }
978 }
979 });
980 }finally {
981 this.indexLock.writeLock().unlock();
982 }
983 }
984
985 protected void process(KahaPrepareCommand command, Location location) {
986 TransactionId key = key(command.getTransactionInfo());
987 synchronized (inflightTransactions) {
988 List<Operation> tx = inflightTransactions.remove(key);
989 if (tx != null) {
990 preparedTransactions.put(key, tx);
991 }
992 }
993 }
994
995 protected void process(KahaRollbackCommand command, Location location) {
996 TransactionId key = key(command.getTransactionInfo());
997 synchronized (inflightTransactions) {
998 List<Operation> tx = inflightTransactions.remove(key);
999 if (tx == null) {
1000 preparedTransactions.remove(key);
1001 }
1002 }
1003 }
1004
1005 // /////////////////////////////////////////////////////////////////
1006 // These methods do the actual index updates.
1007 // /////////////////////////////////////////////////////////////////
1008
1009 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1010 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1011
1012 void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1013 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1014
1015 // Skip adding the message to the index if this is a topic and there are
1016 // no subscriptions.
1017 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1018 return;
1019 }
1020
1021 // Add the message.
1022 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1023 long id = sd.orderIndex.getNextMessageId(priority);
1024 Long previous = sd.locationIndex.put(tx, location, id);
1025 if (previous == null) {
1026 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1027 if (previous == null) {
1028 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1029 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1030 addAckLocationForNewMessage(tx, sd, id);
1031 }
1032 } else {
1033 // If the message ID as indexed, then the broker asked us to
1034 // store a DUP
1035 // message. Bad BOY! Don't do it, and log a warning.
1036 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1037 // TODO: consider just rolling back the tx.
1038 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1039 sd.locationIndex.remove(tx, location);
1040 }
1041 } else {
1042 // restore the previous value.. Looks like this was a redo of a
1043 // previously
1044 // added message. We don't want to assign it a new id as the other
1045 // indexes would
1046 // be wrong..
1047 //
1048 // TODO: consider just rolling back the tx.
1049 sd.locationIndex.put(tx, location, previous);
1050 }
1051 // record this id in any event, initial send or recovery
1052 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1053 }
1054
1055 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1056 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1057 if (!command.hasSubscriptionKey()) {
1058
1059 // In the queue case we just remove the message from the index..
1060 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1061 if (sequenceId != null) {
1062 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1063 if (keys != null) {
1064 sd.locationIndex.remove(tx, keys.location);
1065 recordAckMessageReferenceLocation(ackLocation, keys.location);
1066 }
1067 }
1068 } else {
1069 // In the topic case we need remove the message once it's been acked
1070 // by all the subs
1071 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1072
1073 // Make sure it's a valid message id...
1074 if (sequence != null) {
1075 String subscriptionKey = command.getSubscriptionKey();
1076 if (command.getAck() != UNMATCHED) {
1077 sd.orderIndex.get(tx, sequence);
1078 byte priority = sd.orderIndex.lastGetPriority();
1079 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1080 }
1081 // The following method handles deleting un-referenced messages.
1082 removeAckLocation(tx, sd, subscriptionKey, sequence);
1083 }
1084
1085 }
1086 }
1087
1088 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1089 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1090 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1091 if (referenceFileIds == null) {
1092 referenceFileIds = new HashSet<Integer>();
1093 referenceFileIds.add(messageLocation.getDataFileId());
1094 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1095 } else {
1096 Integer id = Integer.valueOf(messageLocation.getDataFileId());
1097 if (!referenceFileIds.contains(id)) {
1098 referenceFileIds.add(id);
1099 }
1100 }
1101 }
1102
1103 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1104 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1105 sd.orderIndex.remove(tx);
1106
1107 sd.locationIndex.clear(tx);
1108 sd.locationIndex.unload(tx);
1109 tx.free(sd.locationIndex.getPageId());
1110
1111 sd.messageIdIndex.clear(tx);
1112 sd.messageIdIndex.unload(tx);
1113 tx.free(sd.messageIdIndex.getPageId());
1114
1115 if (sd.subscriptions != null) {
1116 sd.subscriptions.clear(tx);
1117 sd.subscriptions.unload(tx);
1118 tx.free(sd.subscriptions.getPageId());
1119
1120 sd.subscriptionAcks.clear(tx);
1121 sd.subscriptionAcks.unload(tx);
1122 tx.free(sd.subscriptionAcks.getPageId());
1123
1124 sd.ackPositions.clear(tx);
1125 sd.ackPositions.unload(tx);
1126 tx.free(sd.ackPositions.getPageId());
1127 }
1128
1129 String key = key(command.getDestination());
1130 storedDestinations.remove(key);
1131 metadata.destinations.remove(tx, key);
1132 }
1133
1134 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1135 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1136 final String subscriptionKey = command.getSubscriptionKey();
1137
1138 // If set then we are creating it.. otherwise we are destroying the sub
1139 if (command.hasSubscriptionInfo()) {
1140 sd.subscriptions.put(tx, subscriptionKey, command);
1141 long ackLocation=NOT_ACKED;
1142 if (!command.getRetroactive()) {
1143 ackLocation = sd.orderIndex.nextMessageId-1;
1144 } else {
1145 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
1146 }
1147 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1148 } else {
1149 // delete the sub...
1150 sd.subscriptions.remove(tx, subscriptionKey);
1151 sd.subscriptionAcks.remove(tx, subscriptionKey);
1152 removeAckLocationsForSub(tx, sd, subscriptionKey);
1153 }
1154 }
1155
1156 /**
1157 * @param tx
1158 * @throws IOException
1159 */
1160 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1161 LOG.debug("Checkpoint started.");
1162
1163 // reflect last update exclusive of current checkpoint
1164 Location firstTxLocation = metadata.lastUpdate;
1165
1166 metadata.state = OPEN_STATE;
1167 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1168 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1169 tx.store(metadata.page, metadataMarshaller, true);
1170 pageFile.flush();
1171
1172 if( cleanup ) {
1173
1174 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1175 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1176
1177 LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1178
1179 // Don't GC files under replication
1180 if( journalFilesBeingReplicated!=null ) {
1181 gcCandidateSet.removeAll(journalFilesBeingReplicated);
1182 }
1183
1184 // Don't GC files after the first in progress tx
1185 if( metadata.firstInProgressTransactionLocation!=null ) {
1186 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1187 firstTxLocation = metadata.firstInProgressTransactionLocation;
1188 };
1189 }
1190
1191 if( firstTxLocation!=null ) {
1192 while( !gcCandidateSet.isEmpty() ) {
1193 Integer last = gcCandidateSet.last();
1194 if( last >= firstTxLocation.getDataFileId() ) {
1195 gcCandidateSet.remove(last);
1196 } else {
1197 break;
1198 }
1199 }
1200 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1201 }
1202
1203 // Go through all the destinations to see if any of them can remove GC candidates.
1204 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1205 if( gcCandidateSet.isEmpty() ) {
1206 break;
1207 }
1208
1209 // Use a visitor to cut down the number of pages that we load
1210 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1211 int last=-1;
1212 public boolean isInterestedInKeysBetween(Location first, Location second) {
1213 if( first==null ) {
1214 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1215 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1216 subset.remove(second.getDataFileId());
1217 }
1218 return !subset.isEmpty();
1219 } else if( second==null ) {
1220 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1221 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1222 subset.remove(first.getDataFileId());
1223 }
1224 return !subset.isEmpty();
1225 } else {
1226 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1227 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1228 subset.remove(first.getDataFileId());
1229 }
1230 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1231 subset.remove(second.getDataFileId());
1232 }
1233 return !subset.isEmpty();
1234 }
1235 }
1236
1237 public void visit(List<Location> keys, List<Long> values) {
1238 for (Location l : keys) {
1239 int fileId = l.getDataFileId();
1240 if( last != fileId ) {
1241 gcCandidateSet.remove(fileId);
1242 last = fileId;
1243 }
1244 }
1245 }
1246 });
1247 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1248 }
1249
1250 // check we are not deleting file with ack for in-use journal files
1251 LOG.trace("gc candidates: " + gcCandidateSet);
1252 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1253 Iterator<Integer> candidates = gcCandidateSet.iterator();
1254 while (candidates.hasNext()) {
1255 Integer candidate = candidates.next();
1256 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1257 if (referencedFileIds != null) {
1258 for (Integer referencedFileId : referencedFileIds) {
1259 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1260 // active file that is not targeted for deletion is referenced so don't delete
1261 candidates.remove();
1262 break;
1263 }
1264 }
1265 if (gcCandidateSet.contains(candidate)) {
1266 ackMessageFileMap.remove(candidate);
1267 } else {
1268 LOG.trace("not removing data file: " + candidate
1269 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1270 }
1271 }
1272 }
1273
1274 if( !gcCandidateSet.isEmpty() ) {
1275 LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
1276 journal.removeDataFiles(gcCandidateSet);
1277 }
1278 }
1279
1280 LOG.debug("Checkpoint done.");
1281 }
1282
1283 private Location checkpointProducerAudit() throws IOException {
1284 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1285 ObjectOutputStream oout = new ObjectOutputStream(baos);
1286 oout.writeObject(metadata.producerSequenceIdTracker);
1287 oout.flush();
1288 oout.close();
1289 return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
1290 }
1291
1292 public HashSet<Integer> getJournalFilesBeingReplicated() {
1293 return journalFilesBeingReplicated;
1294 }
1295
1296 // /////////////////////////////////////////////////////////////////
1297 // StoredDestination related implementation methods.
1298 // /////////////////////////////////////////////////////////////////
1299
1300
1301 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1302
1303 class StoredSubscription {
1304 SubscriptionInfo subscriptionInfo;
1305 String lastAckId;
1306 Location lastAckLocation;
1307 Location cursor;
1308 }
1309
1310 static class MessageKeys {
1311 final String messageId;
1312 final Location location;
1313
1314 public MessageKeys(String messageId, Location location) {
1315 this.messageId=messageId;
1316 this.location=location;
1317 }
1318
1319 @Override
1320 public String toString() {
1321 return "["+messageId+","+location+"]";
1322 }
1323 }
1324
1325 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1326 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1327
1328 public MessageKeys readPayload(DataInput dataIn) throws IOException {
1329 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1330 }
1331
1332 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1333 dataOut.writeUTF(object.messageId);
1334 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1335 }
1336 }
1337
1338 class LastAck {
1339 long lastAckedSequence;
1340 byte priority;
1341
1342 public LastAck(LastAck source) {
1343 this.lastAckedSequence = source.lastAckedSequence;
1344 this.priority = source.priority;
1345 }
1346
1347 public LastAck() {
1348 this.priority = MessageOrderIndex.HI;
1349 }
1350
1351 public LastAck(long ackLocation) {
1352 this.lastAckedSequence = ackLocation;
1353 this.priority = MessageOrderIndex.LO;
1354 }
1355
1356 public LastAck(long ackLocation, byte priority) {
1357 this.lastAckedSequence = ackLocation;
1358 this.priority = priority;
1359 }
1360
1361 public String toString() {
1362 return "[" + lastAckedSequence + ":" + priority + "]";
1363 }
1364 }
1365
1366 protected class LastAckMarshaller implements Marshaller<LastAck> {
1367
1368 public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1369 dataOut.writeLong(object.lastAckedSequence);
1370 dataOut.writeByte(object.priority);
1371 }
1372
1373 public LastAck readPayload(DataInput dataIn) throws IOException {
1374 LastAck lastAcked = new LastAck();
1375 lastAcked.lastAckedSequence = dataIn.readLong();
1376 if (metadata.version >= 3) {
1377 lastAcked.priority = dataIn.readByte();
1378 }
1379 return lastAcked;
1380 }
1381
1382 public int getFixedSize() {
1383 return 9;
1384 }
1385
1386 public LastAck deepCopy(LastAck source) {
1387 return new LastAck(source);
1388 }
1389
1390 public boolean isDeepCopySupported() {
1391 return true;
1392 }
1393 }
1394
1395 class StoredDestination {
1396
1397 MessageOrderIndex orderIndex = new MessageOrderIndex();
1398 BTreeIndex<Location, Long> locationIndex;
1399 BTreeIndex<String, Long> messageIdIndex;
1400
1401 // These bits are only set for Topics
1402 BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1403 BTreeIndex<String, LastAck> subscriptionAcks;
1404 HashMap<String, MessageOrderCursor> subscriptionCursors;
1405 BTreeIndex<Long, HashSet<String>> ackPositions;
1406 }
1407
1408 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1409
1410 public StoredDestination readPayload(DataInput dataIn) throws IOException {
1411 final StoredDestination value = new StoredDestination();
1412 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1413 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1414 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1415
1416 if (dataIn.readBoolean()) {
1417 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1418 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1419 if (metadata.version >= 3) {
1420 value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1421 } else {
1422 // upgrade
1423 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1424 public void execute(Transaction tx) throws IOException {
1425 value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
1426 value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1427 value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1428 value.ackPositions.load(tx);
1429 }
1430 });
1431 }
1432 }
1433 if (metadata.version >= 2) {
1434 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1435 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1436 } else {
1437 // upgrade
1438 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1439 public void execute(Transaction tx) throws IOException {
1440 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1441 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1442 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1443 value.orderIndex.lowPriorityIndex.load(tx);
1444
1445 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1446 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1447 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1448 value.orderIndex.highPriorityIndex.load(tx);
1449 }
1450 });
1451 }
1452
1453 return value;
1454 }
1455
1456 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1457 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1458 dataOut.writeLong(value.locationIndex.getPageId());
1459 dataOut.writeLong(value.messageIdIndex.getPageId());
1460 if (value.subscriptions != null) {
1461 dataOut.writeBoolean(true);
1462 dataOut.writeLong(value.subscriptions.getPageId());
1463 dataOut.writeLong(value.subscriptionAcks.getPageId());
1464 dataOut.writeLong(value.ackPositions.getPageId());
1465 } else {
1466 dataOut.writeBoolean(false);
1467 }
1468 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1469 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1470 }
1471 }
1472
1473 static class LocationMarshaller implements Marshaller<Location> {
1474 final static LocationMarshaller INSTANCE = new LocationMarshaller();
1475
1476 public Location readPayload(DataInput dataIn) throws IOException {
1477 Location rc = new Location();
1478 rc.setDataFileId(dataIn.readInt());
1479 rc.setOffset(dataIn.readInt());
1480 return rc;
1481 }
1482
1483 public void writePayload(Location object, DataOutput dataOut) throws IOException {
1484 dataOut.writeInt(object.getDataFileId());
1485 dataOut.writeInt(object.getOffset());
1486 }
1487
1488 public int getFixedSize() {
1489 return 8;
1490 }
1491
1492 public Location deepCopy(Location source) {
1493 return new Location(source);
1494 }
1495
1496 public boolean isDeepCopySupported() {
1497 return true;
1498 }
1499 }
1500
1501 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1502 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1503
1504 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1505 KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1506 rc.mergeFramed((InputStream)dataIn);
1507 return rc;
1508 }
1509
1510 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1511 object.writeFramed((OutputStream)dataOut);
1512 }
1513 }
1514
1515 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1516 String key = key(destination);
1517 StoredDestination rc = storedDestinations.get(key);
1518 if (rc == null) {
1519 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1520 rc = loadStoredDestination(tx, key, topic);
1521 // Cache it. We may want to remove/unload destinations from the
1522 // cache that are not used for a while
1523 // to reduce memory usage.
1524 storedDestinations.put(key, rc);
1525 }
1526 return rc;
1527 }
1528
1529
1530 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1531 String key = key(destination);
1532 StoredDestination rc = storedDestinations.get(key);
1533 if (rc == null && metadata.destinations.containsKey(tx, key)) {
1534 rc = getStoredDestination(destination, tx);
1535 }
1536 return rc;
1537 }
1538
1539 /**
1540 * @param tx
1541 * @param key
1542 * @param topic
1543 * @return
1544 * @throws IOException
1545 */
1546 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1547 // Try to load the existing indexes..
1548 StoredDestination rc = metadata.destinations.get(tx, key);
1549 if (rc == null) {
1550 // Brand new destination.. allocate indexes for it.
1551 rc = new StoredDestination();
1552 rc.orderIndex.allocate(tx);
1553 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1554 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1555
1556 if (topic) {
1557 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1558 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1559 rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
1560 }
1561 metadata.destinations.put(tx, key, rc);
1562 }
1563
1564 // Configure the marshalers and load.
1565 rc.orderIndex.load(tx);
1566
1567 // Figure out the next key using the last entry in the destination.
1568 rc.orderIndex.configureLast(tx);
1569
1570 rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
1571 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1572 rc.locationIndex.load(tx);
1573
1574 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1575 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1576 rc.messageIdIndex.load(tx);
1577
1578 // If it was a topic...
1579 if (topic) {
1580
1581 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1582 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1583 rc.subscriptions.load(tx);
1584
1585 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1586 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1587 rc.subscriptionAcks.load(tx);
1588
1589 rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1590 rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1591 rc.ackPositions.load(tx);
1592
1593 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1594
1595 if (metadata.version < 3) {
1596
1597 // on upgrade need to fill ackLocation with available messages past last ack
1598 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1599 Entry<String, LastAck> entry = iterator.next();
1600 for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1601 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1602 Long sequence = orderIterator.next().getKey();
1603 addAckLocation(tx, rc, sequence, entry.getKey());
1604 }
1605 // modify so it is upgraded
1606 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1607 }
1608 }
1609
1610 if (rc.orderIndex.nextMessageId == 0) {
1611 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1612 if (!rc.subscriptionAcks.isEmpty(tx)) {
1613 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1614 Entry<String, LastAck> entry = iterator.next();
1615 rc.orderIndex.nextMessageId =
1616 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1617 }
1618 }
1619 } else {
1620 // update based on ackPositions for unmatched, last entry is always the next
1621 if (!rc.ackPositions.isEmpty(tx)) {
1622 Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
1623 rc.orderIndex.nextMessageId =
1624 Math.max(rc.orderIndex.nextMessageId, last.getKey());
1625 }
1626 }
1627
1628 }
1629
1630 if (metadata.version < 3) {
1631 // store again after upgrade
1632 metadata.destinations.put(tx, key, rc);
1633 }
1634 return rc;
1635 }
1636
1637 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1638 HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
1639 if (hs == null) {
1640 hs = new HashSet<String>();
1641 }
1642 hs.add(subscriptionKey);
1643 // every ack location addition needs to be a btree modification to get it stored
1644 sd.ackPositions.put(tx, messageSequence, hs);
1645 }
1646
1647 // new sub is interested in potentially all existing messages
1648 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1649 for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
1650 Entry<Long, HashSet<String>> entry = iterator.next();
1651 entry.getValue().add(subscriptionKey);
1652 sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
1653 }
1654 }
1655
1656 final HashSet nextMessageIdMarker = new HashSet<String>();
1657 // on a new message add, all existing subs are interested in this message
1658 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1659 HashSet hs = new HashSet<String>();
1660 for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1661 Entry<String, LastAck> entry = iterator.next();
1662 hs.add(entry.getKey());
1663 }
1664 sd.ackPositions.put(tx, messageSequence, hs);
1665 // add empty next to keep track of nextMessage
1666 sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
1667 }
1668
1669 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1670 if (!sd.ackPositions.isEmpty(tx)) {
1671 Long end = sd.ackPositions.getLast(tx).getKey();
1672 for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
1673 removeAckLocation(tx, sd, subscriptionKey, sequence);
1674 }
1675 }
1676 }
1677
1678 /**
1679 * @param tx
1680 * @param sd
1681 * @param subscriptionKey
1682 * @param sequenceId
1683 * @throws IOException
1684 */
1685 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
1686 // Remove the sub from the previous location set..
1687 if (sequenceId != null) {
1688 HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
1689 if (hs != null) {
1690 hs.remove(subscriptionKey);
1691 if (hs.isEmpty()) {
1692 HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
1693 sd.ackPositions.remove(tx, sequenceId);
1694
1695 // Find all the entries that need to get deleted.
1696 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1697 sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1698
1699 // Do the actual deletes.
1700 for (Entry<Long, MessageKeys> entry : deletes) {
1701 sd.locationIndex.remove(tx, entry.getValue().location);
1702 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1703 sd.orderIndex.remove(tx, entry.getKey());
1704 }
1705 } else {
1706 // update
1707 sd.ackPositions.put(tx, sequenceId, hs);
1708 }
1709 }
1710 }
1711 }
1712
1713 private String key(KahaDestination destination) {
1714 return destination.getType().getNumber() + ":" + destination.getName();
1715 }
1716
1717 // /////////////////////////////////////////////////////////////////
1718 // Transaction related implementation methods.
1719 // /////////////////////////////////////////////////////////////////
1720 protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
1721 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
1722
1723 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
1724 TransactionId key = key(info);
1725 List<Operation> tx;
1726 synchronized (inflightTransactions) {
1727 tx = inflightTransactions.get(key);
1728 if (tx == null) {
1729 tx = Collections.synchronizedList(new ArrayList<Operation>());
1730 inflightTransactions.put(key, tx);
1731 }
1732 }
1733 return tx;
1734 }
1735
1736 private TransactionId key(KahaTransactionInfo transactionInfo) {
1737 if (transactionInfo.hasLocalTransacitonId()) {
1738 KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
1739 LocalTransactionId rc = new LocalTransactionId();
1740 rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
1741 rc.setValue(tx.getTransacitonId());
1742 return rc;
1743 } else {
1744 KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
1745 XATransactionId rc = new XATransactionId();
1746 rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
1747 rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
1748 rc.setFormatId(tx.getFormatId());
1749 return rc;
1750 }
1751 }
1752
1753 abstract class Operation {
1754 final Location location;
1755
1756 public Operation(Location location) {
1757 this.location = location;
1758 }
1759
1760 public Location getLocation() {
1761 return location;
1762 }
1763
1764 abstract public void execute(Transaction tx) throws IOException;
1765 }
1766
1767 class AddOpperation extends Operation {
1768 final KahaAddMessageCommand command;
1769
1770 public AddOpperation(KahaAddMessageCommand command, Location location) {
1771 super(location);
1772 this.command = command;
1773 }
1774
1775 @Override
1776 public void execute(Transaction tx) throws IOException {
1777 upadateIndex(tx, command, location);
1778 }
1779
1780 public KahaAddMessageCommand getCommand() {
1781 return command;
1782 }
1783 }
1784
1785 class RemoveOpperation extends Operation {
1786 final KahaRemoveMessageCommand command;
1787
1788 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
1789 super(location);
1790 this.command = command;
1791 }
1792
1793 @Override
1794 public void execute(Transaction tx) throws IOException {
1795 updateIndex(tx, command, location);
1796 }
1797
1798 public KahaRemoveMessageCommand getCommand() {
1799 return command;
1800 }
1801 }
1802
1803 // /////////////////////////////////////////////////////////////////
1804 // Initialization related implementation methods.
1805 // /////////////////////////////////////////////////////////////////
1806
1807 private PageFile createPageFile() {
1808 PageFile index = new PageFile(directory, "db");
1809 index.setEnableWriteThread(isEnableIndexWriteAsync());
1810 index.setWriteBatchSize(getIndexWriteBatchSize());
1811 index.setPageCacheSize(indexCacheSize);
1812 return index;
1813 }
1814
1815 private Journal createJournal() throws IOException {
1816 Journal manager = new Journal();
1817 manager.setDirectory(directory);
1818 manager.setMaxFileLength(getJournalMaxFileLength());
1819 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
1820 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
1821 manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
1822 manager.setArchiveDataLogs(isArchiveDataLogs());
1823 manager.setSizeAccumulator(storeSize);
1824 if (getDirectoryArchive() != null) {
1825 IOHelper.mkdirs(getDirectoryArchive());
1826 manager.setDirectoryArchive(getDirectoryArchive());
1827 }
1828 return manager;
1829 }
1830
1831 public int getJournalMaxWriteBatchSize() {
1832 return journalMaxWriteBatchSize;
1833 }
1834
1835 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
1836 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
1837 }
1838
1839 public File getDirectory() {
1840 return directory;
1841 }
1842
1843 public void setDirectory(File directory) {
1844 this.directory = directory;
1845 }
1846
1847 public boolean isDeleteAllMessages() {
1848 return deleteAllMessages;
1849 }
1850
1851 public void setDeleteAllMessages(boolean deleteAllMessages) {
1852 this.deleteAllMessages = deleteAllMessages;
1853 }
1854
1855 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
1856 this.setIndexWriteBatchSize = setIndexWriteBatchSize;
1857 }
1858
1859 public int getIndexWriteBatchSize() {
1860 return setIndexWriteBatchSize;
1861 }
1862
1863 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
1864 this.enableIndexWriteAsync = enableIndexWriteAsync;
1865 }
1866
1867 boolean isEnableIndexWriteAsync() {
1868 return enableIndexWriteAsync;
1869 }
1870
1871 public boolean isEnableJournalDiskSyncs() {
1872 return enableJournalDiskSyncs;
1873 }
1874
1875 public void setEnableJournalDiskSyncs(boolean syncWrites) {
1876 this.enableJournalDiskSyncs = syncWrites;
1877 }
1878
1879 public long getCheckpointInterval() {
1880 return checkpointInterval;
1881 }
1882
1883 public void setCheckpointInterval(long checkpointInterval) {
1884 this.checkpointInterval = checkpointInterval;
1885 }
1886
1887 public long getCleanupInterval() {
1888 return cleanupInterval;
1889 }
1890
1891 public void setCleanupInterval(long cleanupInterval) {
1892 this.cleanupInterval = cleanupInterval;
1893 }
1894
1895 public void setJournalMaxFileLength(int journalMaxFileLength) {
1896 this.journalMaxFileLength = journalMaxFileLength;
1897 }
1898
1899 public int getJournalMaxFileLength() {
1900 return journalMaxFileLength;
1901 }
1902
1903 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
1904 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
1905 }
1906
1907 public int getMaxFailoverProducersToTrack() {
1908 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
1909 }
1910
1911 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
1912 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
1913 }
1914
1915 public int getFailoverProducersAuditDepth() {
1916 return this.metadata.producerSequenceIdTracker.getAuditDepth();
1917 }
1918
1919 public PageFile getPageFile() {
1920 if (pageFile == null) {
1921 pageFile = createPageFile();
1922 }
1923 return pageFile;
1924 }
1925
1926 public Journal getJournal() throws IOException {
1927 if (journal == null) {
1928 journal = createJournal();
1929 }
1930 return journal;
1931 }
1932
1933 public boolean isFailIfDatabaseIsLocked() {
1934 return failIfDatabaseIsLocked;
1935 }
1936
1937 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
1938 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
1939 }
1940
1941 public boolean isIgnoreMissingJournalfiles() {
1942 return ignoreMissingJournalfiles;
1943 }
1944
1945 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
1946 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
1947 }
1948
1949 public int getIndexCacheSize() {
1950 return indexCacheSize;
1951 }
1952
1953 public void setIndexCacheSize(int indexCacheSize) {
1954 this.indexCacheSize = indexCacheSize;
1955 }
1956
1957 public boolean isCheckForCorruptJournalFiles() {
1958 return checkForCorruptJournalFiles;
1959 }
1960
1961 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
1962 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
1963 }
1964
1965 public boolean isChecksumJournalFiles() {
1966 return checksumJournalFiles;
1967 }
1968
1969 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
1970 this.checksumJournalFiles = checksumJournalFiles;
1971 }
1972
1973 public void setBrokerService(BrokerService brokerService) {
1974 this.brokerService = brokerService;
1975 }
1976
1977 /**
1978 * @return the archiveDataLogs
1979 */
1980 public boolean isArchiveDataLogs() {
1981 return this.archiveDataLogs;
1982 }
1983
1984 /**
1985 * @param archiveDataLogs the archiveDataLogs to set
1986 */
1987 public void setArchiveDataLogs(boolean archiveDataLogs) {
1988 this.archiveDataLogs = archiveDataLogs;
1989 }
1990
1991 /**
1992 * @return the directoryArchive
1993 */
1994 public File getDirectoryArchive() {
1995 return this.directoryArchive;
1996 }
1997
1998 /**
1999 * @param directoryArchive the directoryArchive to set
2000 */
2001 public void setDirectoryArchive(File directoryArchive) {
2002 this.directoryArchive = directoryArchive;
2003 }
2004
2005 /**
2006 * @return the databaseLockedWaitDelay
2007 */
2008 public int getDatabaseLockedWaitDelay() {
2009 return this.databaseLockedWaitDelay;
2010 }
2011
2012 /**
2013 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2014 */
2015 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2016 this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2017 }
2018
2019 // /////////////////////////////////////////////////////////////////
2020 // Internal conversion methods.
2021 // /////////////////////////////////////////////////////////////////
2022
2023 KahaTransactionInfo createTransactionInfo(TransactionId txid) {
2024 if (txid == null) {
2025 return null;
2026 }
2027 KahaTransactionInfo rc = new KahaTransactionInfo();
2028
2029 if (txid.isLocalTransaction()) {
2030 LocalTransactionId t = (LocalTransactionId) txid;
2031 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
2032 kahaTxId.setConnectionId(t.getConnectionId().getValue());
2033 kahaTxId.setTransacitonId(t.getValue());
2034 rc.setLocalTransacitonId(kahaTxId);
2035 } else {
2036 XATransactionId t = (XATransactionId) txid;
2037 KahaXATransactionId kahaTxId = new KahaXATransactionId();
2038 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
2039 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
2040 kahaTxId.setFormatId(t.getFormatId());
2041 rc.setXaTransacitonId(kahaTxId);
2042 }
2043 return rc;
2044 }
2045
2046 class MessageOrderCursor{
2047 long defaultCursorPosition;
2048 long lowPriorityCursorPosition;
2049 long highPriorityCursorPosition;
2050 MessageOrderCursor(){
2051 }
2052
2053 MessageOrderCursor(long position){
2054 this.defaultCursorPosition=position;
2055 this.lowPriorityCursorPosition=position;
2056 this.highPriorityCursorPosition=position;
2057 }
2058
2059 MessageOrderCursor(MessageOrderCursor other){
2060 this.defaultCursorPosition=other.defaultCursorPosition;
2061 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2062 this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2063 }
2064
2065 MessageOrderCursor copy() {
2066 return new MessageOrderCursor(this);
2067 }
2068
2069 void reset() {
2070 this.defaultCursorPosition=0;
2071 this.highPriorityCursorPosition=0;
2072 this.lowPriorityCursorPosition=0;
2073 }
2074
2075 void increment() {
2076 if (defaultCursorPosition!=0) {
2077 defaultCursorPosition++;
2078 }
2079 if (highPriorityCursorPosition!=0) {
2080 highPriorityCursorPosition++;
2081 }
2082 if (lowPriorityCursorPosition!=0) {
2083 lowPriorityCursorPosition++;
2084 }
2085 }
2086
2087 public String toString() {
2088 return "MessageOrderCursor:[def:" + defaultCursorPosition
2089 + ", low:" + lowPriorityCursorPosition
2090 + ", high:" + highPriorityCursorPosition + "]";
2091 }
2092
2093 public void sync(MessageOrderCursor other) {
2094 this.defaultCursorPosition=other.defaultCursorPosition;
2095 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2096 this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2097 }
2098 }
2099
2100 class MessageOrderIndex {
2101 static final byte HI = 9;
2102 static final byte LO = 0;
2103 static final byte DEF = 4;
2104
2105 long nextMessageId;
2106 BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2107 BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2108 BTreeIndex<Long, MessageKeys> highPriorityIndex;
2109 MessageOrderCursor cursor = new MessageOrderCursor();
2110 Long lastDefaultKey;
2111 Long lastHighKey;
2112 Long lastLowKey;
2113 byte lastGetPriority;
2114
2115 MessageKeys remove(Transaction tx, Long key) throws IOException {
2116 MessageKeys result = defaultPriorityIndex.remove(tx, key);
2117 if (result == null && highPriorityIndex!=null) {
2118 result = highPriorityIndex.remove(tx, key);
2119 if (result ==null && lowPriorityIndex!=null) {
2120 result = lowPriorityIndex.remove(tx, key);
2121 }
2122 }
2123 return result;
2124 }
2125
2126 void load(Transaction tx) throws IOException {
2127 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2128 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2129 defaultPriorityIndex.load(tx);
2130 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2131 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2132 lowPriorityIndex.load(tx);
2133 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2134 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2135 highPriorityIndex.load(tx);
2136 }
2137
2138 void allocate(Transaction tx) throws IOException {
2139 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2140 if (metadata.version >= 2) {
2141 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2142 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2143 }
2144 }
2145
2146 void configureLast(Transaction tx) throws IOException {
2147 // Figure out the next key using the last entry in the destination.
2148 if (highPriorityIndex != null) {
2149 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2150 if (lastEntry != null) {
2151 nextMessageId = lastEntry.getKey() + 1;
2152 } else {
2153 lastEntry = defaultPriorityIndex.getLast(tx);
2154 if (lastEntry != null) {
2155 nextMessageId = lastEntry.getKey() + 1;
2156 } else {
2157 lastEntry = lowPriorityIndex.getLast(tx);
2158 if (lastEntry != null) {
2159 nextMessageId = lastEntry.getKey() + 1;
2160 }
2161 }
2162 }
2163 } else {
2164 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2165 if (lastEntry != null) {
2166 nextMessageId = lastEntry.getKey() + 1;
2167 }
2168 }
2169 }
2170
2171
2172 void remove(Transaction tx) throws IOException {
2173 defaultPriorityIndex.clear(tx);
2174 defaultPriorityIndex.unload(tx);
2175 tx.free(defaultPriorityIndex.getPageId());
2176 if (lowPriorityIndex != null) {
2177 lowPriorityIndex.clear(tx);
2178 lowPriorityIndex.unload(tx);
2179
2180 tx.free(lowPriorityIndex.getPageId());
2181 }
2182 if (highPriorityIndex != null) {
2183 highPriorityIndex.clear(tx);
2184 highPriorityIndex.unload(tx);
2185 tx.free(highPriorityIndex.getPageId());
2186 }
2187 }
2188
2189 void resetCursorPosition() {
2190 this.cursor.reset();
2191 lastDefaultKey = null;
2192 lastHighKey = null;
2193 lastLowKey = null;
2194 }
2195
2196 void setBatch(Transaction tx, Long sequence) throws IOException {
2197 if (sequence != null) {
2198 Long nextPosition = new Long(sequence.longValue() + 1);
2199 if (defaultPriorityIndex.containsKey(tx, sequence)) {
2200 lastDefaultKey = sequence;
2201 cursor.defaultCursorPosition = nextPosition.longValue();
2202 } else if (highPriorityIndex != null) {
2203 if (highPriorityIndex.containsKey(tx, sequence)) {
2204 lastHighKey = sequence;
2205 cursor.highPriorityCursorPosition = nextPosition.longValue();
2206 } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2207 lastLowKey = sequence;
2208 cursor.lowPriorityCursorPosition = nextPosition.longValue();
2209 }
2210 } else {
2211 lastDefaultKey = sequence;
2212 cursor.defaultCursorPosition = nextPosition.longValue();
2213 }
2214 }
2215 }
2216
2217 void setBatch(Transaction tx, LastAck last) throws IOException {
2218 setBatch(tx, last.lastAckedSequence);
2219 if (cursor.defaultCursorPosition == 0
2220 && cursor.highPriorityCursorPosition == 0
2221 && cursor.lowPriorityCursorPosition == 0) {
2222 long next = last.lastAckedSequence + 1;
2223 switch (last.priority) {
2224 case DEF:
2225 cursor.defaultCursorPosition = next;
2226 cursor.highPriorityCursorPosition = next;
2227 break;
2228 case HI:
2229 cursor.highPriorityCursorPosition = next;
2230 break;
2231 case LO:
2232 cursor.lowPriorityCursorPosition = next;
2233 cursor.defaultCursorPosition = next;
2234 cursor.highPriorityCursorPosition = next;
2235 break;
2236 }
2237 }
2238 }
2239
2240 void stoppedIterating() {
2241 if (lastDefaultKey!=null) {
2242 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2243 }
2244 if (lastHighKey!=null) {
2245 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2246 }
2247 if (lastLowKey!=null) {
2248 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2249 }
2250 lastDefaultKey = null;
2251 lastHighKey = null;
2252 lastLowKey = null;
2253 }
2254
2255 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2256 throws IOException {
2257 if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2258 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2259 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2260 getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2261 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2262 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2263 }
2264 }
2265
2266 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2267 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2268
2269 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2270 deletes.add(iterator.next());
2271 }
2272
2273 long getNextMessageId(int priority) {
2274 return nextMessageId++;
2275 }
2276
2277 MessageKeys get(Transaction tx, Long key) throws IOException {
2278 MessageKeys result = defaultPriorityIndex.get(tx, key);
2279 if (result == null) {
2280 result = highPriorityIndex.get(tx, key);
2281 if (result == null) {
2282 result = lowPriorityIndex.get(tx, key);
2283 lastGetPriority = LO;
2284 } else {
2285 lastGetPriority = HI;
2286 }
2287 } else {
2288 lastGetPriority = DEF;
2289 }
2290 return result;
2291 }
2292
2293 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2294 if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2295 return defaultPriorityIndex.put(tx, key, value);
2296 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2297 return highPriorityIndex.put(tx, key, value);
2298 } else {
2299 return lowPriorityIndex.put(tx, key, value);
2300 }
2301 }
2302
2303 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2304 return new MessageOrderIterator(tx,cursor);
2305 }
2306
2307 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2308 return new MessageOrderIterator(tx,m);
2309 }
2310
2311 public byte lastGetPriority() {
2312 return lastGetPriority;
2313 }
2314
2315 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2316 Iterator<Entry<Long, MessageKeys>>currentIterator;
2317 final Iterator<Entry<Long, MessageKeys>>highIterator;
2318 final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2319 final Iterator<Entry<Long, MessageKeys>>lowIterator;
2320
2321
2322
2323 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2324 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2325 if (highPriorityIndex != null) {
2326 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2327 } else {
2328 this.highIterator = null;
2329 }
2330 if (lowPriorityIndex != null) {
2331 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2332 } else {
2333 this.lowIterator = null;
2334 }
2335 }
2336
2337 public boolean hasNext() {
2338 if (currentIterator == null) {
2339 if (highIterator != null) {
2340 if (highIterator.hasNext()) {
2341 currentIterator = highIterator;
2342 return currentIterator.hasNext();
2343 }
2344 if (defaultIterator.hasNext()) {
2345 currentIterator = defaultIterator;
2346 return currentIterator.hasNext();
2347 }
2348 if (lowIterator.hasNext()) {
2349 currentIterator = lowIterator;
2350 return currentIterator.hasNext();
2351 }
2352 return false;
2353 } else {
2354 currentIterator = defaultIterator;
2355 return currentIterator.hasNext();
2356 }
2357 }
2358 if (highIterator != null) {
2359 if (currentIterator.hasNext()) {
2360 return true;
2361 }
2362 if (currentIterator == highIterator) {
2363 if (defaultIterator.hasNext()) {
2364 currentIterator = defaultIterator;
2365 return currentIterator.hasNext();
2366 }
2367 if (lowIterator.hasNext()) {
2368 currentIterator = lowIterator;
2369 return currentIterator.hasNext();
2370 }
2371 return false;
2372 }
2373 if (currentIterator == defaultIterator) {
2374 if (lowIterator.hasNext()) {
2375 currentIterator = lowIterator;
2376 return currentIterator.hasNext();
2377 }
2378 return false;
2379 }
2380 }
2381 return currentIterator.hasNext();
2382 }
2383
2384 public Entry<Long, MessageKeys> next() {
2385 Entry<Long, MessageKeys> result = currentIterator.next();
2386 if (result != null) {
2387 Long key = result.getKey();
2388 if (highIterator != null) {
2389 if (currentIterator == defaultIterator) {
2390 lastDefaultKey = key;
2391 } else if (currentIterator == highIterator) {
2392 lastHighKey = key;
2393 } else {
2394 lastLowKey = key;
2395 }
2396 } else {
2397 lastDefaultKey = key;
2398 }
2399 }
2400 return result;
2401 }
2402
2403 public void remove() {
2404 throw new UnsupportedOperationException();
2405 }
2406
2407 }
2408 }
2409
2410 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2411 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2412
2413 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2414 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2415 ObjectOutputStream oout = new ObjectOutputStream(baos);
2416 oout.writeObject(object);
2417 oout.flush();
2418 oout.close();
2419 byte[] data = baos.toByteArray();
2420 dataOut.writeInt(data.length);
2421 dataOut.write(data);
2422 }
2423
2424 public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2425 int dataLen = dataIn.readInt();
2426 byte[] data = new byte[dataLen];
2427 dataIn.readFully(data);
2428 ByteArrayInputStream bais = new ByteArrayInputStream(data);
2429 ObjectInputStream oin = new ObjectInputStream(bais);
2430 try {
2431 return (HashSet<String>) oin.readObject();
2432 } catch (ClassNotFoundException cfe) {
2433 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2434 ioe.initCause(cfe);
2435 throw ioe;
2436 }
2437 }
2438 }
2439 }