001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.HashSet;
020 import java.util.Iterator;
021 import java.util.List;
022 import java.util.Set;
023 import java.util.concurrent.ConcurrentHashMap;
024 import javax.jms.InvalidDestinationException;
025 import javax.jms.JMSException;
026 import org.apache.activemq.advisory.AdvisorySupport;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.broker.region.policy.PolicyEntry;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ConnectionId;
031 import org.apache.activemq.command.ConsumerId;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.RemoveSubscriptionInfo;
034 import org.apache.activemq.command.SessionId;
035 import org.apache.activemq.command.SubscriptionInfo;
036 import org.apache.activemq.store.TopicMessageStore;
037 import org.apache.activemq.thread.TaskRunnerFactory;
038 import org.apache.activemq.usage.SystemUsage;
039 import org.apache.activemq.util.LongSequenceGenerator;
040 import org.apache.activemq.util.SubscriptionKey;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 /**
045 *
046 */
047 public class TopicRegion extends AbstractRegion {
048 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
049 protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
050 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
051 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
052 private boolean keepDurableSubsActive;
053
054 public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
055 DestinationFactory destinationFactory) {
056 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
057
058 }
059
060 @Override
061 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
062 if (info.isDurable()) {
063 ActiveMQDestination destination = info.getDestination();
064 if (!destination.isPattern()) {
065 // Make sure the destination is created.
066 lookup(context, destination,true);
067 }
068 String clientId = context.getClientId();
069 String subscriptionName = info.getSubscriptionName();
070 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
071 DurableTopicSubscription sub = durableSubscriptions.get(key);
072 if (sub != null) {
073 if (sub.isActive()) {
074 throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
075 }
076 // Has the selector changed??
077 if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
078 // Remove the consumer first then add it.
079 durableSubscriptions.remove(key);
080 synchronized (destinationsMutex) {
081 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
082 Destination dest = iter.next();
083 //Account for virtual destinations
084 if (dest instanceof Topic){
085 Topic topic = (Topic)dest;
086 topic.deleteSubscription(context, key);
087 }
088 }
089 }
090 super.removeConsumer(context, sub.getConsumerInfo());
091 super.addConsumer(context, info);
092 sub = durableSubscriptions.get(key);
093 } else {
094 // Change the consumer id key of the durable sub.
095 if (sub.getConsumerInfo().getConsumerId() != null) {
096 subscriptions.remove(sub.getConsumerInfo().getConsumerId());
097 }
098 subscriptions.put(info.getConsumerId(), sub);
099 }
100 } else {
101 super.addConsumer(context, info);
102 sub = durableSubscriptions.get(key);
103 if (sub == null) {
104 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
105 + " subscriberName: " + key.getSubscriptionName());
106 }
107 }
108 sub.activate(usageManager, context, info);
109 return sub;
110 } else {
111 return super.addConsumer(context, info);
112 }
113 }
114
115 @Override
116 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117 if (info.isDurable()) {
118
119 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
120 DurableTopicSubscription sub = durableSubscriptions.get(key);
121 if (sub != null) {
122 sub.deactivate(keepDurableSubsActive);
123 }
124
125 } else {
126 super.removeConsumer(context, info);
127 }
128 }
129
130 @Override
131 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
132 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
133 DurableTopicSubscription sub = durableSubscriptions.remove(key);
134 if (sub == null) {
135 throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
136 }
137 if (sub.isActive()) {
138 throw new JMSException("Durable consumer is in use");
139 }
140
141 synchronized (destinationsMutex) {
142 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143 Destination dest = iter.next();
144 //Account for virtual destinations
145 if (dest instanceof Topic){
146 Topic topic = (Topic)dest;
147 topic.deleteSubscription(context, key);
148 }
149 }
150 }
151 if (subscriptions.get(sub.getConsumerInfo()) != null) {
152 super.removeConsumer(context, sub.getConsumerInfo());
153 } else {
154 // try destroying inactive subscriptions
155 destroySubscription(sub);
156 }
157 }
158
159 @Override
160 public String toString() {
161 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
162 }
163
164 @Override
165 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
166 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
167 Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
168
169 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
170 // Eagerly recover the durable subscriptions
171 if (store != null) {
172 SubscriptionInfo[] infos = store.getAllSubscriptions();
173 for (int i = 0; i < infos.length; i++) {
174
175 SubscriptionInfo info = infos[i];
176 LOG.debug("Restoring durable subscription: " + info);
177 SubscriptionKey key = new SubscriptionKey(info);
178
179 // A single durable sub may be subscribing to multiple topics.
180 // so it might exist already.
181 DurableTopicSubscription sub = durableSubscriptions.get(key);
182 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
183 if (sub == null) {
184 ConnectionContext c = new ConnectionContext();
185 c.setBroker(context.getBroker());
186 c.setClientId(key.getClientId());
187 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
188 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
189 }
190
191 if (dupChecker.contains(sub)) {
192 continue;
193 }
194
195 dupChecker.add(sub);
196 rc.add(sub);
197 dest.addSubscription(context, sub);
198 }
199
200 // Now perhaps there other durable subscriptions (via wild card)
201 // that would match this destination..
202 durableSubscriptions.values();
203 for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
204 DurableTopicSubscription sub = iterator.next();
205 // Skip over subscriptions that we allready added..
206 if (dupChecker.contains(sub)) {
207 continue;
208 }
209
210 if (sub.matches(dest.getActiveMQDestination())) {
211 rc.add(sub);
212 dest.addSubscription(context, sub);
213 }
214 }
215 }
216 return rc;
217 }
218
219 private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
220 ConsumerInfo rc = new ConsumerInfo();
221 rc.setSelector(info.getSelector());
222 rc.setSubscriptionName(info.getSubscriptionName());
223 rc.setDestination(info.getSubscribedDestination());
224 rc.setConsumerId(createConsumerId());
225 return rc;
226 }
227
228 private ConsumerId createConsumerId() {
229 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
230 }
231
232 protected void configureTopic(Topic topic, ActiveMQDestination destination) {
233 if (broker.getDestinationPolicy() != null) {
234 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
235 if (entry != null) {
236 entry.configure(broker,topic);
237 }
238 }
239 }
240
241 @Override
242 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
243 ActiveMQDestination destination = info.getDestination();
244
245 if (info.isDurable()) {
246 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
247 throw new JMSException("Cannot create a durable subscription for an advisory Topic");
248 }
249 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
250 DurableTopicSubscription sub = durableSubscriptions.get(key);
251
252 if (sub == null) {
253
254 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
255
256 if (destination != null && broker.getDestinationPolicy() != null) {
257 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
258 if (entry != null) {
259 entry.configure(broker, usageManager, sub);
260 }
261 }
262 durableSubscriptions.put(key, sub);
263 } else {
264 throw new JMSException("That durable subscription is already active.");
265 }
266 return sub;
267 }
268 try {
269 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
270 // lets configure the subscription depending on the destination
271 if (destination != null && broker.getDestinationPolicy() != null) {
272 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
273 if (entry != null) {
274 entry.configure(broker, usageManager, answer);
275 }
276 }
277 answer.init();
278 return answer;
279 } catch (Exception e) {
280 LOG.error("Failed to create TopicSubscription ", e);
281 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
282 jmsEx.setLinkedException(e);
283 throw jmsEx;
284 }
285 }
286
287 /**
288 */
289 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
290 if (info1.getSelector() != null ^ info2.getSelector() != null) {
291 return true;
292 }
293 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
294 return true;
295 }
296 return !info1.getDestination().equals(info2.getDestination());
297 }
298
299 @Override
300 protected Set<ActiveMQDestination> getInactiveDestinations() {
301 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
302 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
303 ActiveMQDestination dest = iter.next();
304 if (!dest.isTopic()) {
305 iter.remove();
306 }
307 }
308 return inactiveDestinations;
309 }
310
311 public boolean isKeepDurableSubsActive() {
312 return keepDurableSubsActive;
313 }
314
315 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
316 this.keepDurableSubsActive = keepDurableSubsActive;
317 }
318
319 public boolean durableSubscriptionExists(SubscriptionKey key) {
320 return this.durableSubscriptions.containsKey(key);
321 }
322
323 }