001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Set;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.FutureTask;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.concurrent.ThreadFactory;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import org.apache.activeio.journal.InvalidRecordLocationException;
035 import org.apache.activeio.journal.Journal;
036 import org.apache.activeio.journal.JournalEventListener;
037 import org.apache.activeio.journal.RecordLocation;
038 import org.apache.activeio.packet.ByteArrayPacket;
039 import org.apache.activeio.packet.Packet;
040 import org.apache.activemq.broker.BrokerService;
041 import org.apache.activemq.broker.BrokerServiceAware;
042 import org.apache.activemq.broker.ConnectionContext;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.ActiveMQQueue;
045 import org.apache.activemq.command.ActiveMQTopic;
046 import org.apache.activemq.command.DataStructure;
047 import org.apache.activemq.command.JournalQueueAck;
048 import org.apache.activemq.command.JournalTopicAck;
049 import org.apache.activemq.command.JournalTrace;
050 import org.apache.activemq.command.JournalTransaction;
051 import org.apache.activemq.command.Message;
052 import org.apache.activemq.command.MessageAck;
053 import org.apache.activemq.command.ProducerId;
054 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055 import org.apache.activemq.openwire.OpenWireFormat;
056 import org.apache.activemq.store.MessageStore;
057 import org.apache.activemq.store.PersistenceAdapter;
058 import org.apache.activemq.store.TopicMessageStore;
059 import org.apache.activemq.store.TransactionStore;
060 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061 import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062 import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063 import org.apache.activemq.thread.Scheduler;
064 import org.apache.activemq.thread.Task;
065 import org.apache.activemq.thread.TaskRunner;
066 import org.apache.activemq.thread.TaskRunnerFactory;
067 import org.apache.activemq.usage.SystemUsage;
068 import org.apache.activemq.usage.Usage;
069 import org.apache.activemq.usage.UsageListener;
070 import org.apache.activemq.util.ByteSequence;
071 import org.apache.activemq.util.IOExceptionSupport;
072 import org.apache.activemq.wireformat.WireFormat;
073 import org.slf4j.Logger;
074 import org.slf4j.LoggerFactory;
075
076 /**
077 * An implementation of {@link PersistenceAdapter} designed for use with a
078 * {@link Journal} and then check pointing asynchronously on a timeout with some
079 * other long term persistent storage.
080 *
081 * @org.apache.xbean.XBean
082 *
083 */
084 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085
086 private BrokerService brokerService;
087
088 protected Scheduler scheduler;
089 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
090
091 private Journal journal;
092 private PersistenceAdapter longTermPersistence;
093
094 private final WireFormat wireFormat = new OpenWireFormat();
095
096 private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097 private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098
099 private SystemUsage usageManager;
100 private final long checkpointInterval = 1000 * 60 * 5;
101 private long lastCheckpointRequest = System.currentTimeMillis();
102 private long lastCleanup = System.currentTimeMillis();
103 private int maxCheckpointWorkers = 10;
104 private int maxCheckpointMessageAddSize = 1024 * 1024;
105
106 private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107 private ThreadPoolExecutor checkpointExecutor;
108
109 private TaskRunner checkpointTask;
110 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111 private boolean fullCheckPoint;
112
113 private final AtomicBoolean started = new AtomicBoolean(false);
114
115 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116
117 private TaskRunnerFactory taskRunnerFactory;
118
119 public JournalPersistenceAdapter() {
120 }
121
122 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
123 setJournal(journal);
124 setTaskRunnerFactory(taskRunnerFactory);
125 setPersistenceAdapter(longTermPersistence);
126 }
127
128 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
129 this.taskRunnerFactory = taskRunnerFactory;
130 }
131
132 public void setJournal(Journal journal) {
133 this.journal = journal;
134 journal.setJournalEventListener(this);
135 }
136
137 public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
138 this.longTermPersistence = longTermPersistence;
139 }
140
141 final Runnable createPeriodicCheckpointTask() {
142 return new Runnable() {
143 public void run() {
144 long lastTime = 0;
145 synchronized (this) {
146 lastTime = lastCheckpointRequest;
147 }
148 if (System.currentTimeMillis() > lastTime + checkpointInterval) {
149 checkpoint(false, true);
150 }
151 }
152 };
153 }
154
155 /**
156 * @param usageManager The UsageManager that is controlling the
157 * destination's memory usage.
158 */
159 public void setUsageManager(SystemUsage usageManager) {
160 this.usageManager = usageManager;
161 longTermPersistence.setUsageManager(usageManager);
162 }
163
164 public Set<ActiveMQDestination> getDestinations() {
165 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
166 destinations.addAll(queues.keySet());
167 destinations.addAll(topics.keySet());
168 return destinations;
169 }
170
171 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
172 if (destination.isQueue()) {
173 return createQueueMessageStore((ActiveMQQueue)destination);
174 } else {
175 return createTopicMessageStore((ActiveMQTopic)destination);
176 }
177 }
178
179 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
180 JournalMessageStore store = queues.get(destination);
181 if (store == null) {
182 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
183 store = new JournalMessageStore(this, checkpointStore, destination);
184 queues.put(destination, store);
185 }
186 return store;
187 }
188
189 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
190 JournalTopicMessageStore store = topics.get(destinationName);
191 if (store == null) {
192 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
193 store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
194 topics.put(destinationName, store);
195 }
196 return store;
197 }
198
199 /**
200 * Cleanup method to remove any state associated with the given destination
201 *
202 * @param destination Destination to forget
203 */
204 public void removeQueueMessageStore(ActiveMQQueue destination) {
205 queues.remove(destination);
206 }
207
208 /**
209 * Cleanup method to remove any state associated with the given destination
210 *
211 * @param destination Destination to forget
212 */
213 public void removeTopicMessageStore(ActiveMQTopic destination) {
214 topics.remove(destination);
215 }
216
217 public TransactionStore createTransactionStore() throws IOException {
218 return transactionStore;
219 }
220
221 public long getLastMessageBrokerSequenceId() throws IOException {
222 return longTermPersistence.getLastMessageBrokerSequenceId();
223 }
224
225 public void beginTransaction(ConnectionContext context) throws IOException {
226 longTermPersistence.beginTransaction(context);
227 }
228
229 public void commitTransaction(ConnectionContext context) throws IOException {
230 longTermPersistence.commitTransaction(context);
231 }
232
233 public void rollbackTransaction(ConnectionContext context) throws IOException {
234 longTermPersistence.rollbackTransaction(context);
235 }
236
237 public synchronized void start() throws Exception {
238 if (!started.compareAndSet(false, true)) {
239 return;
240 }
241
242 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243 public boolean iterate() {
244 return doCheckpoint();
245 }
246 }, "ActiveMQ Journal Checkpoint Worker");
247
248 checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
249 public Thread newThread(Runnable runable) {
250 Thread t = new Thread(runable, "Journal checkpoint worker");
251 t.setPriority(7);
252 return t;
253 }
254 });
255 // checkpointExecutor.allowCoreThreadTimeOut(true);
256
257 this.usageManager.getMemoryUsage().addUsageListener(this);
258
259 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
260 // Disabled periodic clean up as it deadlocks with the checkpoint
261 // operations.
262 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
263 }
264
265 longTermPersistence.start();
266 createTransactionStore();
267 recover();
268
269 // Do a checkpoint periodically.
270 this.scheduler = new Scheduler("Journal Scheduler");
271 this.scheduler.start();
272 this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
273
274 }
275
276 public void stop() throws Exception {
277
278 this.usageManager.getMemoryUsage().removeUsageListener(this);
279 if (!started.compareAndSet(true, false)) {
280 return;
281 }
282
283 this.scheduler.cancel(periodicCheckpointTask);
284 this.scheduler.stop();
285
286 // Take one final checkpoint and stop checkpoint processing.
287 checkpoint(true, true);
288 checkpointTask.shutdown();
289 checkpointExecutor.shutdown();
290
291 queues.clear();
292 topics.clear();
293
294 IOException firstException = null;
295 try {
296 journal.close();
297 } catch (Exception e) {
298 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
299 }
300 longTermPersistence.stop();
301
302 if (firstException != null) {
303 throw firstException;
304 }
305 }
306
307 // Properties
308 // -------------------------------------------------------------------------
309 public PersistenceAdapter getLongTermPersistence() {
310 return longTermPersistence;
311 }
312
313 /**
314 * @return Returns the wireFormat.
315 */
316 public WireFormat getWireFormat() {
317 return wireFormat;
318 }
319
320 // Implementation methods
321 // -------------------------------------------------------------------------
322
323 /**
324 * The Journal give us a call back so that we can move old data out of the
325 * journal. Taking a checkpoint does this for us.
326 *
327 * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
328 */
329 public void overflowNotification(RecordLocation safeLocation) {
330 checkpoint(false, true);
331 }
332
333 /**
334 * When we checkpoint we move all the journalled data to long term storage.
335 *
336 */
337 public void checkpoint(boolean sync, boolean fullCheckpoint) {
338 try {
339 if (journal == null) {
340 throw new IllegalStateException("Journal is closed.");
341 }
342
343 long now = System.currentTimeMillis();
344 CountDownLatch latch = null;
345 synchronized (this) {
346 latch = nextCheckpointCountDownLatch;
347 lastCheckpointRequest = now;
348 if (fullCheckpoint) {
349 this.fullCheckPoint = true;
350 }
351 }
352
353 checkpointTask.wakeup();
354
355 if (sync) {
356 LOG.debug("Waking for checkpoint to complete.");
357 latch.await();
358 }
359 } catch (InterruptedException e) {
360 Thread.currentThread().interrupt();
361 LOG.warn("Request to start checkpoint failed: " + e, e);
362 }
363 }
364
365 public void checkpoint(boolean sync) {
366 checkpoint(sync, sync);
367 }
368
369 /**
370 * This does the actual checkpoint.
371 *
372 * @return
373 */
374 public boolean doCheckpoint() {
375 CountDownLatch latch = null;
376 boolean fullCheckpoint;
377 synchronized (this) {
378 latch = nextCheckpointCountDownLatch;
379 nextCheckpointCountDownLatch = new CountDownLatch(1);
380 fullCheckpoint = this.fullCheckPoint;
381 this.fullCheckPoint = false;
382 }
383 try {
384
385 LOG.debug("Checkpoint started.");
386 RecordLocation newMark = null;
387
388 ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
389
390 //
391 // We do many partial checkpoints (fullCheckpoint==false) to move
392 // topic messages
393 // to long term store as soon as possible.
394 //
395 // We want to avoid doing that for queue messages since removes the
396 // come in the same
397 // checkpoint cycle will nullify the previous message add.
398 // Therefore, we only
399 // checkpoint queues on the fullCheckpoint cycles.
400 //
401 if (fullCheckpoint) {
402 Iterator<JournalMessageStore> iterator = queues.values().iterator();
403 while (iterator.hasNext()) {
404 try {
405 final JournalMessageStore ms = iterator.next();
406 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
407 public RecordLocation call() throws Exception {
408 return ms.checkpoint();
409 }
410 });
411 futureTasks.add(task);
412 checkpointExecutor.execute(task);
413 } catch (Exception e) {
414 LOG.error("Failed to checkpoint a message store: " + e, e);
415 }
416 }
417 }
418
419 Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
420 while (iterator.hasNext()) {
421 try {
422 final JournalTopicMessageStore ms = iterator.next();
423 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
424 public RecordLocation call() throws Exception {
425 return ms.checkpoint();
426 }
427 });
428 futureTasks.add(task);
429 checkpointExecutor.execute(task);
430 } catch (Exception e) {
431 LOG.error("Failed to checkpoint a message store: " + e, e);
432 }
433 }
434
435 try {
436 for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
437 FutureTask<RecordLocation> ft = iter.next();
438 RecordLocation mark = ft.get();
439 // We only set a newMark on full checkpoints.
440 if (fullCheckpoint) {
441 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
442 newMark = mark;
443 }
444 }
445 }
446 } catch (Throwable e) {
447 LOG.error("Failed to checkpoint a message store: " + e, e);
448 }
449
450 if (fullCheckpoint) {
451 try {
452 if (newMark != null) {
453 LOG.debug("Marking journal at: " + newMark);
454 journal.setMark(newMark, true);
455 }
456 } catch (Exception e) {
457 LOG.error("Failed to mark the Journal: " + e, e);
458 }
459
460 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
461 // We may be check pointing more often than the
462 // checkpointInterval if under high use
463 // But we don't want to clean up the db that often.
464 long now = System.currentTimeMillis();
465 if (now > lastCleanup + checkpointInterval) {
466 lastCleanup = now;
467 ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
468 }
469 }
470 }
471
472 LOG.debug("Checkpoint done.");
473 } finally {
474 latch.countDown();
475 }
476 synchronized (this) {
477 return this.fullCheckPoint;
478 }
479
480 }
481
482 /**
483 * @param location
484 * @return
485 * @throws IOException
486 */
487 public DataStructure readCommand(RecordLocation location) throws IOException {
488 try {
489 Packet packet = journal.read(location);
490 return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
491 } catch (InvalidRecordLocationException e) {
492 throw createReadException(location, e);
493 } catch (IOException e) {
494 throw createReadException(location, e);
495 }
496 }
497
498 /**
499 * Move all the messages that were in the journal into long term storage. We
500 * just replay and do a checkpoint.
501 *
502 * @throws IOException
503 * @throws IOException
504 * @throws InvalidRecordLocationException
505 * @throws IllegalStateException
506 */
507 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
508
509 RecordLocation pos = null;
510 int transactionCounter = 0;
511
512 LOG.info("Journal Recovery Started from: " + journal);
513 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
514
515 // While we have records in the journal.
516 while ((pos = journal.getNextRecordLocation(pos)) != null) {
517 Packet data = journal.read(pos);
518 DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
519
520 if (c instanceof Message) {
521 Message message = (Message)c;
522 JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
523 if (message.isInTransaction()) {
524 transactionStore.addMessage(store, message, pos);
525 } else {
526 store.replayAddMessage(context, message);
527 transactionCounter++;
528 }
529 } else {
530 switch (c.getDataStructureType()) {
531 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
532 JournalQueueAck command = (JournalQueueAck)c;
533 JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
534 if (command.getMessageAck().isInTransaction()) {
535 transactionStore.removeMessage(store, command.getMessageAck(), pos);
536 } else {
537 store.replayRemoveMessage(context, command.getMessageAck());
538 transactionCounter++;
539 }
540 }
541 break;
542 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
543 JournalTopicAck command = (JournalTopicAck)c;
544 JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
545 if (command.getTransactionId() != null) {
546 transactionStore.acknowledge(store, command, pos);
547 } else {
548 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
549 transactionCounter++;
550 }
551 }
552 break;
553 case JournalTransaction.DATA_STRUCTURE_TYPE: {
554 JournalTransaction command = (JournalTransaction)c;
555 try {
556 // Try to replay the packet.
557 switch (command.getType()) {
558 case JournalTransaction.XA_PREPARE:
559 transactionStore.replayPrepare(command.getTransactionId());
560 break;
561 case JournalTransaction.XA_COMMIT:
562 case JournalTransaction.LOCAL_COMMIT:
563 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
564 if (tx == null) {
565 break; // We may be trying to replay a commit
566 }
567 // that
568 // was already committed.
569
570 // Replay the committed operations.
571 tx.getOperations();
572 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
573 TxOperation op = (TxOperation)iter.next();
574 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
575 op.store.replayAddMessage(context, (Message)op.data);
576 }
577 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
578 op.store.replayRemoveMessage(context, (MessageAck)op.data);
579 }
580 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
581 JournalTopicAck ack = (JournalTopicAck)op.data;
582 ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
583 }
584 }
585 transactionCounter++;
586 break;
587 case JournalTransaction.LOCAL_ROLLBACK:
588 case JournalTransaction.XA_ROLLBACK:
589 transactionStore.replayRollback(command.getTransactionId());
590 break;
591 default:
592 throw new IOException("Invalid journal command type: " + command.getType());
593 }
594 } catch (IOException e) {
595 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
596 }
597 }
598 break;
599 case JournalTrace.DATA_STRUCTURE_TYPE:
600 JournalTrace trace = (JournalTrace)c;
601 LOG.debug("TRACE Entry: " + trace.getMessage());
602 break;
603 default:
604 LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
605 }
606 }
607 }
608
609 RecordLocation location = writeTraceMessage("RECOVERED", true);
610 journal.setMark(location, true);
611
612 LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
613 }
614
615 private IOException createReadException(RecordLocation location, Exception e) {
616 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
617 }
618
619 protected IOException createWriteException(DataStructure packet, Exception e) {
620 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
621 }
622
623 protected IOException createWriteException(String command, Exception e) {
624 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
625 }
626
627 protected IOException createRecoveryFailedException(Exception e) {
628 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
629 }
630
631 /**
632 * @param command
633 * @param sync
634 * @return
635 * @throws IOException
636 */
637 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
638 if (started.get()) {
639 try {
640 return journal.write(toPacket(wireFormat.marshal(command)), sync);
641 } catch (IOException ioe) {
642 LOG.error("Cannot write to the journal", ioe);
643 brokerService.handleIOException(ioe);
644 throw ioe;
645 }
646 }
647 throw new IOException("closed");
648 }
649
650 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
651 JournalTrace trace = new JournalTrace();
652 trace.setMessage(message);
653 return writeCommand(trace, sync);
654 }
655
656 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
657 newPercentUsage = (newPercentUsage / 10) * 10;
658 oldPercentUsage = (oldPercentUsage / 10) * 10;
659 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
660 boolean sync = newPercentUsage >= 90;
661 checkpoint(sync, true);
662 }
663 }
664
665 public JournalTransactionStore getTransactionStore() {
666 return transactionStore;
667 }
668
669 public void deleteAllMessages() throws IOException {
670 try {
671 JournalTrace trace = new JournalTrace();
672 trace.setMessage("DELETED");
673 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
674 journal.setMark(location, true);
675 LOG.info("Journal deleted: ");
676 } catch (IOException e) {
677 throw e;
678 } catch (Throwable e) {
679 throw IOExceptionSupport.create(e);
680 }
681 longTermPersistence.deleteAllMessages();
682 }
683
684 public SystemUsage getUsageManager() {
685 return usageManager;
686 }
687
688 public int getMaxCheckpointMessageAddSize() {
689 return maxCheckpointMessageAddSize;
690 }
691
692 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
693 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
694 }
695
696 public int getMaxCheckpointWorkers() {
697 return maxCheckpointWorkers;
698 }
699
700 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
701 this.maxCheckpointWorkers = maxCheckpointWorkers;
702 }
703
704 public boolean isUseExternalMessageReferences() {
705 return false;
706 }
707
708 public void setUseExternalMessageReferences(boolean enable) {
709 if (enable) {
710 throw new IllegalArgumentException("The journal does not support message references.");
711 }
712 }
713
714 public Packet toPacket(ByteSequence sequence) {
715 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
716 }
717
718 public ByteSequence toByteSequence(Packet packet) {
719 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
720 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
721 }
722
723 public void setBrokerName(String brokerName) {
724 longTermPersistence.setBrokerName(brokerName);
725 }
726
727 @Override
728 public String toString() {
729 return "JournalPersistenceAdapator(" + longTermPersistence + ")";
730 }
731
732 public void setDirectory(File dir) {
733 }
734
735 public long size(){
736 return 0;
737 }
738
739 public void setBrokerService(BrokerService brokerService) {
740 this.brokerService = brokerService;
741 PersistenceAdapter pa = getLongTermPersistence();
742 if( pa instanceof BrokerServiceAware ) {
743 ((BrokerServiceAware)pa).setBrokerService(brokerService);
744 }
745 }
746
747 public long getLastProducerSequenceId(ProducerId id) {
748 return -1;
749 }
750
751 }