001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
034
035 /**
036 * Factory class that can create PersistenceAdapter objects.
037 *
038 * @org.apache.xbean.XBean
039 *
040 */
041 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042
043 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044
045 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
046
047 private int journalLogFileSize = 1024 * 1024 * 20;
048 private int journalLogFiles = 2;
049 private TaskRunnerFactory taskRunnerFactory;
050 private Journal journal;
051 private boolean useJournal = true;
052 private boolean useQuickJournal;
053 private File journalArchiveDirectory;
054 private boolean failIfJournalIsLocked;
055 private int journalThreadPriority = Thread.MAX_PRIORITY;
056 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057 private boolean useDedicatedTaskRunner;
058
059 public PersistenceAdapter createPersistenceAdapter() throws IOException {
060 jdbcPersistenceAdapter.setDataSource(getDataSource());
061
062 if (!useJournal) {
063 return jdbcPersistenceAdapter;
064 }
065 return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066
067 }
068
069 public int getJournalLogFiles() {
070 return journalLogFiles;
071 }
072
073 /**
074 * Sets the number of journal log files to use
075 */
076 public void setJournalLogFiles(int journalLogFiles) {
077 this.journalLogFiles = journalLogFiles;
078 }
079
080 public int getJournalLogFileSize() {
081 return journalLogFileSize;
082 }
083
084 /**
085 * Sets the size of the journal log files
086 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
087 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
088 */
089 public void setJournalLogFileSize(int journalLogFileSize) {
090 this.journalLogFileSize = journalLogFileSize;
091 }
092
093 public JDBCPersistenceAdapter getJdbcAdapter() {
094 return jdbcPersistenceAdapter;
095 }
096
097 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
098 this.jdbcPersistenceAdapter = jdbcAdapter;
099 }
100
101 public boolean isUseJournal() {
102 return useJournal;
103 }
104
105 /**
106 * Enables or disables the use of the journal. The default is to use the
107 * journal
108 *
109 * @param useJournal
110 */
111 public void setUseJournal(boolean useJournal) {
112 this.useJournal = useJournal;
113 }
114
115 public boolean isUseDedicatedTaskRunner() {
116 return useDedicatedTaskRunner;
117 }
118
119 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
120 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
121 }
122
123 public TaskRunnerFactory getTaskRunnerFactory() {
124 if (taskRunnerFactory == null) {
125 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
126 true, 1000, isUseDedicatedTaskRunner());
127 }
128 return taskRunnerFactory;
129 }
130
131 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
132 this.taskRunnerFactory = taskRunnerFactory;
133 }
134
135 public Journal getJournal() throws IOException {
136 if (journal == null) {
137 createJournal();
138 }
139 return journal;
140 }
141
142 public void setJournal(Journal journal) {
143 this.journal = journal;
144 }
145
146 public File getJournalArchiveDirectory() {
147 if (journalArchiveDirectory == null && useQuickJournal) {
148 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
149 }
150 return journalArchiveDirectory;
151 }
152
153 public void setJournalArchiveDirectory(File journalArchiveDirectory) {
154 this.journalArchiveDirectory = journalArchiveDirectory;
155 }
156
157 public boolean isUseQuickJournal() {
158 return useQuickJournal;
159 }
160
161 /**
162 * Enables or disables the use of quick journal, which keeps messages in the
163 * journal and just stores a reference to the messages in JDBC. Defaults to
164 * false so that messages actually reside long term in the JDBC database.
165 */
166 public void setUseQuickJournal(boolean useQuickJournal) {
167 this.useQuickJournal = useQuickJournal;
168 }
169
170 public JDBCAdapter getAdapter() throws IOException {
171 return jdbcPersistenceAdapter.getAdapter();
172 }
173
174 public void setAdapter(JDBCAdapter adapter) {
175 jdbcPersistenceAdapter.setAdapter(adapter);
176 }
177
178 public Statements getStatements() {
179 return jdbcPersistenceAdapter.getStatements();
180 }
181
182 public void setStatements(Statements statements) {
183 jdbcPersistenceAdapter.setStatements(statements);
184 }
185
186 public boolean isUseDatabaseLock() {
187 return jdbcPersistenceAdapter.isUseDatabaseLock();
188 }
189
190 /**
191 * Sets whether or not an exclusive database lock should be used to enable
192 * JDBC Master/Slave. Enabled by default.
193 */
194 public void setUseDatabaseLock(boolean useDatabaseLock) {
195 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
196 }
197
198 public boolean isCreateTablesOnStartup() {
199 return jdbcPersistenceAdapter.isCreateTablesOnStartup();
200 }
201
202 /**
203 * Sets whether or not tables are created on startup
204 */
205 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
206 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
207 }
208
209 public int getJournalThreadPriority() {
210 return journalThreadPriority;
211 }
212
213 /**
214 * Sets the thread priority of the journal thread
215 */
216 public void setJournalThreadPriority(int journalThreadPriority) {
217 this.journalThreadPriority = journalThreadPriority;
218 }
219
220 /**
221 * @throws IOException
222 */
223 protected void createJournal() throws IOException {
224 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
225 if (failIfJournalIsLocked) {
226 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
227 getJournalArchiveDirectory());
228 } else {
229 while (true) {
230 try {
231 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
232 getJournalArchiveDirectory());
233 break;
234 } catch (JournalLockedException e) {
235 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
236 + " seconds for the journal to be unlocked.");
237 try {
238 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
239 } catch (InterruptedException e1) {
240 }
241 }
242 }
243 }
244 }
245
246 }