001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.virtual;
018
019 import org.apache.activemq.broker.Broker;
020 import org.apache.activemq.broker.ConnectionContext;
021 import org.apache.activemq.broker.region.Destination;
022 import org.apache.activemq.command.ActiveMQDestination;
023 import org.apache.activemq.command.ActiveMQQueue;
024 import org.apache.activemq.command.ActiveMQTopic;
025 import org.apache.activemq.filter.DestinationFilter;
026
027 /**
028 * Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
029 * Topics</a> using a prefix and postfix. The virtual destination creates a
030 * wildcard that is then used to look up all active queue subscriptions which
031 * match.
032 *
033 * @org.apache.xbean.XBean
034 *
035 *
036 */
037 public class VirtualTopic implements VirtualDestination {
038
039 private String prefix = "Consumer.*.";
040 private String postfix = "";
041 private String name = ">";
042 private boolean selectorAware = false;
043
044
045 public ActiveMQDestination getVirtualDestination() {
046 return new ActiveMQTopic(getName());
047 }
048
049 public Destination intercept(Destination destination) {
050 return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix()) :
051 new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
052 }
053
054
055 public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
056 if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
057 DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
058 if (filter.matches(destination)) {
059 broker.addDestination(context, destination, false);
060 }
061 }
062 }
063
064 public void remove(Destination destination) {
065 }
066
067 // Properties
068 // -------------------------------------------------------------------------
069
070 public String getPostfix() {
071 return postfix;
072 }
073
074 /**
075 * Sets any postix used to identify the queue consumers
076 */
077 public void setPostfix(String postfix) {
078 this.postfix = postfix;
079 }
080
081 public String getPrefix() {
082 return prefix;
083 }
084
085 /**
086 * Sets the prefix wildcard used to identify the queue consumers for a given
087 * topic
088 */
089 public void setPrefix(String prefix) {
090 this.prefix = prefix;
091 }
092
093 public String getName() {
094 return name;
095 }
096
097 public void setName(String name) {
098 this.name = name;
099 }
100
101 /**
102 * Indicates whether the selectors of consumers are used to determine dispatch
103 * to a virtual destination, when true only messages matching an existing
104 * consumer will be dispatched.
105 * @param selectorAware when true take consumer selectors into consideration
106 */
107 public void setSelectorAware(boolean selectorAware) {
108 this.selectorAware = selectorAware;
109 }
110
111 public boolean isSelectorAware() {
112 return selectorAware;
113 }
114 }