001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.ArrayList;
020 import java.util.HashMap;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024 import org.apache.activemq.broker.region.MessageReference;
025 import org.apache.activemq.command.MessageId;
026
027 public class PrioritizedPendingList implements PendingList {
028 static final Integer MAX_PRIORITY = 10;
029 private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
030 final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
031
032 public PrioritizedPendingList() {
033 for (int i = 0; i < MAX_PRIORITY; i++) {
034 this.lists[i] = new OrderedPendingList();
035 }
036 }
037 public PendingNode addMessageFirst(MessageReference message) {
038 PendingNode node = getList(message).addMessageFirst(message);
039 this.map.put(message.getMessageId(), node);
040 return node;
041 }
042
043 public PendingNode addMessageLast(MessageReference message) {
044 PendingNode node = getList(message).addMessageLast(message);
045 this.map.put(message.getMessageId(), node);
046 return node;
047 }
048
049 public void clear() {
050 for (int i = 0; i < MAX_PRIORITY; i++) {
051 this.lists[i].clear();
052 }
053 this.map.clear();
054 }
055
056 public boolean isEmpty() {
057 return this.map.isEmpty();
058 }
059
060 public Iterator<MessageReference> iterator() {
061 return new PrioritizedPendingListIterator();
062 }
063
064 public void remove(MessageReference message) {
065 if (message != null) {
066 PendingNode node = this.map.remove(message.getMessageId());
067 if (node != null) {
068 node.getList().removeNode(node);
069 }
070 }
071 }
072
073 public int size() {
074 return this.map.size();
075 }
076
077 @Override
078 public String toString() {
079 return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
080 }
081
082 protected int getPriority(MessageReference message) {
083 int priority = javax.jms.Message.DEFAULT_PRIORITY;
084 if (message.getMessageId() != null) {
085 priority = Math.max(message.getMessage().getPriority(), 0);
086 priority = Math.min(priority, 9);
087 }
088 return priority;
089 }
090
091 protected OrderedPendingList getList(MessageReference msg) {
092 return lists[getPriority(msg)];
093 }
094
095 private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
096 private int index = 0;
097 private int currentIndex = 0;
098 List<PendingNode> list = new ArrayList<PendingNode>(size());
099
100 PrioritizedPendingListIterator() {
101 for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
102 OrderedPendingList orderedPendingList = lists[i];
103 if (!orderedPendingList.isEmpty()) {
104 list.addAll(orderedPendingList.getAsList());
105 }
106 }
107 }
108 public boolean hasNext() {
109 return list.size() > index;
110 }
111
112 public MessageReference next() {
113 PendingNode node = list.get(this.index);
114 this.currentIndex = this.index;
115 this.index++;
116 return node.getMessage();
117 }
118
119 public void remove() {
120 PendingNode node = list.get(this.currentIndex);
121 if (node != null) {
122 map.remove(node.getMessage().getMessageId());
123 node.getList().removeNode(node);
124 }
125
126 }
127
128 }
129
130 }