001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.scheduler;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.File;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.Map.Entry;
031 import org.apache.activemq.util.IOHelper;
032 import org.apache.activemq.util.ServiceStopper;
033 import org.apache.activemq.util.ServiceSupport;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036 import org.apache.kahadb.index.BTreeIndex;
037 import org.apache.kahadb.journal.Journal;
038 import org.apache.kahadb.journal.Location;
039 import org.apache.kahadb.page.Page;
040 import org.apache.kahadb.page.PageFile;
041 import org.apache.kahadb.page.Transaction;
042 import org.apache.kahadb.util.ByteSequence;
043 import org.apache.kahadb.util.IntegerMarshaller;
044 import org.apache.kahadb.util.LockFile;
045 import org.apache.kahadb.util.StringMarshaller;
046 import org.apache.kahadb.util.VariableMarshaller;
047
048 public class JobSchedulerStore extends ServiceSupport {
049 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
050 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
051
052 public static final int CLOSED_STATE = 1;
053 public static final int OPEN_STATE = 2;
054
055 private File directory;
056 PageFile pageFile;
057 private Journal journal;
058 private LockFile lockFile;
059 private boolean failIfDatabaseIsLocked;
060 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
061 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
062 private boolean enableIndexWriteAsync = false;
063 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
064 MetaData metaData = new MetaData(this);
065 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
066 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
067
068 protected class MetaData {
069 protected MetaData(JobSchedulerStore store) {
070 this.store = store;
071 }
072 private final JobSchedulerStore store;
073 Page<MetaData> page;
074 BTreeIndex<Integer, Integer> journalRC;
075 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
076
077 void createIndexes(Transaction tx) throws IOException {
078 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
079 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
080 }
081
082 void load(Transaction tx) throws IOException {
083 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
084 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
085 this.storedSchedulers.load(tx);
086 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
087 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.load(tx);
089 }
090
091 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
092 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
093 Entry<String, JobSchedulerImpl> entry = i.next();
094 entry.getValue().load(tx);
095 schedulers.put(entry.getKey(), entry.getValue());
096 }
097 }
098
099 public void read(DataInput is) throws IOException {
100 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
101 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
102 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
103 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
104 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
105 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
106 }
107
108 public void write(DataOutput os) throws IOException {
109 os.writeLong(this.storedSchedulers.getPageId());
110 os.writeLong(this.journalRC.getPageId());
111
112 }
113 }
114
115 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
116 private final JobSchedulerStore store;
117
118 MetaDataMarshaller(JobSchedulerStore store) {
119 this.store = store;
120 }
121 public MetaData readPayload(DataInput dataIn) throws IOException {
122 MetaData rc = new MetaData(this.store);
123 rc.read(dataIn);
124 return rc;
125 }
126
127 public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
128 object.write(dataOut);
129 }
130 }
131
132 class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
133 public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
134 List<JobLocation> result = new ArrayList<JobLocation>();
135 int size = dataIn.readInt();
136 for (int i = 0; i < size; i++) {
137 JobLocation jobLocation = new JobLocation();
138 jobLocation.readExternal(dataIn);
139 result.add(jobLocation);
140 }
141 return result;
142 }
143
144 public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
145 dataOut.writeInt(value.size());
146 for (JobLocation jobLocation : value) {
147 jobLocation.writeExternal(dataOut);
148 }
149 }
150 }
151
152 class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
153 private final JobSchedulerStore store;
154 JobSchedulerMarshaller(JobSchedulerStore store) {
155 this.store = store;
156 }
157 public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
158 JobSchedulerImpl result = new JobSchedulerImpl(this.store);
159 result.read(dataIn);
160 return result;
161 }
162
163 public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
164 js.write(dataOut);
165 }
166 }
167
168 public File getDirectory() {
169 return directory;
170 }
171
172 public void setDirectory(File directory) {
173 this.directory = directory;
174 }
175
176 public long size() {
177 if ( !isStarted() ) {
178 return 0;
179 }
180 try {
181 return journal.getDiskSize() + pageFile.getDiskSize();
182 } catch (IOException e) {
183 throw new RuntimeException(e);
184 }
185 }
186
187 public JobScheduler getJobScheduler(final String name) throws Exception {
188 JobSchedulerImpl result = this.schedulers.get(name);
189 if (result == null) {
190 final JobSchedulerImpl js = new JobSchedulerImpl(this);
191 js.setName(name);
192 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
193 public void execute(Transaction tx) throws IOException {
194 js.createIndexes(tx);
195 js.load(tx);
196 metaData.storedSchedulers.put(tx, name, js);
197 }
198 });
199 result = js;
200 this.schedulers.put(name, js);
201 if (isStarted()) {
202 result.start();
203 }
204 this.pageFile.flush();
205 }
206 return result;
207 }
208
209 synchronized public boolean removeJobScheduler(final String name) throws Exception {
210 boolean result = false;
211 final JobSchedulerImpl js = this.schedulers.remove(name);
212 result = js != null;
213 if (result) {
214 js.stop();
215 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
216 public void execute(Transaction tx) throws IOException {
217 metaData.storedSchedulers.remove(tx, name);
218 js.destroy(tx);
219 }
220 });
221 }
222 return result;
223 }
224
225 @Override
226 protected synchronized void doStart() throws Exception {
227 if (this.directory == null) {
228 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
229 }
230 IOHelper.mkdirs(this.directory);
231 lock();
232 this.journal = new Journal();
233 this.journal.setDirectory(directory);
234 this.journal.setMaxFileLength(getJournalMaxFileLength());
235 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
236 this.journal.start();
237 this.pageFile = new PageFile(directory, "scheduleDB");
238 this.pageFile.load();
239
240 this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
241 public void execute(Transaction tx) throws IOException {
242 if (pageFile.getPageCount() == 0) {
243 Page<MetaData> page = tx.allocate();
244 assert page.getPageId() == 0;
245 page.set(metaData);
246 metaData.page = page;
247 metaData.createIndexes(tx);
248 tx.store(metaData.page, metaDataMarshaller, true);
249
250 } else {
251 Page<MetaData> page = tx.load(0, metaDataMarshaller);
252 metaData = page.get();
253 metaData.page = page;
254 }
255 metaData.load(tx);
256 metaData.loadScheduler(tx, schedulers);
257 for (JobSchedulerImpl js :schedulers.values()) {
258 try {
259 js.start();
260 } catch (Exception e) {
261 JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
262 }
263 }
264 }
265 });
266
267 this.pageFile.flush();
268 LOG.info(this + " started");
269 }
270
271 @Override
272 protected synchronized void doStop(ServiceStopper stopper) throws Exception {
273 for (JobSchedulerImpl js : this.schedulers.values()) {
274 js.stop();
275 }
276 if (this.pageFile != null) {
277 this.pageFile.unload();
278 }
279 if (this.journal != null) {
280 journal.close();
281 }
282 if (this.lockFile != null) {
283 this.lockFile.unlock();
284 }
285 this.lockFile = null;
286 LOG.info(this + " stopped");
287
288 }
289
290 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
291 int logId = location.getDataFileId();
292 Integer val = this.metaData.journalRC.get(tx, logId);
293 int refCount = val != null ? val.intValue() + 1 : 1;
294 this.metaData.journalRC.put(tx, logId, refCount);
295
296 }
297
298 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
299 int logId = location.getDataFileId();
300 int refCount = this.metaData.journalRC.get(tx, logId);
301 refCount--;
302 if (refCount <= 0) {
303 this.metaData.journalRC.remove(tx, logId);
304 Set<Integer> set = new HashSet<Integer>();
305 set.add(logId);
306 this.journal.removeDataFiles(set);
307 } else {
308 this.metaData.journalRC.put(tx, logId, refCount);
309 }
310
311 }
312
313 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
314 ByteSequence result = null;
315 result = this.journal.read(location);
316 return result;
317 }
318
319 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
320 return this.journal.write(payload, sync);
321 }
322
323 private void lock() throws IOException {
324 if (lockFile == null) {
325 File lockFileName = new File(directory, "lock");
326 lockFile = new LockFile(lockFileName, true);
327 if (failIfDatabaseIsLocked) {
328 lockFile.lock();
329 } else {
330 while (true) {
331 try {
332 lockFile.lock();
333 break;
334 } catch (IOException e) {
335 LOG.info("Database " + lockFileName + " is locked... waiting "
336 + (DATABASE_LOCKED_WAIT_DELAY / 1000)
337 + " seconds for the database to be unlocked. Reason: " + e);
338 try {
339 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
340 } catch (InterruptedException e1) {
341 }
342 }
343 }
344 }
345 }
346 }
347
348 PageFile getPageFile() {
349 this.pageFile.isLoaded();
350 return this.pageFile;
351 }
352
353 public boolean isFailIfDatabaseIsLocked() {
354 return failIfDatabaseIsLocked;
355 }
356
357 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
358 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
359 }
360
361 public int getJournalMaxFileLength() {
362 return journalMaxFileLength;
363 }
364
365 public void setJournalMaxFileLength(int journalMaxFileLength) {
366 this.journalMaxFileLength = journalMaxFileLength;
367 }
368
369 public int getJournalMaxWriteBatchSize() {
370 return journalMaxWriteBatchSize;
371 }
372
373 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
374 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
375 }
376
377 public boolean isEnableIndexWriteAsync() {
378 return enableIndexWriteAsync;
379 }
380
381 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
382 this.enableIndexWriteAsync = enableIndexWriteAsync;
383 }
384
385 @Override
386 public String toString() {
387 return "JobSchedulerStore:" + this.directory;
388 }
389
390 }