001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.Collections;
020 import java.util.LinkedList;
021 import java.util.List;
022 import java.util.Set;
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.BaseDestination;
027 import org.apache.activemq.broker.region.Destination;
028 import org.apache.activemq.broker.region.MessageReference;
029 import org.apache.activemq.broker.region.Subscription;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.usage.SystemUsage;
032
033 /**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 *
037 *
038 */
039 public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040 protected int memoryUsageHighWaterMark = 70;
041 protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042 protected SystemUsage systemUsage;
043 protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044 protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045 protected boolean enableAudit=true;
046 protected ActiveMQMessageAudit audit;
047 protected boolean useCache=true;
048 private boolean cacheEnabled=true;
049 private boolean started=false;
050 protected MessageReference last = null;
051 protected final boolean prioritizedMessages;
052
053 public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054 this.prioritizedMessages=prioritizedMessages;
055 }
056
057
058 public synchronized void start() throws Exception {
059 if (!started && enableAudit && audit==null) {
060 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061 }
062 started=true;
063 }
064
065 public synchronized void stop() throws Exception {
066 started=false;
067 audit=null;
068 gc();
069 }
070
071 public void add(ConnectionContext context, Destination destination) throws Exception {
072 }
073
074 @SuppressWarnings("unchecked")
075 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
076 return Collections.EMPTY_LIST;
077 }
078
079 public boolean isRecoveryRequired() {
080 return true;
081 }
082
083 public void addMessageFirst(MessageReference node) throws Exception {
084 }
085
086 public void addMessageLast(MessageReference node) throws Exception {
087 }
088
089 public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
090 addMessageLast(node);
091 return true;
092 }
093
094 public void addRecoveredMessage(MessageReference node) throws Exception {
095 addMessageLast(node);
096 }
097
098 public void clear() {
099 }
100
101 public boolean hasNext() {
102 return false;
103 }
104
105 public boolean isEmpty() {
106 return false;
107 }
108
109 public boolean isEmpty(Destination destination) {
110 return isEmpty();
111 }
112
113 public MessageReference next() {
114 return null;
115 }
116
117 public void remove() {
118 }
119
120 public void reset() {
121 }
122
123 public int size() {
124 return 0;
125 }
126
127 public int getMaxBatchSize() {
128 return maxBatchSize;
129 }
130
131 public void setMaxBatchSize(int maxBatchSize) {
132 this.maxBatchSize = maxBatchSize;
133 }
134
135 protected void fillBatch() throws Exception {
136 }
137
138 public void resetForGC() {
139 reset();
140 }
141
142 public void remove(MessageReference node) {
143 }
144
145 public void gc() {
146 }
147
148 public void setSystemUsage(SystemUsage usageManager) {
149 this.systemUsage = usageManager;
150 }
151
152 public boolean hasSpace() {
153 return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
154 }
155
156 public boolean isFull() {
157 return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
158 }
159
160 public void release() {
161 }
162
163 public boolean hasMessagesBufferedToDeliver() {
164 return false;
165 }
166
167 /**
168 * @return the memoryUsageHighWaterMark
169 */
170 public int getMemoryUsageHighWaterMark() {
171 return memoryUsageHighWaterMark;
172 }
173
174 /**
175 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
176 */
177 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
178 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
179 }
180
181 /**
182 * @return the usageManager
183 */
184 public SystemUsage getSystemUsage() {
185 return this.systemUsage;
186 }
187
188 /**
189 * destroy the cursor
190 *
191 * @throws Exception
192 */
193 public void destroy() throws Exception {
194 stop();
195 }
196
197 /**
198 * Page in a restricted number of messages
199 *
200 * @param maxItems maximum number of messages to return
201 * @return a list of paged in messages
202 */
203 public LinkedList<MessageReference> pageInList(int maxItems) {
204 throw new RuntimeException("Not supported");
205 }
206
207 /**
208 * @return the maxProducersToAudit
209 */
210 public int getMaxProducersToAudit() {
211 return maxProducersToAudit;
212 }
213
214 /**
215 * @param maxProducersToAudit the maxProducersToAudit to set
216 */
217 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
218 this.maxProducersToAudit = maxProducersToAudit;
219 if (audit != null) {
220 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
221 }
222 }
223
224 /**
225 * @return the maxAuditDepth
226 */
227 public int getMaxAuditDepth() {
228 return maxAuditDepth;
229 }
230
231
232 /**
233 * @param maxAuditDepth the maxAuditDepth to set
234 */
235 public synchronized void setMaxAuditDepth(int maxAuditDepth) {
236 this.maxAuditDepth = maxAuditDepth;
237 if (audit != null) {
238 audit.setAuditDepth(maxAuditDepth);
239 }
240 }
241
242
243 /**
244 * @return the enableAudit
245 */
246 public boolean isEnableAudit() {
247 return enableAudit;
248 }
249
250 /**
251 * @param enableAudit the enableAudit to set
252 */
253 public synchronized void setEnableAudit(boolean enableAudit) {
254 this.enableAudit = enableAudit;
255 if (enableAudit && started && audit==null) {
256 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
257 }
258 }
259
260 public boolean isTransient() {
261 return false;
262 }
263
264
265 /**
266 * set the audit
267 * @param audit new audit component
268 */
269 public void setMessageAudit(ActiveMQMessageAudit audit) {
270 this.audit=audit;
271 }
272
273
274 /**
275 * @return the audit
276 */
277 public ActiveMQMessageAudit getMessageAudit() {
278 return audit;
279 }
280
281 public boolean isUseCache() {
282 return useCache;
283 }
284
285 public void setUseCache(boolean useCache) {
286 this.useCache = useCache;
287 }
288
289 public synchronized boolean isDuplicate(MessageId messageId) {
290 boolean unique = recordUniqueId(messageId);
291 rollback(messageId);
292 return !unique;
293 }
294
295 /**
296 * records a message id and checks if it is a duplicate
297 * @param messageId
298 * @return true if id is unique, false otherwise.
299 */
300 public synchronized boolean recordUniqueId(MessageId messageId) {
301 if (!enableAudit || audit==null) {
302 return true;
303 }
304 return !audit.isDuplicate(messageId);
305 }
306
307 public synchronized void rollback(MessageId id) {
308 if (audit != null) {
309 audit.rollback(id);
310 }
311 }
312
313 protected synchronized boolean isStarted() {
314 return started;
315 }
316
317 public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
318 boolean result = false;
319 Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
320 if (destinations != null) {
321 for (Destination dest:destinations) {
322 if (dest.isPrioritizedMessages()) {
323 result = true;
324 break;
325 }
326 }
327 }
328 return result;
329
330 }
331
332 public synchronized boolean isCacheEnabled() {
333 return cacheEnabled;
334 }
335
336 public synchronized void setCacheEnabled(boolean val) {
337 cacheEnabled = val;
338 }
339 }