001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.concurrent.ConcurrentHashMap;
022 import java.util.concurrent.atomic.AtomicBoolean;
023
024 import javax.jms.InvalidSelectorException;
025 import javax.jms.JMSException;
026
027 import org.apache.activemq.broker.Broker;
028 import org.apache.activemq.broker.ConnectionContext;
029 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
030 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageDispatch;
036 import org.apache.activemq.command.MessageId;
037 import org.apache.activemq.filter.MessageEvaluationContext;
038 import org.apache.activemq.store.TopicMessageStore;
039 import org.apache.activemq.usage.SystemUsage;
040 import org.apache.activemq.usage.Usage;
041 import org.apache.activemq.usage.UsageListener;
042 import org.apache.activemq.util.SubscriptionKey;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047
048 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050 private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051 private final SubscriptionKey subscriptionKey;
052 private final boolean keepDurableSubsActive;
053 private AtomicBoolean active = new AtomicBoolean();
054
055 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
056 throws JMSException {
057 super(broker,usageManager, context, info);
058 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
059 this.pending.setSystemUsage(usageManager);
060 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
061 this.keepDurableSubsActive = keepDurableSubsActive;
062 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
063
064 }
065
066 public final boolean isActive() {
067 return active.get();
068 }
069
070 public boolean isFull() {
071 return !active.get() || super.isFull();
072 }
073
074 public void gc() {
075 }
076
077 /**
078 * store will have a pending ack for all durables, irrespective of the selector
079 * so we need to ack if node is un-matched
080 */
081 public void unmatched(MessageReference node) throws IOException {
082 MessageAck ack = new MessageAck();
083 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
084 ack.setMessageID(node.getMessageId());
085 node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
086 }
087
088 @Override
089 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
090 // statically configured via maxPageSize
091 }
092
093 public void add(ConnectionContext context, Destination destination) throws Exception {
094 super.add(context, destination);
095 // do it just once per destination
096 if (destinations.containsKey(destination.getActiveMQDestination())) {
097 return;
098 }
099 destinations.put(destination.getActiveMQDestination(), destination);
100
101 if (active.get() || keepDurableSubsActive) {
102 Topic topic = (Topic)destination;
103 topic.activate(context, this);
104 if (pending.isEmpty(topic)) {
105 topic.recoverRetroactiveMessages(context, this);
106 }
107 this.enqueueCounter+=pending.size();
108 } else if (destination.getMessageStore() != null) {
109 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
110 try {
111 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
112 } catch (IOException e) {
113 JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
114 jmsEx.setLinkedException(e);
115 throw jmsEx;
116 }
117 }
118 dispatchPending();
119 }
120
121 public void activate(SystemUsage memoryManager, ConnectionContext context,
122 ConsumerInfo info) throws Exception {
123 if (!active.get()) {
124 this.context = context;
125 this.info = info;
126 LOG.debug("Activating " + this);
127 if (!keepDurableSubsActive) {
128 for (Iterator<Destination> iter = destinations.values()
129 .iterator(); iter.hasNext();) {
130 Topic topic = (Topic) iter.next();
131 add(context, topic);
132 topic.activate(context, this);
133 }
134 }
135 synchronized (pendingLock) {
136 pending.setSystemUsage(memoryManager);
137 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
138 pending.setMaxAuditDepth(getMaxAuditDepth());
139 pending.setMaxProducersToAudit(getMaxProducersToAudit());
140 pending.start();
141 // If nothing was in the persistent store, then try to use the
142 // recovery policy.
143 if (pending.isEmpty()) {
144 for (Iterator<Destination> iter = destinations.values()
145 .iterator(); iter.hasNext();) {
146 Topic topic = (Topic) iter.next();
147 topic.recoverRetroactiveMessages(context, this);
148 }
149 }
150 }
151 this.active.set(true);
152 dispatchPending();
153 this.usageManager.getMemoryUsage().addUsageListener(this);
154 }
155 }
156
157 public void deactivate(boolean keepDurableSubsActive) throws Exception {
158 LOG.debug("Deactivating " + this);
159 active.set(false);
160 this.usageManager.getMemoryUsage().removeUsageListener(this);
161 synchronized (pending) {
162 pending.stop();
163 }
164 if (!keepDurableSubsActive) {
165 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
166 Topic topic = (Topic)iter.next();
167 topic.deactivate(context, this);
168 }
169 }
170
171 for (final MessageReference node : dispatched) {
172 // Mark the dispatched messages as redelivered for next time.
173 Integer count = redeliveredMessages.get(node.getMessageId());
174 if (count != null) {
175 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
176 } else {
177 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
178 }
179 if (keepDurableSubsActive&& pending.isTransient()) {
180 synchronized (pending) {
181 pending.addMessageFirst(node);
182 }
183 } else {
184 node.decrementReferenceCount();
185 }
186 }
187 synchronized(dispatched) {
188 dispatched.clear();
189 }
190 if (!keepDurableSubsActive && pending.isTransient()) {
191 synchronized (pending) {
192 try {
193 pending.reset();
194 while (pending.hasNext()) {
195 MessageReference node = pending.next();
196 node.decrementReferenceCount();
197 pending.remove();
198 }
199 } finally {
200 pending.release();
201 }
202 }
203 }
204 prefetchExtension = 0;
205 }
206
207
208 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
209 MessageDispatch md = super.createMessageDispatch(node, message);
210 Integer count = redeliveredMessages.get(node.getMessageId());
211 if (count != null) {
212 md.setRedeliveryCounter(count.intValue());
213 }
214 return md;
215 }
216
217 public void add(MessageReference node) throws Exception {
218 if (!active.get() && !keepDurableSubsActive) {
219 return;
220 }
221 super.add(node);
222 }
223
224 protected void dispatchPending() throws IOException {
225 if (isActive()) {
226 super.dispatchPending();
227 }
228 }
229
230 protected void doAddRecoveredMessage(MessageReference message) throws Exception {
231 synchronized(pending) {
232 pending.addRecoveredMessage(message);
233 }
234 }
235
236 public int getPendingQueueSize() {
237 if (active.get() || keepDurableSubsActive) {
238 return super.getPendingQueueSize();
239 }
240 // TODO: need to get from store
241 return 0;
242 }
243
244 public void setSelector(String selector) throws InvalidSelectorException {
245 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
246 }
247
248 protected boolean canDispatch(MessageReference node) {
249 return isActive();
250 }
251
252 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
253 node.getRegionDestination().acknowledge(context, this, ack, node);
254 redeliveredMessages.remove(node.getMessageId());
255 node.decrementReferenceCount();
256 }
257
258
259 public synchronized String toString() {
260 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
261 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
262 }
263
264 public SubscriptionKey getSubscriptionKey() {
265 return subscriptionKey;
266 }
267
268 /**
269 * Release any references that we are holding.
270 */
271 public void destroy() {
272 synchronized (pending) {
273 try {
274
275 pending.reset();
276 while (pending.hasNext()) {
277 MessageReference node = pending.next();
278 node.decrementReferenceCount();
279 }
280
281 } finally {
282 pending.release();
283 pending.clear();
284 }
285 }
286 synchronized(dispatched) {
287 for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
288 MessageReference node = (MessageReference) iter.next();
289 node.decrementReferenceCount();
290 }
291 dispatched.clear();
292 }
293 setSlowConsumer(false);
294 }
295
296 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
297 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
298 try {
299 dispatchPending();
300 } catch (IOException e) {
301 LOG.warn("problem calling dispatchMatched", e);
302 }
303 }
304 }
305
306 protected boolean isDropped(MessageReference node) {
307 return false;
308 }
309
310 public boolean isKeepDurableSubsActive() {
311 return keepDurableSubsActive;
312 }
313 }