001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.Serializable;
020
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023
024 import org.apache.activemq.broker.region.MessageReference;
025 import org.apache.activemq.command.MessageId;
026 import org.apache.activemq.command.ProducerId;
027 import org.apache.activemq.util.BitArrayBin;
028 import org.apache.activemq.util.IdGenerator;
029 import org.apache.activemq.util.LRUCache;
030
031 /**
032 * Provides basic audit functions for Messages without sync
033 *
034 *
035 */
036 public class ActiveMQMessageAuditNoSync implements Serializable {
037
038 private static final long serialVersionUID = 1L;
039
040 public static final int DEFAULT_WINDOW_SIZE = 2048;
041 public static final int MAXIMUM_PRODUCER_COUNT = 64;
042 private int auditDepth;
043 private int maximumNumberOfProducersToTrack;
044 private LRUCache<Object, BitArrayBin> map;
045
046 /**
047 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048 * 64
049 */
050 public ActiveMQMessageAuditNoSync() {
051 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052 }
053
054 /**
055 * Construct a MessageAudit
056 *
057 * @param auditDepth range of ids to track
058 * @param maximumNumberOfProducersToTrack number of producers expected in
059 * the system
060 */
061 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062 this.auditDepth = auditDepth;
063 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065 }
066
067 /**
068 * @return the auditDepth
069 */
070 public int getAuditDepth() {
071 return auditDepth;
072 }
073
074 /**
075 * @param auditDepth the auditDepth to set
076 */
077 public void setAuditDepth(int auditDepth) {
078 this.auditDepth = auditDepth;
079 }
080
081 /**
082 * @return the maximumNumberOfProducersToTrack
083 */
084 public int getMaximumNumberOfProducersToTrack() {
085 return maximumNumberOfProducersToTrack;
086 }
087
088 /**
089 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090 */
091 public void setMaximumNumberOfProducersToTrack(
092 int maximumNumberOfProducersToTrack) {
093 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095 }
096
097 /**
098 * Checks if this message has been seen before
099 *
100 * @param message
101 * @return true if the message is a duplicate
102 * @throws JMSException
103 */
104 public boolean isDuplicate(Message message) throws JMSException {
105 return isDuplicate(message.getJMSMessageID());
106 }
107
108 /**
109 * checks whether this messageId has been seen before and adds this
110 * messageId to the list
111 *
112 * @param id
113 * @return true if the message is a duplicate
114 */
115 public boolean isDuplicate(String id) {
116 boolean answer = false;
117 String seed = IdGenerator.getSeedFromId(id);
118 if (seed != null) {
119 BitArrayBin bab = map.get(seed);
120 if (bab == null) {
121 bab = new BitArrayBin(auditDepth);
122 map.put(seed, bab);
123 }
124 long index = IdGenerator.getSequenceFromId(id);
125 if (index >= 0) {
126 answer = bab.setBit(index, true);
127 }
128 }
129 return answer;
130 }
131
132 /**
133 * Checks if this message has been seen before
134 *
135 * @param message
136 * @return true if the message is a duplicate
137 */
138 public boolean isDuplicate(final MessageReference message) {
139 MessageId id = message.getMessageId();
140 return isDuplicate(id);
141 }
142
143 /**
144 * Checks if this messageId has been seen before
145 *
146 * @param id
147 * @return true if the message is a duplicate
148 */
149 public boolean isDuplicate(final MessageId id) {
150 boolean answer = false;
151
152 if (id != null) {
153 ProducerId pid = id.getProducerId();
154 if (pid != null) {
155 BitArrayBin bab = map.get(pid);
156 if (bab == null) {
157 bab = new BitArrayBin(auditDepth);
158 map.put(pid, bab);
159 }
160 answer = bab.setBit(id.getProducerSequenceId(), true);
161 }
162 }
163 return answer;
164 }
165
166 /**
167 * mark this message as being received
168 *
169 * @param message
170 */
171 public void rollback(final MessageReference message) {
172 MessageId id = message.getMessageId();
173 rollback(id);
174 }
175
176 /**
177 * mark this message as being received
178 *
179 * @param id
180 */
181 public void rollback(final MessageId id) {
182 if (id != null) {
183 ProducerId pid = id.getProducerId();
184 if (pid != null) {
185 BitArrayBin bab = map.get(pid);
186 if (bab != null) {
187 bab.setBit(id.getProducerSequenceId(), false);
188 }
189 }
190 }
191 }
192
193 /**
194 * Check the message is in order
195 * @param msg
196 * @return
197 * @throws JMSException
198 */
199 public boolean isInOrder(Message msg) throws JMSException {
200 return isInOrder(msg.getJMSMessageID());
201 }
202
203 /**
204 * Check the message id is in order
205 * @param id
206 * @return
207 */
208 public boolean isInOrder(final String id) {
209 boolean answer = true;
210
211 if (id != null) {
212 String seed = IdGenerator.getSeedFromId(id);
213 if (seed != null) {
214 BitArrayBin bab = map.get(seed);
215 if (bab != null) {
216 long index = IdGenerator.getSequenceFromId(id);
217 answer = bab.isInOrder(index);
218 }
219
220 }
221 }
222 return answer;
223 }
224
225 /**
226 * Check the MessageId is in order
227 * @param message
228 * @return
229 */
230 public boolean isInOrder(final MessageReference message) {
231 return isInOrder(message.getMessageId());
232 }
233
234 /**
235 * Check the MessageId is in order
236 * @param id
237 * @return
238 */
239 public boolean isInOrder(final MessageId id) {
240 boolean answer = false;
241
242 if (id != null) {
243 ProducerId pid = id.getProducerId();
244 if (pid != null) {
245 BitArrayBin bab = map.get(pid);
246 if (bab == null) {
247 bab = new BitArrayBin(auditDepth);
248 map.put(pid, bab);
249 }
250 answer = bab.isInOrder(id.getProducerSequenceId());
251
252 }
253 }
254 return answer;
255 }
256
257 public long getLastSeqId(ProducerId id) {
258 long result = -1;
259 BitArrayBin bab = map.get(id.toString() + ":");
260 if (bab != null) {
261 result = bab.getLastSetIndex();
262 }
263 return result;
264 }
265
266 public void clear() {
267 map.clear();
268 }
269 }