001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.ArrayList;
020 import java.util.HashMap;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Set;
025 import java.util.concurrent.ConcurrentHashMap;
026 import javax.jms.JMSException;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.broker.ConsumerBrokerExchange;
029 import org.apache.activemq.broker.DestinationAlreadyExistsException;
030 import org.apache.activemq.broker.ProducerBrokerExchange;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.ConsumerControl;
033 import org.apache.activemq.command.ConsumerId;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.Message;
036 import org.apache.activemq.command.MessageAck;
037 import org.apache.activemq.command.MessageDispatchNotification;
038 import org.apache.activemq.command.MessagePull;
039 import org.apache.activemq.command.ProducerInfo;
040 import org.apache.activemq.command.RemoveSubscriptionInfo;
041 import org.apache.activemq.command.Response;
042 import org.apache.activemq.filter.DestinationFilter;
043 import org.apache.activemq.filter.DestinationMap;
044 import org.apache.activemq.security.SecurityContext;
045 import org.apache.activemq.thread.TaskRunnerFactory;
046 import org.apache.activemq.usage.SystemUsage;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
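/**
 * Abstract base implementation of {@link Region} holding the destination and
 * subscription bookkeeping shared by concrete regions. Subclasses supply
 * {@link #createSubscription(ConnectionContext, ConsumerInfo)}.
 */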
053 public abstract class AbstractRegion implements Region {
054
/** Logger used by this class. */
private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);

/** Destinations registered in this region, keyed by their exact destination name. */
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
/** Second index over the same destinations; queried by the consumer/producer methods and by getDestinations(). */
protected final DestinationMap destinationMap = new DestinationMap();
/** Subscriptions registered through addConsumer(), keyed by consumer id. */
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
/** Set from the constructor's memoryManager argument; not read anywhere else in this class. */
protected final SystemUsage usageManager;
/** Creates, lists and removes destinations on behalf of this region. */
protected final DestinationFactory destinationFactory;
/** Passed to the destination factory whenever a destination is created. */
protected final DestinationStatistics destinationStatistics;
/** Owning broker; supplies the broker name, broker service and destination interceptor. */
protected final RegionBroker broker;
/** When true, lookup() tries to create a destination that is not registered yet. */
protected boolean autoCreateDestinations = true;
/** Set from the constructor; not read anywhere else in this class. */
protected final TaskRunnerFactory taskRunnerFactory;
/** Monitor held around most accesses to destinations and destinationMap. */
protected final Object destinationsMutex = new Object();
/** Per-consumer lock objects used by addConsumer(); the map itself is guarded by synchronizing on it. */
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
/** Set to true by start() and to false by stop(). */
protected boolean started;
069
/**
 * Creates a region bound to the given broker.
 *
 * @param broker the owning region broker; must not be {@code null}
 * @param destinationStatistics statistics object handed to the destination
 *        factory when destinations are created
 * @param memoryManager system usage object stored in {@code usageManager}
 * @param taskRunnerFactory task runner factory stored in the protected
 *        {@code taskRunnerFactory} field
 * @param destinationFactory factory used to create, list and remove
 *        destinations; must not be {@code null}
 * @throws IllegalArgumentException if {@code broker} or
 *         {@code destinationFactory} is {@code null}
 */
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
        TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
    if (broker == null) {
        throw new IllegalArgumentException("null broker");
    }
    // Check the factory itself: every destination create/remove goes through
    // it, so a null factory must be rejected here rather than surface later
    // as a NullPointerException.
    if (destinationFactory == null) {
        throw new IllegalArgumentException("null destinationFactory");
    }
    this.broker = broker;
    this.destinationStatistics = destinationStatistics;
    this.usageManager = memoryManager;
    this.taskRunnerFactory = taskRunnerFactory;
    this.destinationFactory = destinationFactory;
}
084
085 public final void start() throws Exception {
086 started = true;
087
088 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
089 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
090 ActiveMQDestination dest = iter.next();
091
092 ConnectionContext context = new ConnectionContext();
093 context.setBroker(broker.getBrokerService().getBroker());
094 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
095 context.getBroker().addDestination(context, dest, false);
096 }
097 synchronized (destinationsMutex) {
098 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
099 Destination dest = i.next();
100 dest.start();
101 }
102 }
103 }
104
105 public void stop() throws Exception {
106 started = false;
107 synchronized (destinationsMutex) {
108 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
109 Destination dest = i.next();
110 dest.stop();
111 }
112 }
113 destinations.clear();
114 }
115
116 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
117 boolean createIfTemporary) throws Exception {
118 LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
119 synchronized (destinationsMutex) {
120 Destination dest = destinations.get(destination);
121 if (dest == null) {
122 if (destination.isTemporary() == false || createIfTemporary) {
123 dest = createDestination(context, destination);
124 // intercept if there is a valid interceptor defined
125 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
126 if (destinationInterceptor != null) {
127 dest = destinationInterceptor.intercept(dest);
128 }
129 dest.start();
130 destinations.put(destination, dest);
131 destinationMap.put(destination, dest);
132 addSubscriptionsForDestination(context, dest);
133 }
134 if (dest == null) {
135 throw new JMSException("The destination " + destination + " does not exist.");
136 }
137 }
138 return dest;
139 }
140 }
141
142 public Map<ConsumerId, Subscription> getSubscriptions() {
143 return subscriptions;
144 }
145
146 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
147 throws Exception {
148
149 List<Subscription> rc = new ArrayList<Subscription>();
150 // Add all consumers that are interested in the destination.
151 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
152 Subscription sub = iter.next();
153 if (sub.matches(dest.getActiveMQDestination())) {
154 dest.addSubscription(context, sub);
155 rc.add(sub);
156 }
157 }
158 return rc;
159
160 }
161
162 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
163 throws Exception {
164
165 // No timeout.. then try to shut down right way, fails if there are
166 // current subscribers.
167 if (timeout == 0) {
168 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
169 Subscription sub = iter.next();
170 if (sub.matches(destination)) {
171 throw new JMSException("Destination still has an active subscription: " + destination);
172 }
173 }
174 }
175
176 if (timeout > 0) {
177 // TODO: implement a way to notify the subscribers that we want to
178 // take the down
179 // the destination and that they should un-subscribe.. Then wait up
180 // to timeout time before
181 // dropping the subscription.
182 }
183
184 LOG.debug("Removing destination: " + destination);
185
186 synchronized (destinationsMutex) {
187 Destination dest = destinations.remove(destination);
188 if (dest != null) {
189 // timeout<0 or we timed out, we now force any remaining
190 // subscriptions to un-subscribe.
191 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
192 Subscription sub = iter.next();
193 if (sub.matches(destination)) {
194 dest.removeSubscription(context, sub, 0l);
195 }
196 }
197 destinationMap.removeAll(destination);
198 dispose(context, dest);
199 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
200 if (destinationInterceptor != null) {
201 destinationInterceptor.remove(dest);
202 }
203
204 } else {
205 LOG.debug("Destination doesn't exist: " + dest);
206 }
207 }
208 }
209
210 /**
211 * Provide an exact or wildcard lookup of destinations in the region
212 *
213 * @return a set of matching destination objects.
214 */
215 public Set<Destination> getDestinations(ActiveMQDestination destination) {
216 synchronized (destinationsMutex) {
217 return destinationMap.get(destination);
218 }
219 }
220
221 public Map<ActiveMQDestination, Destination> getDestinationMap() {
222 synchronized (destinationsMutex) {
223 return new HashMap<ActiveMQDestination, Destination>(destinations);
224 }
225 }
226
227 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
228 LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
229 + info.getDestination());
230 ActiveMQDestination destination = info.getDestination();
231 if (destination != null && !destination.isPattern() && !destination.isComposite()) {
232 // lets auto-create the destination
233 lookup(context, destination,true);
234 }
235
236 Object addGuard;
237 synchronized (consumerChangeMutexMap) {
238 addGuard = consumerChangeMutexMap.get(info.getConsumerId());
239 if (addGuard == null) {
240 addGuard = new Object();
241 consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
242 }
243 }
244 synchronized (addGuard) {
245 Subscription o = subscriptions.get(info.getConsumerId());
246 if (o != null) {
247 LOG
248 .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
249 return o;
250 }
251
252 // We may need to add some destinations that are in persistent store
253 // but not active
254 // in the broker.
255 //
256 // TODO: think about this a little more. This is good cause
257 // destinations are not loaded into
258 // memory until a client needs to use the queue, but a management
259 // agent viewing the
260 // broker will not see a destination that exists in persistent
261 // store. We may want to
262 // eagerly load all destinations into the broker but have an
263 // inactive state for the
264 // destination which has reduced memory usage.
265 //
266 DestinationFilter.parseFilter(info.getDestination());
267
268 Subscription sub = createSubscription(context, info);
269
270 subscriptions.put(info.getConsumerId(), sub);
271
272 // At this point we're done directly manipulating subscriptions,
273 // but we need to retain the synchronized block here. Consider
274 // otherwise what would happen if at this point a second
275 // thread added, then removed, as would be allowed with
276 // no mutex held. Remove is only essentially run once
277 // so everything after this point would be leaked.
278
279 // Add the subscription to all the matching queues.
280 // But copy the matches first - to prevent deadlocks
281 List<Destination> addList = new ArrayList<Destination>();
282 synchronized (destinationsMutex) {
283 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
284 Destination dest = (Destination) iter.next();
285 addList.add(dest);
286 }
287 }
288
289 for (Destination dest : addList) {
290 dest.addSubscription(context, sub);
291 }
292
293 if (info.isBrowser()) {
294 ((QueueBrowserSubscription) sub).destinationsAdded();
295 }
296
297 return sub;
298 }
299 }
300
301 /**
302 * Get all the Destinations that are in storage
303 *
304 * @return Set of all stored destinations
305 */
306 public Set getDurableDestinations() {
307 return destinationFactory.getDestinations();
308 }
309
310 /**
311 * @return all Destinations that don't have active consumers
312 */
313 protected Set<ActiveMQDestination> getInactiveDestinations() {
314 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
315 synchronized (destinationsMutex) {
316 inactiveDests.removeAll(destinations.keySet());
317 }
318 return inactiveDests;
319 }
320
321 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
322 LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
323 + info.getDestination());
324
325 Subscription sub = subscriptions.remove(info.getConsumerId());
326 // The sub could be removed elsewhere - see ConnectionSplitBroker
327 if (sub != null) {
328
329 // remove the subscription from all the matching queues.
330 List<Destination> removeList = new ArrayList<Destination>();
331 synchronized (destinationsMutex) {
332 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
333 Destination dest = (Destination) iter.next();
334 removeList.add(dest);
335
336 }
337 }
338 for (Destination dest : removeList) {
339 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
340 }
341
342 destroySubscription(sub);
343 }
344 synchronized (consumerChangeMutexMap) {
345 consumerChangeMutexMap.remove(info.getConsumerId());
346 }
347 }
348
349 protected void destroySubscription(Subscription sub) {
350 sub.destroy();
351 }
352
353 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
354 throw new JMSException("Invalid operation.");
355 }
356
357 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
358 final ConnectionContext context = producerExchange.getConnectionContext();
359
360 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
361 final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
362 producerExchange.setRegionDestination(regionDestination);
363 }
364
365 producerExchange.getRegionDestination().send(producerExchange, messageSend);
366 }
367
368 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
369 Subscription sub = consumerExchange.getSubscription();
370 if (sub == null) {
371 sub = subscriptions.get(ack.getConsumerId());
372 if (sub == null) {
373 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
374 LOG.warn("Ack for non existent subscription, ack:" + ack);
375 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
376 } else {
377 LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
378 return;
379 }
380 }
381 consumerExchange.setSubscription(sub);
382 }
383 sub.acknowledge(consumerExchange.getConnectionContext(), ack);
384 }
385
386 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
387 Subscription sub = subscriptions.get(pull.getConsumerId());
388 if (sub == null) {
389 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
390 }
391 return sub.pullMessage(context, pull);
392 }
393
394 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
395 Destination dest = null;
396 synchronized (destinationsMutex) {
397 dest = destinations.get(destination);
398 }
399 if (dest == null) {
400 if (isAutoCreateDestinations()) {
401 // Try to auto create the destination... re-invoke broker
402 // from the
403 // top so that the proper security checks are performed.
404 try {
405 context.getBroker().addDestination(context, destination, createTemporary);
406 dest = addDestination(context, destination, false);
407 } catch (DestinationAlreadyExistsException e) {
408 // if the destination already exists then lets ignore
409 // this error
410 }
411 // We should now have the dest created.
412 synchronized (destinationsMutex) {
413 dest = destinations.get(destination);
414 }
415 }
416 if (dest == null) {
417 throw new JMSException("The destination " + destination + " does not exist.");
418 }
419 }
420 return dest;
421 }
422
423 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
424 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
425 if (sub != null) {
426 sub.processMessageDispatchNotification(messageDispatchNotification);
427 } else {
428 throw new JMSException("Slave broker out of sync with master - Subscription: "
429 + messageDispatchNotification.getConsumerId() + " on "
430 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
431 + messageDispatchNotification.getMessageId());
432 }
433 }
434
435 /*
436 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
437 * dispatch is deferred till the notification to ensure that the
438 * subscription chosen by the master is used. AMQ-2102
439 */
440 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
441 throws Exception {
442 Destination dest = null;
443 synchronized (destinationsMutex) {
444 dest = destinations.get(messageDispatchNotification.getDestination());
445 }
446 if (dest != null) {
447 dest.processDispatchNotification(messageDispatchNotification);
448 } else {
449 throw new JMSException("Slave broker out of sync with master - Destination: "
450 + messageDispatchNotification.getDestination() + " does not exist for consumer "
451 + messageDispatchNotification.getConsumerId() + " with message: "
452 + messageDispatchNotification.getMessageId());
453 }
454 }
455
456 public void gc() {
457 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
458 Subscription sub = iter.next();
459 sub.gc();
460 }
461 synchronized (destinationsMutex) {
462 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
463 Destination dest = iter.next();
464 dest.gc();
465 }
466 }
467 }
468
/**
 * Creates the subscription for a consumer that is being added. Called by
 * {@code addConsumer} while the per-consumer lock is held, before the
 * subscription is stored in the subscriptions map.
 *
 * @param context the connection context the consumer is added under
 * @param info describes the consumer
 * @return the new subscription
 * @throws Exception if the subscription cannot be created
 */
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
470
471 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
472 throws Exception {
473 return destinationFactory.createDestination(context, destination, destinationStatistics);
474 }
475
476 public boolean isAutoCreateDestinations() {
477 return autoCreateDestinations;
478 }
479
480 public void setAutoCreateDestinations(boolean autoCreateDestinations) {
481 this.autoCreateDestinations = autoCreateDestinations;
482 }
483
484 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
485 synchronized (destinationsMutex) {
486 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
487 Destination dest = (Destination) iter.next();
488 dest.addProducer(context, info);
489 }
490 }
491 }
492
493 /**
494 * Removes a Producer.
495 *
496 * @param context
497 * the environment the operation is being executed under.
498 * @throws Exception
499 * TODO
500 */
501 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
502 synchronized (destinationsMutex) {
503 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
504 Destination dest = (Destination) iter.next();
505 dest.removeProducer(context, info);
506 }
507 }
508 }
509
510 protected void dispose(ConnectionContext context, Destination dest) throws Exception {
511 dest.dispose(context);
512 dest.stop();
513 destinationFactory.removeDestination(dest);
514 }
515
516 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
517 Subscription sub = subscriptions.get(control.getConsumerId());
518 if (sub != null && sub instanceof AbstractSubscription) {
519 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
520 if (LOG.isDebugEnabled()) {
521 LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
522 + control.getConsumerId());
523 }
524 try {
525 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
526 } catch (Exception e) {
527 LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
528 }
529 }
530 }
531 }