001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.Collections;
020 import java.util.HashMap;
021 import java.util.List;
022 import java.util.Map;
023 import java.util.concurrent.CopyOnWriteArrayList;
024 import org.apache.activemq.advisory.AdvisorySupport;
025 import org.apache.activemq.broker.Broker;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.broker.region.Destination;
028 import org.apache.activemq.broker.region.DurableTopicSubscription;
029 import org.apache.activemq.broker.region.MessageReference;
030 import org.apache.activemq.broker.region.Topic;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.usage.SystemUsage;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
/**
 * Pending message cursor (messages awaiting dispatch to a consumer) for a
 * durable topic subscription. Persistent messages are tracked by one cursor
 * per topic; non-persistent messages share a single separate cursor.
 */
042 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
043
044 private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
045 private static final int UNKNOWN = -1;
046 private final String clientId;
047 private final String subscriberName;
048 private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
049 private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
050 private final PendingMessageCursor nonPersistent;
051 private PendingMessageCursor currentCursor;
052 private final DurableTopicSubscription subscription;
053 private int cacheCurrentLowestPriority = UNKNOWN;
054 private boolean immediatePriorityDispatch = true;
055 /**
056 * @param broker Broker for this cursor
057 * @param clientId clientId for this cursor
058 * @param subscriberName subscriber name for this cursor
059 * @param maxBatchSize currently ignored
060 * @param subscription subscription for this cursor
061 */
062 public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
063 super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
064 this.subscription=subscription;
065 this.clientId = clientId;
066 this.subscriberName = subscriberName;
067 if (broker.getBrokerService().isPersistent()) {
068 this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
069 } else {
070 this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
071 }
072
073 this.nonPersistent.setMaxBatchSize(maxBatchSize);
074 this.nonPersistent.setSystemUsage(systemUsage);
075 this.storePrefetches.add(this.nonPersistent);
076
077 if (prioritizedMessages) {
078 setMaxAuditDepth(10*getMaxAuditDepth());
079 }
080 }
081
082 @Override
083 public synchronized void start() throws Exception {
084 if (!isStarted()) {
085 super.start();
086 for (PendingMessageCursor tsp : storePrefetches) {
087 tsp.setMessageAudit(getMessageAudit());
088 tsp.start();
089 }
090 }
091 }
092
093 @Override
094 public synchronized void stop() throws Exception {
095 if (isStarted()) {
096 if (subscription.isKeepDurableSubsActive()) {
097 super.gc();
098 super.getMessageAudit().clear();
099 for (PendingMessageCursor tsp : storePrefetches) {
100 tsp.gc();
101 tsp.getMessageAudit().clear();
102 }
103 } else {
104 super.stop();
105 for (PendingMessageCursor tsp : storePrefetches) {
106 tsp.stop();
107 }
108 }
109 }
110 }
111
112 /**
113 * Add a destination
114 *
115 * @param context
116 * @param destination
117 * @throws Exception
118 */
119 @Override
120 public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
121 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
122 TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
123 tsp.setMaxBatchSize(destination.getMaxPageSize());
124 tsp.setSystemUsage(systemUsage);
125 tsp.setMessageAudit(getMessageAudit());
126 tsp.setEnableAudit(isEnableAudit());
127 tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
128 topics.put(destination, tsp);
129 storePrefetches.add(tsp);
130 if (isStarted()) {
131 tsp.start();
132 }
133 }
134 }
135
136 /**
137 * remove a destination
138 *
139 * @param context
140 * @param destination
141 * @throws Exception
142 */
143 @Override
144 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
145 PendingMessageCursor tsp = topics.remove(destination);
146 if (tsp != null) {
147 storePrefetches.remove(tsp);
148 }
149 return Collections.EMPTY_LIST;
150 }
151
152 /**
153 * @return true if there are no pending messages
154 */
155 @Override
156 public synchronized boolean isEmpty() {
157 for (PendingMessageCursor tsp : storePrefetches) {
158 if( !tsp.isEmpty() )
159 return false;
160 }
161 return true;
162 }
163
164 @Override
165 public synchronized boolean isEmpty(Destination destination) {
166 boolean result = true;
167 TopicStorePrefetch tsp = topics.get(destination);
168 if (tsp != null) {
169 result = tsp.isEmpty();
170 }
171 return result;
172 }
173
174 /**
175 * Informs the Broker if the subscription needs to intervention to recover
176 * it's state e.g. DurableTopicSubscriber may do
177 *
178 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
179 * @return true if recovery required
180 */
181 @Override
182 public boolean isRecoveryRequired() {
183 return false;
184 }
185
186 @Override
187 public synchronized void addMessageLast(MessageReference node) throws Exception {
188 if (node != null) {
189 Message msg = node.getMessage();
190 if (isStarted()) {
191 if (!msg.isPersistent()) {
192 nonPersistent.addMessageLast(node);
193 }
194 }
195 if (msg.isPersistent()) {
196 Destination dest = msg.getRegionDestination();
197 TopicStorePrefetch tsp = topics.get(dest);
198 if (tsp != null) {
199 // cache can become high priority cache for immediate dispatch
200 final int priority = msg.getPriority();
201 if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
202 if (priority > tsp.getCurrentLowestPriority()) {
203 if (LOG.isTraceEnabled()) {
204 LOG.trace("enabling cache for cursor on high priority message " + priority
205 + ", current lowest: " + tsp.getCurrentLowestPriority());
206 }
207 tsp.setCacheEnabled(true);
208 cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
209 }
210 } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
211 // go to the store to get next priority message as lower priority messages may be recovered
212 // already and need to acked sequence order
213 if (LOG.isTraceEnabled()) {
214 LOG.trace("disabling/clearing cache for cursor on lower priority message "
215 + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
216 + " cache lowest: " + cacheCurrentLowestPriority);
217 }
218 tsp.setCacheEnabled(false);
219 cacheCurrentLowestPriority = UNKNOWN;
220 }
221 tsp.addMessageLast(node);
222 }
223 }
224
225 }
226 }
227
228 @Override
229 public boolean isTransient() {
230 return subscription.isKeepDurableSubsActive();
231 }
232
233 @Override
234 public void addMessageFirst(MessageReference node) throws Exception {
235 // for keep durable subs active, need to deal with redispatch
236 if (node != null) {
237 Message msg = node.getMessage();
238 if (!msg.isPersistent()) {
239 nonPersistent.addMessageFirst(node);
240 } else {
241 Destination dest = msg.getRegionDestination();
242 TopicStorePrefetch tsp = topics.get(dest);
243 if (tsp != null) {
244 tsp.addMessageFirst(node);
245 }
246 }
247 }
248 }
249
250 @Override
251 public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
252 nonPersistent.addMessageLast(node);
253 }
254
255 @Override
256 public synchronized void clear() {
257 for (PendingMessageCursor tsp : storePrefetches) {
258 tsp.clear();
259 }
260 }
261
262 @Override
263 public synchronized boolean hasNext() {
264 boolean result = true;
265 if (result) {
266 try {
267 currentCursor = getNextCursor();
268 } catch (Exception e) {
269 LOG.error("Failed to get current cursor ", e);
270 throw new RuntimeException(e);
271 }
272 result = currentCursor != null ? currentCursor.hasNext() : false;
273 }
274 return result;
275 }
276
277 @Override
278 public synchronized MessageReference next() {
279 MessageReference result = currentCursor != null ? currentCursor.next() : null;
280 return result;
281 }
282
283 @Override
284 public synchronized void remove() {
285 if (currentCursor != null) {
286 currentCursor.remove();
287 }
288 }
289
290 @Override
291 public synchronized void remove(MessageReference node) {
292 if (currentCursor != null) {
293 currentCursor.remove(node);
294 }
295 }
296
297 @Override
298 public synchronized void reset() {
299 for (PendingMessageCursor storePrefetch : storePrefetches) {
300 storePrefetch.reset();
301 }
302 }
303
304 @Override
305 public synchronized void release() {
306 for (PendingMessageCursor storePrefetch : storePrefetches) {
307 storePrefetch.release();
308 }
309 }
310
311 @Override
312 public synchronized int size() {
313 int pendingCount=0;
314 for (PendingMessageCursor tsp : storePrefetches) {
315 pendingCount += tsp.size();
316 }
317 return pendingCount;
318 }
319
320 @Override
321 public void setMaxBatchSize(int newMaxBatchSize) {
322 for (PendingMessageCursor storePrefetch : storePrefetches) {
323 storePrefetch.setMaxBatchSize(newMaxBatchSize);
324 }
325 super.setMaxBatchSize(newMaxBatchSize);
326 }
327
328 @Override
329 public synchronized void gc() {
330 for (PendingMessageCursor tsp : storePrefetches) {
331 tsp.gc();
332 }
333 cacheCurrentLowestPriority = UNKNOWN;
334 }
335
336 @Override
337 public void setSystemUsage(SystemUsage usageManager) {
338 super.setSystemUsage(usageManager);
339 for (PendingMessageCursor tsp : storePrefetches) {
340 tsp.setSystemUsage(usageManager);
341 }
342 }
343
344 @Override
345 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
346 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
347 for (PendingMessageCursor cursor : storePrefetches) {
348 cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
349 }
350 }
351
352 @Override
353 public void setMaxProducersToAudit(int maxProducersToAudit) {
354 super.setMaxProducersToAudit(maxProducersToAudit);
355 for (PendingMessageCursor cursor : storePrefetches) {
356 cursor.setMaxAuditDepth(maxAuditDepth);
357 }
358 }
359
360 @Override
361 public void setMaxAuditDepth(int maxAuditDepth) {
362 super.setMaxAuditDepth(maxAuditDepth);
363 for (PendingMessageCursor cursor : storePrefetches) {
364 cursor.setMaxAuditDepth(maxAuditDepth);
365 }
366 }
367
368 @Override
369 public void setEnableAudit(boolean enableAudit) {
370 super.setEnableAudit(enableAudit);
371 for (PendingMessageCursor cursor : storePrefetches) {
372 cursor.setEnableAudit(enableAudit);
373 }
374 }
375
376 @Override
377 public void setUseCache(boolean useCache) {
378 super.setUseCache(useCache);
379 for (PendingMessageCursor cursor : storePrefetches) {
380 cursor.setUseCache(useCache);
381 }
382 }
383
384 protected synchronized PendingMessageCursor getNextCursor() throws Exception {
385 if (currentCursor == null || currentCursor.isEmpty()) {
386 currentCursor = null;
387 for (PendingMessageCursor tsp : storePrefetches) {
388 if (tsp.hasNext()) {
389 currentCursor = tsp;
390 break;
391 }
392 }
393 // round-robin
394 if (storePrefetches.size()>1) {
395 PendingMessageCursor first = storePrefetches.remove(0);
396 storePrefetches.add(first);
397 }
398 }
399 return currentCursor;
400 }
401
402 @Override
403 public String toString() {
404 return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
405 }
406
407 public boolean isImmediatePriorityDispatch() {
408 return immediatePriorityDispatch;
409 }
410
411 public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
412 this.immediatePriorityDispatch = immediatePriorityDispatch;
413 }
414
415 }