001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.Set;
022 import org.apache.activeio.journal.Journal;
023 import org.apache.activemq.broker.BrokerService;
024 import org.apache.activemq.broker.BrokerServiceAware;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.ActiveMQQueue;
028 import org.apache.activemq.command.ActiveMQTopic;
029 import org.apache.activemq.command.ProducerId;
030 import org.apache.activemq.store.MessageStore;
031 import org.apache.activemq.store.PersistenceAdapter;
032 import org.apache.activemq.store.TopicMessageStore;
033 import org.apache.activemq.store.TransactionStore;
034 import org.apache.activemq.usage.SystemUsage;
035
036 /**
037 * An implementation of {@link PersistenceAdapter} designed for use with a
038 * {@link Journal} and then check pointing asynchronously on a timeout with some
039 * other long term persistent storage.
040 *
041 * @org.apache.xbean.XBean element="kahaDB"
042 *
043 */
044 public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
045 private final KahaDBStore letter = new KahaDBStore();
046
047 /**
048 * @param context
049 * @throws IOException
050 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
051 */
052 public void beginTransaction(ConnectionContext context) throws IOException {
053 this.letter.beginTransaction(context);
054 }
055
056 /**
057 * @param sync
058 * @throws IOException
059 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
060 */
061 public void checkpoint(boolean sync) throws IOException {
062 this.letter.checkpoint(sync);
063 }
064
065 /**
066 * @param context
067 * @throws IOException
068 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
069 */
070 public void commitTransaction(ConnectionContext context) throws IOException {
071 this.letter.commitTransaction(context);
072 }
073
074 /**
075 * @param destination
076 * @return MessageStore
077 * @throws IOException
078 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
079 */
080 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
081 return this.letter.createQueueMessageStore(destination);
082 }
083
084 /**
085 * @param destination
086 * @return TopicMessageStore
087 * @throws IOException
088 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
089 */
090 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
091 return this.letter.createTopicMessageStore(destination);
092 }
093
094 /**
095 * @return TrandactionStore
096 * @throws IOException
097 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
098 */
099 public TransactionStore createTransactionStore() throws IOException {
100 return this.letter.createTransactionStore();
101 }
102
103 /**
104 * @throws IOException
105 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
106 */
107 public void deleteAllMessages() throws IOException {
108 this.letter.deleteAllMessages();
109 }
110
111 /**
112 * @return destinations
113 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
114 */
115 public Set<ActiveMQDestination> getDestinations() {
116 return this.letter.getDestinations();
117 }
118
119 /**
120 * @return lastMessageBrokerSequenceId
121 * @throws IOException
122 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
123 */
124 public long getLastMessageBrokerSequenceId() throws IOException {
125 return this.letter.getLastMessageBrokerSequenceId();
126 }
127
128 public long getLastProducerSequenceId(ProducerId id) throws IOException {
129 return this.letter.getLastProducerSequenceId(id);
130 }
131
132 /**
133 * @param destination
134 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
135 */
136 public void removeQueueMessageStore(ActiveMQQueue destination) {
137 this.letter.removeQueueMessageStore(destination);
138 }
139
140 /**
141 * @param destination
142 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
143 */
144 public void removeTopicMessageStore(ActiveMQTopic destination) {
145 this.letter.removeTopicMessageStore(destination);
146 }
147
148 /**
149 * @param context
150 * @throws IOException
151 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
152 */
153 public void rollbackTransaction(ConnectionContext context) throws IOException {
154 this.letter.rollbackTransaction(context);
155 }
156
157 /**
158 * @param brokerName
159 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
160 */
161 public void setBrokerName(String brokerName) {
162 this.letter.setBrokerName(brokerName);
163 }
164
165 /**
166 * @param usageManager
167 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
168 */
169 public void setUsageManager(SystemUsage usageManager) {
170 this.letter.setUsageManager(usageManager);
171 }
172
173 /**
174 * @return the size of the store
175 * @see org.apache.activemq.store.PersistenceAdapter#size()
176 */
177 public long size() {
178 return this.letter.size();
179 }
180
181 /**
182 * @throws Exception
183 * @see org.apache.activemq.Service#start()
184 */
185 public void start() throws Exception {
186 this.letter.start();
187 }
188
189 /**
190 * @throws Exception
191 * @see org.apache.activemq.Service#stop()
192 */
193 public void stop() throws Exception {
194 this.letter.stop();
195 }
196
197 /**
198 * Get the journalMaxFileLength
199 *
200 * @return the journalMaxFileLength
201 */
202 public int getJournalMaxFileLength() {
203 return this.letter.getJournalMaxFileLength();
204 }
205
206 /**
207 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
208 * be used
209 *
210 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
211 */
212 public void setJournalMaxFileLength(int journalMaxFileLength) {
213 this.letter.setJournalMaxFileLength(journalMaxFileLength);
214 }
215
216 /**
217 * Set the max number of producers (LRU cache) to track for duplicate sends
218 */
219 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
220 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
221 }
222
223 public int getMaxFailoverProducersToTrack() {
224 return this.letter.getMaxFailoverProducersToTrack();
225 }
226
227 /**
228 * set the audit window depth for duplicate suppression (should exceed the max transaction
229 * batch)
230 */
231 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
232 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
233 }
234
235 public int getFailoverProducersAuditDepth() {
236 return this.getFailoverProducersAuditDepth();
237 }
238
239 /**
240 * Get the checkpointInterval
241 *
242 * @return the checkpointInterval
243 */
244 public long getCheckpointInterval() {
245 return this.letter.getCheckpointInterval();
246 }
247
248 /**
249 * Set the checkpointInterval
250 *
251 * @param checkpointInterval
252 * the checkpointInterval to set
253 */
254 public void setCheckpointInterval(long checkpointInterval) {
255 this.letter.setCheckpointInterval(checkpointInterval);
256 }
257
258 /**
259 * Get the cleanupInterval
260 *
261 * @return the cleanupInterval
262 */
263 public long getCleanupInterval() {
264 return this.letter.getCleanupInterval();
265 }
266
267 /**
268 * Set the cleanupInterval
269 *
270 * @param cleanupInterval
271 * the cleanupInterval to set
272 */
273 public void setCleanupInterval(long cleanupInterval) {
274 this.letter.setCleanupInterval(cleanupInterval);
275 }
276
277 /**
278 * Get the indexWriteBatchSize
279 *
280 * @return the indexWriteBatchSize
281 */
282 public int getIndexWriteBatchSize() {
283 return this.letter.getIndexWriteBatchSize();
284 }
285
286 /**
287 * Set the indexWriteBatchSize
288 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
289 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
290 * @param indexWriteBatchSize
291 * the indexWriteBatchSize to set
292 */
293 public void setIndexWriteBatchSize(int indexWriteBatchSize) {
294 this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
295 }
296
297 /**
298 * Get the journalMaxWriteBatchSize
299 *
300 * @return the journalMaxWriteBatchSize
301 */
302 public int getJournalMaxWriteBatchSize() {
303 return this.letter.getJournalMaxWriteBatchSize();
304 }
305
306 /**
307 * Set the journalMaxWriteBatchSize
308 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
309 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
310 * @param journalMaxWriteBatchSize
311 * the journalMaxWriteBatchSize to set
312 */
313 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
314 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
315 }
316
317 /**
318 * Get the enableIndexWriteAsync
319 *
320 * @return the enableIndexWriteAsync
321 */
322 public boolean isEnableIndexWriteAsync() {
323 return this.letter.isEnableIndexWriteAsync();
324 }
325
326 /**
327 * Set the enableIndexWriteAsync
328 *
329 * @param enableIndexWriteAsync
330 * the enableIndexWriteAsync to set
331 */
332 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
333 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
334 }
335
336 /**
337 * Get the directory
338 *
339 * @return the directory
340 */
341 public File getDirectory() {
342 return this.letter.getDirectory();
343 }
344
345 /**
346 * @param dir
347 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
348 */
349 public void setDirectory(File dir) {
350 this.letter.setDirectory(dir);
351 }
352
353 /**
354 * Get the enableJournalDiskSyncs
355 *
356 * @return the enableJournalDiskSyncs
357 */
358 public boolean isEnableJournalDiskSyncs() {
359 return this.letter.isEnableJournalDiskSyncs();
360 }
361
362 /**
363 * Set the enableJournalDiskSyncs
364 *
365 * @param enableJournalDiskSyncs
366 * the enableJournalDiskSyncs to set
367 */
368 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
369 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
370 }
371
372 /**
373 * Get the indexCacheSize
374 *
375 * @return the indexCacheSize
376 */
377 public int getIndexCacheSize() {
378 return this.letter.getIndexCacheSize();
379 }
380
381 /**
382 * Set the indexCacheSize
383 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
384 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
385 * @param indexCacheSize
386 * the indexCacheSize to set
387 */
388 public void setIndexCacheSize(int indexCacheSize) {
389 this.letter.setIndexCacheSize(indexCacheSize);
390 }
391
392 /**
393 * Get the ignoreMissingJournalfiles
394 *
395 * @return the ignoreMissingJournalfiles
396 */
397 public boolean isIgnoreMissingJournalfiles() {
398 return this.letter.isIgnoreMissingJournalfiles();
399 }
400
401 /**
402 * Set the ignoreMissingJournalfiles
403 *
404 * @param ignoreMissingJournalfiles
405 * the ignoreMissingJournalfiles to set
406 */
407 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
408 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
409 }
410
411 public boolean isChecksumJournalFiles() {
412 return letter.isChecksumJournalFiles();
413 }
414
415 public boolean isCheckForCorruptJournalFiles() {
416 return letter.isCheckForCorruptJournalFiles();
417 }
418
419 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
420 letter.setChecksumJournalFiles(checksumJournalFiles);
421 }
422
423 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
424 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
425 }
426
427 public void setBrokerService(BrokerService brokerService) {
428 letter.setBrokerService(brokerService);
429 }
430
431 public boolean isArchiveDataLogs() {
432 return letter.isArchiveDataLogs();
433 }
434
435 public void setArchiveDataLogs(boolean archiveDataLogs) {
436 letter.setArchiveDataLogs(archiveDataLogs);
437 }
438
439 public File getDirectoryArchive() {
440 return letter.getDirectoryArchive();
441 }
442
443 public void setDirectoryArchive(File directoryArchive) {
444 letter.setDirectoryArchive(directoryArchive);
445 }
446
447 public boolean isConcurrentStoreAndDispatchQueues() {
448 return letter.isConcurrentStoreAndDispatchQueues();
449 }
450
451 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
452 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
453 }
454
455 public boolean isConcurrentStoreAndDispatchTopics() {
456 return letter.isConcurrentStoreAndDispatchTopics();
457 }
458
459 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
460 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
461 }
462
463 public int getMaxAsyncJobs() {
464 return letter.getMaxAsyncJobs();
465 }
466 /**
467 * @param maxAsyncJobs
468 * the maxAsyncJobs to set
469 */
470 public void setMaxAsyncJobs(int maxAsyncJobs) {
471 letter.setMaxAsyncJobs(maxAsyncJobs);
472 }
473
474 /**
475 * @return the databaseLockedWaitDelay
476 */
477 public int getDatabaseLockedWaitDelay() {
478 return letter.getDatabaseLockedWaitDelay();
479 }
480
481 /**
482 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
483 */
484 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
485 letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
486 }
487
488 public boolean getForceRecoverIndex() {
489 return letter.getForceRecoverIndex();
490 }
491
492 public void setForceRecoverIndex(boolean forceRecoverIndex) {
493 letter.setForceRecoverIndex(forceRecoverIndex);
494 }
495
496 // for testing
497 public KahaDBStore getStore() {
498 return letter;
499 }
500
501 @Override
502 public String toString() {
503 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
504 return "KahaDBPersistenceAdapter[" + path + "]";
505 }
506
507 }