001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.File;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.Map.Entry;
031 import org.apache.activemq.util.IOHelper;
032 import org.apache.activemq.util.ServiceStopper;
033 import org.apache.activemq.util.ServiceSupport;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036 import org.apache.kahadb.index.BTreeIndex;
037 import org.apache.kahadb.journal.Journal;
038 import org.apache.kahadb.journal.Location;
039 import org.apache.kahadb.page.Page;
040 import org.apache.kahadb.page.PageFile;
041 import org.apache.kahadb.page.Transaction;
042 import org.apache.kahadb.util.ByteSequence;
043 import org.apache.kahadb.util.IntegerMarshaller;
044 import org.apache.kahadb.util.LockFile;
045 import org.apache.kahadb.util.StringMarshaller;
046 import org.apache.kahadb.util.VariableMarshaller;
047
048 /**
049 * @org.apache.xbean.XBean
050 */
051 public class PListStore extends ServiceSupport {
052 static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
053 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
054
055 static final int CLOSED_STATE = 1;
056 static final int OPEN_STATE = 2;
057
058 private File directory;
059 PageFile pageFile;
060 private Journal journal;
061 private LockFile lockFile;
062 private boolean failIfDatabaseIsLocked;
063 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
064 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
065 private boolean enableIndexWriteAsync = false;
066 private boolean initialized = false;
067 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068 MetaData metaData = new MetaData(this);
069 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070 Map<String, PList> persistentLists = new HashMap<String, PList>();
071 final Object indexLock = new Object();
072
073 public Object getIndexLock() {
074 return indexLock;
075 }
076
077 protected class MetaData {
078 protected MetaData(PListStore store) {
079 this.store = store;
080 }
081
082 private final PListStore store;
083 Page<MetaData> page;
084 BTreeIndex<Integer, Integer> journalRC;
085 BTreeIndex<String, PList> storedSchedulers;
086
087 void createIndexes(Transaction tx) throws IOException {
088 this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
089 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
090 }
091
092 void load(Transaction tx) throws IOException {
093 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
094 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
095 this.storedSchedulers.load(tx);
096 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
097 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
098 this.journalRC.load(tx);
099 }
100
101 void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
102 for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
103 Entry<String, PList> entry = i.next();
104 entry.getValue().load(tx);
105 schedulers.put(entry.getKey(), entry.getValue());
106 }
107 }
108
109 public void read(DataInput is) throws IOException {
110 this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
111 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
112 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
113 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
114 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
115 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
116 }
117
118 public void write(DataOutput os) throws IOException {
119 os.writeLong(this.storedSchedulers.getPageId());
120 os.writeLong(this.journalRC.getPageId());
121
122 }
123 }
124
125 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
126 private final PListStore store;
127
128 MetaDataMarshaller(PListStore store) {
129 this.store = store;
130 }
131 public MetaData readPayload(DataInput dataIn) throws IOException {
132 MetaData rc = new MetaData(this.store);
133 rc.read(dataIn);
134 return rc;
135 }
136
137 public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
138 object.write(dataOut);
139 }
140 }
141
142 class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
143 public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
144 List<EntryLocation> result = new ArrayList<EntryLocation>();
145 int size = dataIn.readInt();
146 for (int i = 0; i < size; i++) {
147 EntryLocation jobLocation = new EntryLocation();
148 jobLocation.readExternal(dataIn);
149 result.add(jobLocation);
150 }
151 return result;
152 }
153
154 public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
155 dataOut.writeInt(value.size());
156 for (EntryLocation jobLocation : value) {
157 jobLocation.writeExternal(dataOut);
158 }
159 }
160 }
161
162 class JobSchedulerMarshaller extends VariableMarshaller<PList> {
163 private final PListStore store;
164 JobSchedulerMarshaller(PListStore store) {
165 this.store = store;
166 }
167 public PList readPayload(DataInput dataIn) throws IOException {
168 PList result = new PList(this.store);
169 result.read(dataIn);
170 return result;
171 }
172
173 public void writePayload(PList js, DataOutput dataOut) throws IOException {
174 js.write(dataOut);
175 }
176 }
177
178 public File getDirectory() {
179 return directory;
180 }
181
182 public void setDirectory(File directory) {
183 this.directory = directory;
184 }
185
186 public long size() {
187 synchronized (this) {
188 if (!initialized) {
189 return 0;
190 }
191 }
192 try {
193 return journal.getDiskSize() + pageFile.getDiskSize();
194 } catch (IOException e) {
195 throw new RuntimeException(e);
196 }
197 }
198
199 synchronized public PList getPList(final String name) throws Exception {
200 if (!isStarted()) {
201 throw new IllegalStateException("Not started");
202 }
203 intialize();
204 PList result = this.persistentLists.get(name);
205 if (result == null) {
206 final PList pl = new PList(this);
207 pl.setName(name);
208 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
209 public void execute(Transaction tx) throws IOException {
210 pl.setRootId(tx.allocate().getPageId());
211 pl.load(tx);
212 metaData.storedSchedulers.put(tx, name, pl);
213 }
214 });
215 result = pl;
216 this.persistentLists.put(name, pl);
217 }
218 final PList load = result;
219 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
220 public void execute(Transaction tx) throws IOException {
221 load.load(tx);
222 }
223 });
224
225 return result;
226 }
227
228 synchronized public boolean removePList(final String name) throws Exception {
229 boolean result = false;
230 final PList pl = this.persistentLists.remove(name);
231 result = pl != null;
232 if (result) {
233 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
234 public void execute(Transaction tx) throws IOException {
235 metaData.storedSchedulers.remove(tx, name);
236 pl.destroy(tx);
237 }
238 });
239 }
240 return result;
241 }
242
243 protected synchronized void intialize() throws Exception {
244 if (isStarted()) {
245 if (this.initialized == false) {
246 this.initialized = true;
247 if (this.directory == null) {
248 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
249 }
250 IOHelper.mkdirs(this.directory);
251 lock();
252 this.journal = new Journal();
253 this.journal.setDirectory(directory);
254 this.journal.setMaxFileLength(getJournalMaxFileLength());
255 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
256 this.journal.start();
257 this.pageFile = new PageFile(directory, "tmpDB");
258 this.pageFile.load();
259
260 this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
261 public void execute(Transaction tx) throws IOException {
262 if (pageFile.getPageCount() == 0) {
263 Page<MetaData> page = tx.allocate();
264 assert page.getPageId() == 0;
265 page.set(metaData);
266 metaData.page = page;
267 metaData.createIndexes(tx);
268 tx.store(metaData.page, metaDataMarshaller, true);
269
270 } else {
271 Page<MetaData> page = tx.load(0, metaDataMarshaller);
272 metaData = page.get();
273 metaData.page = page;
274 }
275 metaData.load(tx);
276 metaData.loadLists(tx, persistentLists);
277 }
278 });
279
280 this.pageFile.flush();
281 LOG.info(this + " initialized");
282 }
283 }
284 }
285
286 @Override
287 protected synchronized void doStart() throws Exception {
288 LOG.info(this + " started");
289 }
290
291 @Override
292 protected synchronized void doStop(ServiceStopper stopper) throws Exception {
293 for (PList pl : this.persistentLists.values()) {
294 pl.unload();
295 }
296 if (this.pageFile != null) {
297 this.pageFile.unload();
298 }
299 if (this.journal != null) {
300 journal.close();
301 }
302 if (this.lockFile != null) {
303 this.lockFile.unlock();
304 }
305 this.lockFile = null;
306 this.initialized = false;
307 LOG.info(this + " stopped");
308
309 }
310
311 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
312 int logId = location.getDataFileId();
313 Integer val = this.metaData.journalRC.get(tx, logId);
314 int refCount = val != null ? val.intValue() + 1 : 1;
315 this.metaData.journalRC.put(tx, logId, refCount);
316
317 }
318
319 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
320 int logId = location.getDataFileId();
321 if (logId != Location.NOT_SET) {
322 int refCount = this.metaData.journalRC.get(tx, logId);
323 refCount--;
324 if (refCount <= 0) {
325 this.metaData.journalRC.remove(tx, logId);
326 Set<Integer> set = new HashSet<Integer>();
327 set.add(logId);
328 this.journal.removeDataFiles(set);
329 } else {
330 this.metaData.journalRC.put(tx, logId, refCount);
331 }
332 }
333 }
334
335 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
336 ByteSequence result = null;
337 result = this.journal.read(location);
338 return result;
339 }
340
341 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
342 return this.journal.write(payload, sync);
343 }
344
345 private void lock() throws IOException {
346 if (lockFile == null) {
347 File lockFileName = new File(directory, "lock");
348 lockFile = new LockFile(lockFileName, true);
349 if (failIfDatabaseIsLocked) {
350 lockFile.lock();
351 } else {
352 while (true) {
353 try {
354 lockFile.lock();
355 break;
356 } catch (IOException e) {
357 LOG.info("Database " + lockFileName + " is locked... waiting "
358 + (DATABASE_LOCKED_WAIT_DELAY / 1000)
359 + " seconds for the database to be unlocked. Reason: " + e);
360 try {
361 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
362 } catch (InterruptedException e1) {
363 }
364 }
365 }
366 }
367 }
368 }
369
370 PageFile getPageFile() {
371 this.pageFile.isLoaded();
372 return this.pageFile;
373 }
374
375 public boolean isFailIfDatabaseIsLocked() {
376 return failIfDatabaseIsLocked;
377 }
378
379 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
380 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
381 }
382
383 public int getJournalMaxFileLength() {
384 return journalMaxFileLength;
385 }
386
387 public void setJournalMaxFileLength(int journalMaxFileLength) {
388 this.journalMaxFileLength = journalMaxFileLength;
389 }
390
391 public int getJournalMaxWriteBatchSize() {
392 return journalMaxWriteBatchSize;
393 }
394
395 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
396 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
397 }
398
399 public boolean isEnableIndexWriteAsync() {
400 return enableIndexWriteAsync;
401 }
402
403 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
404 this.enableIndexWriteAsync = enableIndexWriteAsync;
405 }
406
407 @Override
408 public String toString() {
409 return "PListStore:" + this.directory;
410 }
411
412 }