001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.amq;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.nio.channels.FileLock;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.CountDownLatch;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicInteger;
033 import java.util.concurrent.atomic.AtomicLong;
034 import org.apache.activeio.journal.Journal;
035 import org.apache.activemq.broker.BrokerService;
036 import org.apache.activemq.broker.BrokerServiceAware;
037 import org.apache.activemq.broker.ConnectionContext;
038 import org.apache.activemq.command.ActiveMQDestination;
039 import org.apache.activemq.command.ActiveMQQueue;
040 import org.apache.activemq.command.ActiveMQTopic;
041 import org.apache.activemq.command.DataStructure;
042 import org.apache.activemq.command.JournalQueueAck;
043 import org.apache.activemq.command.JournalTopicAck;
044 import org.apache.activemq.command.JournalTrace;
045 import org.apache.activemq.command.JournalTransaction;
046 import org.apache.activemq.command.Message;
047 import org.apache.activemq.command.ProducerId;
048 import org.apache.activemq.command.SubscriptionInfo;
049 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051 import org.apache.activemq.kaha.impl.async.Location;
052 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053 import org.apache.activemq.openwire.OpenWireFormat;
054 import org.apache.activemq.store.MessageStore;
055 import org.apache.activemq.store.PersistenceAdapter;
056 import org.apache.activemq.store.ReferenceStore;
057 import org.apache.activemq.store.ReferenceStoreAdapter;
058 import org.apache.activemq.store.TopicMessageStore;
059 import org.apache.activemq.store.TopicReferenceStore;
060 import org.apache.activemq.store.TransactionStore;
061 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062 import org.apache.activemq.thread.Scheduler;
063 import org.apache.activemq.thread.Task;
064 import org.apache.activemq.thread.TaskRunner;
065 import org.apache.activemq.thread.TaskRunnerFactory;
066 import org.apache.activemq.usage.SystemUsage;
067 import org.apache.activemq.usage.Usage;
068 import org.apache.activemq.usage.UsageListener;
069 import org.apache.activemq.util.ByteSequence;
070 import org.apache.activemq.util.IOExceptionSupport;
071 import org.apache.activemq.util.IOHelper;
072 import org.apache.activemq.wireformat.WireFormat;
073 import org.slf4j.Logger;
074 import org.slf4j.LoggerFactory;
075
076
077 /**
078 * An implementation of {@link PersistenceAdapter} designed for use with a
079 * {@link Journal} and then check pointing asynchronously on a timeout with some
080 * other long term persistent storage.
081 *
082 * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083 *
084 */
085 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086
private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
// Runs the periodic checkpoint and cleanup tasks; taken from the broker service or created in start().
private Scheduler scheduler;
// Message stores handed out so far, keyed by destination (filled by the create*MessageStore methods).
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
// Blank finals: they are assigned in a static initializer that is not part of this section of the file.
private static final boolean BROKEN_FILE_LOCK;
private static final boolean DISABLE_LOCKING;
// Pause, in milliseconds, between attempts to lock the journal in start().
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
// The journal; created by createAsyncDataManager() in start() unless one was injected.
private AsyncDataManager asyncDataManager;
// Long term reference storage; created by createReferenceStoreAdapter() in start() unless one was injected.
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
// Used to marshal commands written to the journal and unmarshal the ones read back.
private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
// Period handed to the scheduler for the periodic checkpoint task (presumably milliseconds - TODO confirm).
private long checkpointInterval = 1000 * 20;
private int maxCheckpointMessageAddSize = 1024 * 4;
private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
// Task runner that executes doCheckpoint() when woken up by checkpoint().
private TaskRunner checkpointTask;
// Latch for the next checkpoint; doCheckpoint() swaps in a fresh latch and counts the old one down when done.
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
// Guards start()/stop() against being run twice.
private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
// When true, start() deletes the journal and the reference store contents, then resets the flag.
private boolean deleteAllMessages;
// A journal write is synchronous when it is forced, or when the caller hints sync and this flag is set.
private boolean syncOnWrite;
private boolean syncOnTransaction=true;
private String brokerName = "";
// Root directory of the store; resolved in start() when not set explicitly.
private File directory;
// Defaults to the "archive" sub-directory of the store directory.
private File directoryArchive;
private BrokerService brokerService;
// Shared size counter handed to the journal and the reference store adapter; exposed through size().
private final AtomicLong storeSize = new AtomicLong();
private boolean persistentIndex=true;
private boolean useNio = true;
private boolean archiveDataLogs=false;
// Period handed to the scheduler for the periodic cleanup task.
private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
// Index settings below are passed on to the reference store adapter created by createReferenceStoreAdapter().
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
// Per message store, the data file ids currently in use; cleanup() never consolidates these files.
private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
// The "lock" file inside the store directory; opened in start() and closed in stop().
private RandomAccessFile lockFile;
// NOTE(review): presumably the lock held on lockFile; it is managed by lock()/unlock(), which are not visible here.
private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING;
// When true, start() tries to lock the journal once and fails; otherwise it retries until the lock is obtained.
private boolean failIfJournalIsLocked;
// Read at the end of start(); both flags are written outside this section of the file.
private boolean lockLogged;
private boolean lockAquired;
private boolean recoverReferenceStore=true;
private boolean forceRecoverReferenceStore=false;
private boolean useDedicatedTaskRunner=false;
private int journalThreadPriority = Thread.MAX_PRIORITY;
138
139 public String getBrokerName() {
140 return this.brokerName;
141 }
142
143 public void setBrokerName(String brokerName) {
144 this.brokerName = brokerName;
145 if (this.referenceStoreAdapter != null) {
146 this.referenceStoreAdapter.setBrokerName(brokerName);
147 }
148 }
149
150 public BrokerService getBrokerService() {
151 return brokerService;
152 }
153
154 public void setBrokerService(BrokerService brokerService) {
155 this.brokerService = brokerService;
156 }
157
158 public synchronized void start() throws Exception {
159 if (!started.compareAndSet(false, true)) {
160 return;
161 }
162 if (this.directory == null) {
163 if (brokerService != null) {
164 this.directory = brokerService.getBrokerDataDirectory();
165
166 } else {
167 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
168 this.directory = new File(directory, "amqstore");
169 directory.getAbsolutePath();
170 }
171 }
172 if (this.directoryArchive == null) {
173 this.directoryArchive = new File(this.directory,"archive");
174 }
175 if (this.brokerService != null) {
176 this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
177 this.scheduler = this.brokerService.getScheduler();
178 } else {
179 this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
180 true, 1000, isUseDedicatedTaskRunner());
181 this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
182 }
183
184 IOHelper.mkdirs(this.directory);
185 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
186 lock();
187 LOG.info("AMQStore starting using directory: " + directory);
188 if (archiveDataLogs) {
189 IOHelper.mkdirs(this.directoryArchive);
190 }
191
192 if (this.usageManager != null) {
193 this.usageManager.getMemoryUsage().addUsageListener(this);
194 }
195 if (asyncDataManager == null) {
196 asyncDataManager = createAsyncDataManager();
197 }
198 if (referenceStoreAdapter == null) {
199 referenceStoreAdapter = createReferenceStoreAdapter();
200 }
201 referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
202 referenceStoreAdapter.setBrokerName(getBrokerName());
203 referenceStoreAdapter.setUsageManager(usageManager);
204 referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
205
206 if (failIfJournalIsLocked) {
207 asyncDataManager.lock();
208 } else {
209 while (true) {
210 try {
211 asyncDataManager.lock();
212 break;
213 } catch (IOException e) {
214 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
215 try {
216 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
217 } catch (InterruptedException e1) {
218 }
219 }
220 }
221 }
222
223 asyncDataManager.start();
224 if (deleteAllMessages) {
225 asyncDataManager.delete();
226 try {
227 JournalTrace trace = new JournalTrace();
228 trace.setMessage("DELETED " + new Date());
229 Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
230 asyncDataManager.setMark(location, true);
231 LOG.info("Journal deleted: ");
232 deleteAllMessages = false;
233 } catch (IOException e) {
234 throw e;
235 } catch (Throwable e) {
236 throw IOExceptionSupport.create(e);
237 }
238 referenceStoreAdapter.deleteAllMessages();
239 }
240 referenceStoreAdapter.start();
241 Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
242 LOG.info("Active data files: " + files);
243 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244
245 public boolean iterate() {
246 doCheckpoint();
247 return false;
248 }
249 }, "ActiveMQ Journal Checkpoint Worker");
250 createTransactionStore();
251
252 //
253 // The following was attempting to reduce startup times by avoiding the
254 // log
255 // file scanning that recovery performs. The problem with it is that XA
256 // transactions
257 // only live in transaction log and are not stored in the reference
258 // store, but they still
259 // need to be recovered when the broker starts up.
260
261 if (isForceRecoverReferenceStore()
262 || (isRecoverReferenceStore() && !referenceStoreAdapter
263 .isStoreValid())) {
264 LOG.warn("The ReferenceStore is not valid - recovering ...");
265 recover();
266 LOG.info("Finished recovering the ReferenceStore");
267 } else {
268 Location location = writeTraceMessage("RECOVERED " + new Date(),
269 true);
270 asyncDataManager.setMark(location, true);
271 // recover transactions
272 getTransactionStore().setPreparedTransactions(
273 referenceStoreAdapter.retrievePreparedState());
274 }
275
276 // Do a checkpoint periodically.
277 periodicCheckpointTask = new Runnable() {
278
279 public void run() {
280 checkpoint(false);
281 }
282 };
283 scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
284 periodicCleanupTask = new Runnable() {
285
286 public void run() {
287 cleanup();
288 }
289 };
290 scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
291
292 if (lockAquired && lockLogged) {
293 LOG.info("Aquired lock for AMQ Store" + getDirectory());
294 if (brokerService != null) {
295 brokerService.getBroker().nowMasterBroker();
296 }
297 }
298
299 }
300
301 public void stop() throws Exception {
302
303 if (!started.compareAndSet(true, false)) {
304 return;
305 }
306 unlock();
307 if (lockFile != null) {
308 lockFile.close();
309 lockFile = null;
310 }
311 this.usageManager.getMemoryUsage().removeUsageListener(this);
312 synchronized (this) {
313 scheduler.cancel(periodicCheckpointTask);
314 scheduler.cancel(periodicCleanupTask);
315 }
316 Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
317 while (queueIterator.hasNext()) {
318 AMQMessageStore ms = queueIterator.next();
319 ms.stop();
320 }
321 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
322 while (topicIterator.hasNext()) {
323 final AMQTopicMessageStore ms = topicIterator.next();
324 ms.stop();
325 }
326 // Take one final checkpoint and stop checkpoint processing.
327 checkpoint(true);
328 synchronized (this) {
329 checkpointTask.shutdown();
330 }
331 referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
332 queues.clear();
333 topics.clear();
334 IOException firstException = null;
335 referenceStoreAdapter.stop();
336 referenceStoreAdapter = null;
337
338 if (this.brokerService == null) {
339 this.taskRunnerFactory.shutdown();
340 this.scheduler.stop();
341 }
342 try {
343 LOG.debug("Journal close");
344 asyncDataManager.close();
345 } catch (Exception e) {
346 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
347 }
348 if (firstException != null) {
349 throw firstException;
350 }
351 }
352
353 /**
354 * When we checkpoint we move all the journalled data to long term storage.
355 *
356 * @param sync
357 */
358 public void checkpoint(boolean sync) {
359 try {
360 if (asyncDataManager == null) {
361 throw new IllegalStateException("Journal is closed.");
362 }
363 CountDownLatch latch = null;
364 synchronized (this) {
365 latch = nextCheckpointCountDownLatch;
366 checkpointTask.wakeup();
367 }
368 if (sync) {
369 if (LOG.isDebugEnabled()) {
370 LOG.debug("Waitng for checkpoint to complete.");
371 }
372 latch.await();
373 }
374 referenceStoreAdapter.checkpoint(sync);
375 } catch (InterruptedException e) {
376 Thread.currentThread().interrupt();
377 LOG.warn("Request to start checkpoint failed: " + e, e);
378 } catch (IOException e) {
379 LOG.error("checkpoint failed: " + e, e);
380 }
381 }
382
383 /**
384 * This does the actual checkpoint.
385 *
386 * @return true if successful
387 */
388 public boolean doCheckpoint() {
389 CountDownLatch latch = null;
390 synchronized (this) {
391 latch = nextCheckpointCountDownLatch;
392 nextCheckpointCountDownLatch = new CountDownLatch(1);
393 }
394 try {
395 if (LOG.isDebugEnabled()) {
396 LOG.debug("Checkpoint started.");
397 }
398
399 Location currentMark = asyncDataManager.getMark();
400 Location newMark = currentMark;
401 Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
402 while (queueIterator.hasNext()) {
403 final AMQMessageStore ms = queueIterator.next();
404 Location mark = ms.getMark();
405 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
406 newMark = mark;
407 }
408 }
409 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
410 while (topicIterator.hasNext()) {
411 final AMQTopicMessageStore ms = topicIterator.next();
412 Location mark = ms.getMark();
413 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
414 newMark = mark;
415 }
416 }
417 try {
418 if (newMark != currentMark) {
419 if (LOG.isDebugEnabled()) {
420 LOG.debug("Marking journal at: " + newMark);
421 }
422 asyncDataManager.setMark(newMark, false);
423 writeTraceMessage("CHECKPOINT " + new Date(), true);
424 }
425 } catch (Exception e) {
426 LOG.error("Failed to mark the Journal: " + e, e);
427 }
428 if (LOG.isDebugEnabled()) {
429 LOG.debug("Checkpoint done.");
430 }
431 } finally {
432 latch.countDown();
433 }
434 return true;
435 }
436
437 /**
438 * Cleans up the data files
439 * @throws IOException
440 */
441 public void cleanup() {
442 try {
443 Set<Integer>inProgress = new HashSet<Integer>();
444 if (LOG.isDebugEnabled()) {
445 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
446 }
447 for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
448 inProgress.addAll(set.keySet());
449 }
450 Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
451 inProgress.add(lastDataFile);
452 lastDataFile = asyncDataManager.getMark().getDataFileId();
453 inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
454 Location lastActiveTx = transactionStore.checkpoint();
455 if (lastActiveTx != null) {
456 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
457 }
458 LOG.debug("lastDataFile: " + lastDataFile);
459 asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
460 } catch (IOException e) {
461 LOG.error("Could not cleanup data files: " + e, e);
462 }
463 }
464
465 public Set<ActiveMQDestination> getDestinations() {
466 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
467 destinations.addAll(queues.keySet());
468 destinations.addAll(topics.keySet());
469 return destinations;
470 }
471
472 MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
473 if (destination.isQueue()) {
474 return createQueueMessageStore((ActiveMQQueue)destination);
475 } else {
476 return createTopicMessageStore((ActiveMQTopic)destination);
477 }
478 }
479
480 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
481 AMQMessageStore store = queues.get(destination);
482 if (store == null) {
483 ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
484 store = new AMQMessageStore(this, checkpointStore, destination);
485 try {
486 store.start();
487 } catch (Exception e) {
488 throw IOExceptionSupport.create(e);
489 }
490 queues.put(destination, store);
491 }
492 return store;
493 }
494
495 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
496 AMQTopicMessageStore store = topics.get(destinationName);
497 if (store == null) {
498 TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
499 store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
500 try {
501 store.start();
502 } catch (Exception e) {
503 throw IOExceptionSupport.create(e);
504 }
505 topics.put(destinationName, store);
506 }
507 return store;
508 }
509
510 /**
511 * Cleanup method to remove any state associated with the given destination
512 *
513 * @param destination
514 */
515 public void removeQueueMessageStore(ActiveMQQueue destination) {
516 AMQMessageStore store= queues.remove(destination);
517 referenceStoreAdapter.removeQueueMessageStore(destination);
518 }
519
520 /**
521 * Cleanup method to remove any state associated with the given destination
522 *
523 * @param destination
524 */
525 public void removeTopicMessageStore(ActiveMQTopic destination) {
526 topics.remove(destination);
527 }
528
529 public TransactionStore createTransactionStore() throws IOException {
530 return transactionStore;
531 }
532
533 public long getLastMessageBrokerSequenceId() throws IOException {
534 return referenceStoreAdapter.getLastMessageBrokerSequenceId();
535 }
536
537 public void beginTransaction(ConnectionContext context) throws IOException {
538 referenceStoreAdapter.beginTransaction(context);
539 }
540
541 public void commitTransaction(ConnectionContext context) throws IOException {
542 referenceStoreAdapter.commitTransaction(context);
543 }
544
545 public void rollbackTransaction(ConnectionContext context) throws IOException {
546 referenceStoreAdapter.rollbackTransaction(context);
547 }
548
549 public boolean isPersistentIndex() {
550 return persistentIndex;
551 }
552
553 public void setPersistentIndex(boolean persistentIndex) {
554 this.persistentIndex = persistentIndex;
555 }
556
557 /**
558 * @param location
559 * @return
560 * @throws IOException
561 */
562 public DataStructure readCommand(Location location) throws IOException {
563 try {
564 ByteSequence packet = asyncDataManager.read(location);
565 return (DataStructure)wireFormat.unmarshal(packet);
566 } catch (IOException e) {
567 throw createReadException(location, e);
568 }
569 }
570
571 /**
572 * Move all the messages that were in the journal into long term storage. We
573 * just replay and do a checkpoint.
574 *
575 * @throws IOException
576 * @throws IOException
577 * @throws IllegalStateException
578 */
579 private void recover() throws IllegalStateException, IOException {
580 referenceStoreAdapter.clearMessages();
581 Location pos = null;
582 int redoCounter = 0;
583 LOG.info("Journal Recovery Started from: " + asyncDataManager);
584 long start = System.currentTimeMillis();
585 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
586 // While we have records in the journal.
587 while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
588 ByteSequence data = asyncDataManager.read(pos);
589 DataStructure c = (DataStructure)wireFormat.unmarshal(data);
590 if (c instanceof Message) {
591 Message message = (Message)c;
592 AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
593 if (message.isInTransaction()) {
594 transactionStore.addMessage(store, message, pos);
595 } else {
596 if (store.replayAddMessage(context, message, pos)) {
597 redoCounter++;
598 }
599 }
600 } else {
601 switch (c.getDataStructureType()) {
602 case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
603 referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
604 }
605 break;
606 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
607 JournalQueueAck command = (JournalQueueAck)c;
608 AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
609 if (command.getMessageAck().isInTransaction()) {
610 transactionStore.removeMessage(store, command.getMessageAck(), pos);
611 } else {
612 if (store.replayRemoveMessage(context, command.getMessageAck())) {
613 redoCounter++;
614 }
615 }
616 }
617 break;
618 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
619 JournalTopicAck command = (JournalTopicAck)c;
620 AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
621 if (command.getTransactionId() != null) {
622 transactionStore.acknowledge(store, command, pos);
623 } else {
624 if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
625 redoCounter++;
626 }
627 }
628 }
629 break;
630 case JournalTransaction.DATA_STRUCTURE_TYPE: {
631 JournalTransaction command = (JournalTransaction)c;
632 try {
633 // Try to replay the packet.
634 switch (command.getType()) {
635 case JournalTransaction.XA_PREPARE:
636 transactionStore.replayPrepare(command.getTransactionId());
637 break;
638 case JournalTransaction.XA_COMMIT:
639 case JournalTransaction.LOCAL_COMMIT:
640 AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
641 if (tx == null) {
642 break; // We may be trying to replay a commit
643 }
644 // that
645 // was already committed.
646 // Replay the committed operations.
647 tx.getOperations();
648 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
649 AMQTxOperation op = (AMQTxOperation)iter.next();
650 if (op.replay(this, context)) {
651 redoCounter++;
652 }
653 }
654 break;
655 case JournalTransaction.LOCAL_ROLLBACK:
656 case JournalTransaction.XA_ROLLBACK:
657 transactionStore.replayRollback(command.getTransactionId());
658 break;
659 default:
660 throw new IOException("Invalid journal command type: " + command.getType());
661 }
662 } catch (IOException e) {
663 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
664 }
665 }
666 break;
667 case JournalTrace.DATA_STRUCTURE_TYPE:
668 JournalTrace trace = (JournalTrace)c;
669 LOG.debug("TRACE Entry: " + trace.getMessage());
670 break;
671 default:
672 LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
673 }
674 }
675 }
676 Location location = writeTraceMessage("RECOVERED " + new Date(), true);
677 asyncDataManager.setMark(location, true);
678 long end = System.currentTimeMillis();
679 LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
680 }
681
682 private IOException createReadException(Location location, Exception e) {
683 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
684 }
685
686 protected IOException createWriteException(DataStructure packet, Exception e) {
687 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
688 }
689
690 protected IOException createWriteException(String command, Exception e) {
691 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
692 }
693
694 protected IOException createRecoveryFailedException(Exception e) {
695 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
696 }
697
698 /**
699 * @param command
700 * @param syncHint
701 * @return
702 * @throws IOException
703 */
704 public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
705 return writeCommand(command, syncHint,false);
706 }
707
708 public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
709 try {
710 return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
711 } catch (IOException ioe) {
712 LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
713 brokerService.handleIOException(ioe);
714 throw ioe;
715 }
716 }
717
718 private Location writeTraceMessage(String message, boolean sync) throws IOException {
719 JournalTrace trace = new JournalTrace();
720 trace.setMessage(message);
721 return writeCommand(trace, sync);
722 }
723
724 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
725 newPercentUsage = (newPercentUsage / 10) * 10;
726 oldPercentUsage = (oldPercentUsage / 10) * 10;
727 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
728 checkpoint(false);
729 }
730 }
731
732 public AMQTransactionStore getTransactionStore() {
733 return transactionStore;
734 }
735
736 public synchronized void deleteAllMessages() throws IOException {
737 deleteAllMessages = true;
738 }
739
740 @Override
741 public String toString() {
742 return "AMQPersistenceAdapter(" + directory + ")";
743 }
744
745 // /////////////////////////////////////////////////////////////////
746 // Subclass overridables
747 // /////////////////////////////////////////////////////////////////
748 protected AsyncDataManager createAsyncDataManager() {
749 AsyncDataManager manager = new AsyncDataManager(storeSize);
750 manager.setDirectory(new File(directory, "journal"));
751 manager.setDirectoryArchive(getDirectoryArchive());
752 manager.setArchiveDataLogs(isArchiveDataLogs());
753 manager.setMaxFileLength(maxFileLength);
754 manager.setUseNio(useNio);
755 return manager;
756 }
757
758 protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
759 KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
760 adaptor.setPersistentIndex(isPersistentIndex());
761 adaptor.setIndexBinSize(getIndexBinSize());
762 adaptor.setIndexKeySize(getIndexKeySize());
763 adaptor.setIndexPageSize(getIndexPageSize());
764 adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
765 adaptor.setIndexLoadFactor(getIndexLoadFactor());
766 return adaptor;
767 }
768
769 // /////////////////////////////////////////////////////////////////
770 // Property Accessors
771 // /////////////////////////////////////////////////////////////////
772 public AsyncDataManager getAsyncDataManager() {
773 return asyncDataManager;
774 }
775
776 public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
777 this.asyncDataManager = asyncDataManager;
778 }
779
780 public ReferenceStoreAdapter getReferenceStoreAdapter() {
781 return referenceStoreAdapter;
782 }
783
784 public TaskRunnerFactory getTaskRunnerFactory() {
785 return taskRunnerFactory;
786 }
787
788 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
789 this.taskRunnerFactory = taskRunnerFactory;
790 }
791
792 /**
793 * @return Returns the wireFormat.
794 */
795 public WireFormat getWireFormat() {
796 return wireFormat;
797 }
798
799 public void setWireFormat(WireFormat wireFormat) {
800 this.wireFormat = wireFormat;
801 }
802
803 public SystemUsage getUsageManager() {
804 return usageManager;
805 }
806
807 public void setUsageManager(SystemUsage usageManager) {
808 this.usageManager = usageManager;
809 }
810
811 public int getMaxCheckpointMessageAddSize() {
812 return maxCheckpointMessageAddSize;
813 }
814
815 /**
816 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
817 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
818 */
819 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
820 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
821 }
822
823
824 public synchronized File getDirectory() {
825 return directory;
826 }
827
828 public synchronized void setDirectory(File directory) {
829 this.directory = directory;
830 }
831
832 public boolean isSyncOnWrite() {
833 return this.syncOnWrite;
834 }
835
836 public void setSyncOnWrite(boolean syncOnWrite) {
837 this.syncOnWrite = syncOnWrite;
838 }
839
840 public boolean isSyncOnTransaction() {
841 return syncOnTransaction;
842 }
843
844 public void setSyncOnTransaction(boolean syncOnTransaction) {
845 this.syncOnTransaction = syncOnTransaction;
846 }
847
848 /**
849 * @param referenceStoreAdapter the referenceStoreAdapter to set
850 */
851 public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
852 this.referenceStoreAdapter = referenceStoreAdapter;
853 }
854
855 public long size(){
856 return storeSize.get();
857 }
858
859 public boolean isUseNio() {
860 return useNio;
861 }
862
863 public void setUseNio(boolean useNio) {
864 this.useNio = useNio;
865 }
866
867 public int getMaxFileLength() {
868 return maxFileLength;
869 }
870
871 /**
872 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
873 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
874 */
875 public void setMaxFileLength(int maxFileLength) {
876 this.maxFileLength = maxFileLength;
877 }
878
879 public long getCleanupInterval() {
880 return cleanupInterval;
881 }
882
883 public void setCleanupInterval(long cleanupInterval) {
884 this.cleanupInterval = cleanupInterval;
885 }
886
887 public long getCheckpointInterval() {
888 return checkpointInterval;
889 }
890
891 public void setCheckpointInterval(long checkpointInterval) {
892 this.checkpointInterval = checkpointInterval;
893 }
894
895 public int getIndexBinSize() {
896 return indexBinSize;
897 }
898
899 public void setIndexBinSize(int indexBinSize) {
900 this.indexBinSize = indexBinSize;
901 }
902
903 public int getIndexKeySize() {
904 return indexKeySize;
905 }
906
907 public void setIndexKeySize(int indexKeySize) {
908 this.indexKeySize = indexKeySize;
909 }
910
911 public int getIndexPageSize() {
912 return indexPageSize;
913 }
914
915 public int getIndexMaxBinSize() {
916 return indexMaxBinSize;
917 }
918
919 public void setIndexMaxBinSize(int maxBinSize) {
920 this.indexMaxBinSize = maxBinSize;
921 }
922
923 /**
924 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
925 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
926 */
927 public void setIndexPageSize(int indexPageSize) {
928 this.indexPageSize = indexPageSize;
929 }
930
931 public void setIndexLoadFactor(int factor){
932 this.indexLoadFactor=factor;
933 }
934
935 public int getIndexLoadFactor(){
936 return this.indexLoadFactor;
937 }
938
939 public int getMaxReferenceFileLength() {
940 return maxReferenceFileLength;
941 }
942
943 /**
944 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
945 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
946 */
947 public void setMaxReferenceFileLength(int maxReferenceFileLength) {
948 this.maxReferenceFileLength = maxReferenceFileLength;
949 }
950
951 public File getDirectoryArchive() {
952 return directoryArchive;
953 }
954
955 public void setDirectoryArchive(File directoryArchive) {
956 this.directoryArchive = directoryArchive;
957 }
958
959 public boolean isArchiveDataLogs() {
960 return archiveDataLogs;
961 }
962
963 public void setArchiveDataLogs(boolean archiveDataLogs) {
964 this.archiveDataLogs = archiveDataLogs;
965 }
966
967 public boolean isDisableLocking() {
968 return disableLocking;
969 }
970
971 public void setDisableLocking(boolean disableLocking) {
972 this.disableLocking = disableLocking;
973 }
974
975 /**
976 * @return the recoverReferenceStore
977 */
978 public boolean isRecoverReferenceStore() {
979 return recoverReferenceStore;
980 }
981
982 /**
983 * @param recoverReferenceStore the recoverReferenceStore to set
984 */
985 public void setRecoverReferenceStore(boolean recoverReferenceStore) {
986 this.recoverReferenceStore = recoverReferenceStore;
987 }
988
989 /**
990 * @return the forceRecoverReferenceStore
991 */
992 public boolean isForceRecoverReferenceStore() {
993 return forceRecoverReferenceStore;
994 }
995
996 /**
997 * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
998 */
999 public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
1000 this.forceRecoverReferenceStore = forceRecoverReferenceStore;
1001 }
1002
1003 public boolean isUseDedicatedTaskRunner() {
1004 return useDedicatedTaskRunner;
1005 }
1006
1007 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009 }
1010
1011 /**
1012 * @return the journalThreadPriority
1013 */
1014 public int getJournalThreadPriority() {
1015 return this.journalThreadPriority;
1016 }
1017
1018 /**
1019 * @param journalThreadPriority the journalThreadPriority to set
1020 */
1021 public void setJournalThreadPriority(int journalThreadPriority) {
1022 this.journalThreadPriority = journalThreadPriority;
1023 }
1024
1025
1026 protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1027 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1028 if (map == null) {
1029 map = new ConcurrentHashMap<Integer, AtomicInteger>();
1030 dataFilesInProgress.put(store, map);
1031 }
1032 AtomicInteger count = map.get(dataFileId);
1033 if (count == null) {
1034 count = new AtomicInteger(0);
1035 map.put(dataFileId, count);
1036 }
1037 count.incrementAndGet();
1038 }
1039
1040 protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1041 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1042 if (map != null) {
1043 AtomicInteger count = map.get(dataFileId);
1044 if (count != null) {
1045 int newCount = count.decrementAndGet();
1046 if (newCount <=0) {
1047 map.remove(dataFileId);
1048 }
1049 }
1050 if (map.isEmpty()) {
1051 dataFilesInProgress.remove(store);
1052 }
1053 }
1054 }
1055
1056
1057 protected void lock() throws Exception {
1058 lockLogged = false;
1059 lockAquired = false;
1060 do {
1061 if (doLock()) {
1062 lockAquired = true;
1063 } else {
1064 if (!lockLogged) {
1065 LOG.warn("Waiting to Lock the Store " + getDirectory());
1066 lockLogged = true;
1067 }
1068 Thread.sleep(1000);
1069 }
1070
1071 } while (!lockAquired && !disableLocking);
1072 }
1073
1074 private synchronized void unlock() throws IOException {
1075 if (!disableLocking && (null != lock)) {
1076 //clear property doesn't work on some platforms
1077 System.getProperties().remove(getPropertyKey());
1078 System.clearProperty(getPropertyKey());
1079 assert(System.getProperty(getPropertyKey())==null);
1080 if (lock.isValid()) {
1081 lock.release();
1082 lock.channel().close();
1083
1084 }
1085 lock = null;
1086 }
1087 }
1088
1089
1090 protected boolean doLock() throws IOException {
1091 boolean result = true;
1092 if (!disableLocking && directory != null && lock == null) {
1093 String key = getPropertyKey();
1094 String property = System.getProperty(key);
1095 if (null == property) {
1096 if (!BROKEN_FILE_LOCK) {
1097 lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
1098 if (lock == null) {
1099 result = false;
1100 } else {
1101 System.setProperty(key, new Date().toString());
1102 }
1103 }
1104 } else { // already locked
1105 result = false;
1106 }
1107 }
1108 return result;
1109 }
1110
1111 private String getPropertyKey() throws IOException {
1112 return getClass().getName() + ".lock." + directory.getCanonicalPath();
1113 }
1114
1115 static {
1116 BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1117 + ".FileLockBroken",
1118 "false"));
1119 DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1120 + ".DisableLocking",
1121 "false"));
1122 }
1123
1124
1125 public long getLastProducerSequenceId(ProducerId id) {
1126 // reference store send has adequate duplicate suppression
1127 return -1;
1128 }
1129 }