001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.util.concurrent.atomic.AtomicBoolean;
023 import java.util.concurrent.atomic.AtomicReference;
024 import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
025 import org.apache.kahadb.journal.Location;
026 import org.apache.kahadb.page.Page;
027 import org.apache.kahadb.page.Transaction;
028 import org.apache.kahadb.util.ByteSequence;
029
030 public class PList {
031 final PListStore store;
032 private String name;
033 private long rootId = EntryLocation.NOT_SET;
034 private long lastId = EntryLocation.NOT_SET;
035 private final AtomicBoolean loaded = new AtomicBoolean();
036 private int size = 0;
037 Object indexLock;
038
039 PList(PListStore store) {
040 this.store = store;
041 this.indexLock = store.getIndexLock();
042 }
043
044 public void setName(String name) {
045 this.name = name;
046 }
047
048 /*
049 * (non-Javadoc)
050 * @see org.apache.activemq.beanstalk.JobScheduler#getName()
051 */
052 public String getName() {
053 return this.name;
054 }
055
056 public synchronized int size() {
057 return this.size;
058 }
059
060 public synchronized boolean isEmpty() {
061 return size == 0;
062 }
063
064 /**
065 * @return the rootId
066 */
067 public long getRootId() {
068 return this.rootId;
069 }
070
071 /**
072 * @param rootId
073 * the rootId to set
074 */
075 public void setRootId(long rootId) {
076 this.rootId = rootId;
077 }
078
079 /**
080 * @return the lastId
081 */
082 public long getLastId() {
083 return this.lastId;
084 }
085
086 /**
087 * @param lastId
088 * the lastId to set
089 */
090 public void setLastId(long lastId) {
091 this.lastId = lastId;
092 }
093
094 /**
095 * @return the loaded
096 */
097 public boolean isLoaded() {
098 return this.loaded.get();
099 }
100
101 void read(DataInput in) throws IOException {
102 this.rootId = in.readLong();
103 this.name = in.readUTF();
104 }
105
106 public void write(DataOutput out) throws IOException {
107 out.writeLong(this.rootId);
108 out.writeUTF(name);
109 }
110
111 public synchronized void destroy() throws IOException {
112 synchronized (indexLock) {
113 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
114 public void execute(Transaction tx) throws IOException {
115 destroy(tx);
116 }
117 });
118 }
119 }
120
121 void destroy(Transaction tx) throws IOException {
122 // start from the first
123 EntryLocation entry = getFirst(tx);
124 while (entry != null) {
125 EntryLocation toRemove = entry.copy();
126 entry = getNext(tx, entry.getNext());
127 doRemove(tx, toRemove);
128 }
129 }
130
131 synchronized void load(Transaction tx) throws IOException {
132 if (loaded.compareAndSet(false, true)) {
133 final Page<EntryLocation> p = tx.load(this.rootId, null);
134 if (p.getType() == Page.PAGE_FREE_TYPE) {
135 // Need to initialize it..
136 EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
137
138 storeEntry(tx, root);
139 this.lastId = root.getPage().getPageId();
140 } else {
141 // find last id
142 long nextId = this.rootId;
143 while (nextId != EntryLocation.NOT_SET) {
144 EntryLocation next = getNext(tx, nextId);
145 if (next != null) {
146 this.lastId = next.getPage().getPageId();
147 nextId = next.getNext();
148 this.size++;
149 }
150 }
151 }
152 }
153 }
154
155 synchronized public void unload() {
156 if (loaded.compareAndSet(true, false)) {
157 this.rootId = EntryLocation.NOT_SET;
158 this.lastId = EntryLocation.NOT_SET;
159 this.size=0;
160 }
161 }
162
163 synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
164 final Location location = this.store.write(bs, false);
165 synchronized (indexLock) {
166 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
167 public void execute(Transaction tx) throws IOException {
168 addLast(tx, id, bs, location);
169 }
170 });
171 }
172 }
173
174 private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
175 EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
176 entry.setLocation(location);
177 storeEntry(tx, entry);
178 this.store.incrementJournalCount(tx, location);
179
180 EntryLocation last = loadEntry(tx, this.lastId);
181 last.setNext(entry.getPage().getPageId());
182 storeEntry(tx, last);
183 this.lastId = entry.getPage().getPageId();
184 this.size++;
185 }
186
187 synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
188 final Location location = this.store.write(bs, false);
189 synchronized (indexLock) {
190 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
191 public void execute(Transaction tx) throws IOException {
192 addFirst(tx, id, bs, location);
193 }
194 });
195 }
196 }
197
198 private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
199 EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
200 entry.setLocation(location);
201 EntryLocation oldFirst = getFirst(tx);
202 if (oldFirst != null) {
203 oldFirst.setPrev(entry.getPage().getPageId());
204 storeEntry(tx, oldFirst);
205 entry.setNext(oldFirst.getPage().getPageId());
206
207 }
208 EntryLocation root = getRoot(tx);
209 root.setNext(entry.getPage().getPageId());
210 storeEntry(tx, root);
211 storeEntry(tx, entry);
212
213 this.store.incrementJournalCount(tx, location);
214 this.size++;
215 }
216
217 synchronized public boolean remove(final String id) throws IOException {
218 final AtomicBoolean result = new AtomicBoolean();
219 synchronized (indexLock) {
220 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
221 public void execute(Transaction tx) throws IOException {
222 result.set(remove(tx, id));
223 }
224 });
225 }
226 return result.get();
227 }
228
229 synchronized public boolean remove(final int position) throws IOException {
230 final AtomicBoolean result = new AtomicBoolean();
231 synchronized (indexLock) {
232 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
233 public void execute(Transaction tx) throws IOException {
234 result.set(remove(tx, position));
235 }
236 });
237 }
238 return result.get();
239 }
240
241 synchronized public boolean remove(final PListEntry entry) throws IOException {
242 final AtomicBoolean result = new AtomicBoolean();
243 synchronized (indexLock) {
244 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
245 public void execute(Transaction tx) throws IOException {
246 result.set(doRemove(tx, entry.getEntry()));
247 }
248 });
249 }
250 return result.get();
251 }
252
253 synchronized public PListEntry get(final int position) throws IOException {
254 PListEntry result = null;
255 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
256 synchronized (indexLock) {
257 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
258 public void execute(Transaction tx) throws IOException {
259 ref.set(get(tx, position));
260 }
261 });
262 }
263 if (ref.get() != null) {
264 ByteSequence bs = this.store.getPayload(ref.get().getLocation());
265 result = new PListEntry(ref.get(), bs);
266 }
267 return result;
268 }
269
270 synchronized public PListEntry getFirst() throws IOException {
271 PListEntry result = null;
272 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
273 synchronized (indexLock) {
274 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
275 public void execute(Transaction tx) throws IOException {
276 ref.set(getFirst(tx));
277 }
278 });
279 if (ref.get() != null) {
280 ByteSequence bs = this.store.getPayload(ref.get().getLocation());
281 result = new PListEntry(ref.get(), bs);
282 }
283 }
284 return result;
285 }
286
287 synchronized public PListEntry getLast() throws IOException {
288 PListEntry result = null;
289 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
290 synchronized (indexLock) {
291 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
292 public void execute(Transaction tx) throws IOException {
293 ref.set(getLast(tx));
294 }
295 });
296 if (ref.get() != null) {
297 ByteSequence bs = this.store.getPayload(ref.get().getLocation());
298 result = new PListEntry(ref.get(), bs);
299 }
300 }
301 return result;
302 }
303
304 synchronized public PListEntry getNext(PListEntry entry) throws IOException {
305 PListEntry result = null;
306 final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
307 if (nextId != EntryLocation.NOT_SET) {
308 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
309 synchronized (indexLock) {
310 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
311 public void execute(Transaction tx) throws IOException {
312 ref.set(getNext(tx, nextId));
313 }
314 });
315 if (ref.get() != null) {
316 ByteSequence bs = this.store.getPayload(ref.get().getLocation());
317 result = new PListEntry(ref.get(), bs);
318 }
319 }
320 }
321 return result;
322 }
323
324 synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
325 PListEntry result = null;
326 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
327 synchronized (indexLock) {
328 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
329 public void execute(Transaction tx) throws IOException {
330 ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
331 }
332 });
333 if (ref.get() != null) {
334 result = new PListEntry(ref.get(), entry.getByteSequence());
335 }
336 }
337 return result;
338 }
339
340 boolean remove(Transaction tx, String id) throws IOException {
341 boolean result = false;
342 long nextId = this.rootId;
343 while (nextId != EntryLocation.NOT_SET) {
344 EntryLocation entry = getNext(tx, nextId);
345 if (entry != null) {
346 if (entry.getId().equals(id)) {
347 result = doRemove(tx, entry);
348 break;
349 }
350 nextId = entry.getNext();
351 } else {
352 // not found
353 break;
354 }
355 }
356 return result;
357 }
358
359 boolean remove(Transaction tx, int position) throws IOException {
360 boolean result = false;
361 long nextId = this.rootId;
362 int count = 0;
363 while (nextId != EntryLocation.NOT_SET) {
364 EntryLocation entry = getNext(tx, nextId);
365 if (entry != null) {
366 if (count == position) {
367 result = doRemove(tx, entry);
368 break;
369 }
370 nextId = entry.getNext();
371 } else {
372 // not found
373 break;
374 }
375 count++;
376 }
377 return result;
378 }
379
380 EntryLocation get(Transaction tx, int position) throws IOException {
381 EntryLocation result = null;
382 long nextId = this.rootId;
383 int count = -1;
384 while (nextId != EntryLocation.NOT_SET) {
385 EntryLocation entry = getNext(tx, nextId);
386 if (entry != null) {
387 if (count == position) {
388 result = entry;
389 break;
390 }
391 nextId = entry.getNext();
392 } else {
393 break;
394 }
395 count++;
396 }
397 return result;
398 }
399
400 EntryLocation getFirst(Transaction tx) throws IOException {
401 long offset = getRoot(tx).getNext();
402 if (offset != EntryLocation.NOT_SET) {
403 return loadEntry(tx, offset);
404 }
405 return null;
406 }
407
408 EntryLocation getLast(Transaction tx) throws IOException {
409 if (this.lastId != EntryLocation.NOT_SET) {
410 return loadEntry(tx, this.lastId);
411 }
412 return null;
413 }
414
415 private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
416 boolean result = false;
417 if (entry != null) {
418
419 EntryLocation prev = getPrevious(tx, entry.getPrev());
420 EntryLocation next = getNext(tx, entry.getNext());
421 long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
422 long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
423
424 if (next != null) {
425 next.setPrev(prevId);
426 storeEntry(tx, next);
427 } else {
428 // we are deleting the last one in the list
429 this.lastId = prevId;
430 }
431 if (prev != null) {
432 prev.setNext(nextId);
433 storeEntry(tx, prev);
434 }
435
436 this.store.decrementJournalCount(tx, entry.getLocation());
437 entry.reset();
438 storeEntry(tx, entry);
439 tx.free(entry.getPage().getPageId());
440 result = true;
441 this.size--;
442 }
443 return result;
444 }
445
446 private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
447 Page<EntryLocation> p = tx.allocate();
448 EntryLocation result = new EntryLocation();
449 result.setPage(p);
450 p.set(result);
451 result.setId(id);
452 result.setPrev(previous);
453 result.setNext(next);
454 return result;
455 }
456
457 private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
458 EntryLocation result = new EntryLocation();
459 result.setPage(p);
460 p.set(result);
461 result.setId(id);
462 result.setPrev(previous);
463 result.setNext(next);
464 return result;
465 }
466
467 EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
468 Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
469 EntryLocation entry = page.get();
470 if (entry != null) {
471 entry.setPage(page);
472 }
473 return entry;
474 }
475
476 private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
477 tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
478 }
479
480 EntryLocation getNext(Transaction tx, long next) throws IOException {
481 EntryLocation result = null;
482 if (next != EntryLocation.NOT_SET) {
483 result = loadEntry(tx, next);
484 }
485 return result;
486 }
487
488 private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
489 EntryLocation result = null;
490 if (previous != EntryLocation.NOT_SET) {
491 result = loadEntry(tx, previous);
492 }
493 return result;
494 }
495
496 private EntryLocation getRoot(Transaction tx) throws IOException {
497 EntryLocation result = loadEntry(tx, this.rootId);
498 return result;
499 }
500
501 ByteSequence getPayload(EntryLocation entry) throws IOException {
502 return this.store.getPayload(entry.getLocation());
503 }
504 }