001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.List;
021 import java.util.Set;
022 import org.apache.activemq.broker.Broker;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.ProducerBrokerExchange;
025 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
026 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageDispatchNotification;
031 import org.apache.activemq.command.ProducerInfo;
032 import org.apache.activemq.store.MessageStore;
033 import org.apache.activemq.usage.MemoryUsage;
034 import org.apache.activemq.usage.Usage;
035
036 /**
037 *
038 *
039 */
040 public class DestinationFilter implements Destination {
041
042 private final Destination next;
043
044 public DestinationFilter(Destination next) {
045 this.next = next;
046 }
047
048 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
049 next.acknowledge(context, sub, ack, node);
050 }
051
052 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
053 next.addSubscription(context, sub);
054 }
055
056 public Message[] browse() {
057 return next.browse();
058 }
059
060 public void dispose(ConnectionContext context) throws IOException {
061 next.dispose(context);
062 }
063
064 public void gc() {
065 next.gc();
066 }
067
068 public ActiveMQDestination getActiveMQDestination() {
069 return next.getActiveMQDestination();
070 }
071
072 public DeadLetterStrategy getDeadLetterStrategy() {
073 return next.getDeadLetterStrategy();
074 }
075
076 public DestinationStatistics getDestinationStatistics() {
077 return next.getDestinationStatistics();
078 }
079
080 public String getName() {
081 return next.getName();
082 }
083
084 public MemoryUsage getMemoryUsage() {
085 return next.getMemoryUsage();
086 }
087
088 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
089 next.removeSubscription(context, sub, lastDeliveredSequenceId);
090 }
091
092 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
093 next.send(context, messageSend);
094 }
095
096 public void start() throws Exception {
097 next.start();
098 }
099
100 public void stop() throws Exception {
101 next.stop();
102 }
103
104 public List<Subscription> getConsumers() {
105 return next.getConsumers();
106 }
107
108 /**
109 * Sends a message to the given destination which may be a wildcard
110 *
111 * @param context broker context
112 * @param message message to send
113 * @param destination possibly wildcard destination to send the message to
114 * @throws Exception on error
115 */
116 protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
117 Broker broker = context.getConnectionContext().getBroker();
118 Set<Destination> destinations = broker.getDestinations(destination);
119
120 for (Destination dest : destinations) {
121 dest.send(context, message.copy());
122 }
123 }
124
125 public MessageStore getMessageStore() {
126 return next.getMessageStore();
127 }
128
129 public boolean isProducerFlowControl() {
130 return next.isProducerFlowControl();
131 }
132
133 public void setProducerFlowControl(boolean value) {
134 next.setProducerFlowControl(value);
135 }
136
137 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
138 next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
139 }
140
141 public long getBlockedProducerWarningInterval() {
142 return next.getBlockedProducerWarningInterval();
143 }
144
145 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
146 next.addProducer(context, info);
147
148 }
149
150 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
151 next.removeProducer(context, info);
152 }
153
154 public int getMaxAuditDepth() {
155 return next.getMaxAuditDepth();
156 }
157
158 public int getMaxProducersToAudit() {
159 return next.getMaxProducersToAudit();
160 }
161
162 public boolean isEnableAudit() {
163 return next.isEnableAudit();
164 }
165
166 public void setEnableAudit(boolean enableAudit) {
167 next.setEnableAudit(enableAudit);
168 }
169
170 public void setMaxAuditDepth(int maxAuditDepth) {
171 next.setMaxAuditDepth(maxAuditDepth);
172 }
173
174 public void setMaxProducersToAudit(int maxProducersToAudit) {
175 next.setMaxProducersToAudit(maxProducersToAudit);
176 }
177
178 public boolean isActive() {
179 return next.isActive();
180 }
181
182 public int getMaxPageSize() {
183 return next.getMaxPageSize();
184 }
185
186 public void setMaxPageSize(int maxPageSize) {
187 next.setMaxPageSize(maxPageSize);
188 }
189
190 public boolean isUseCache() {
191 return next.isUseCache();
192 }
193
194 public void setUseCache(boolean useCache) {
195 next.setUseCache(useCache);
196 }
197
198 public int getMinimumMessageSize() {
199 return next.getMinimumMessageSize();
200 }
201
202 public void setMinimumMessageSize(int minimumMessageSize) {
203 next.setMinimumMessageSize(minimumMessageSize);
204 }
205
206 public void wakeup() {
207 next.wakeup();
208 }
209
210 public boolean isLazyDispatch() {
211 return next.isLazyDispatch();
212 }
213
214 public void setLazyDispatch(boolean value) {
215 next.setLazyDispatch(value);
216 }
217
218 public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
219 next.messageExpired(context, prefetchSubscription, node);
220 }
221
222 public boolean iterate() {
223 return next.iterate();
224 }
225
226 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
227 next.fastProducer(context, producerInfo);
228 }
229
230 public void isFull(ConnectionContext context, Usage usage) {
231 next.isFull(context, usage);
232 }
233
234 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
235 next.messageConsumed(context, messageReference);
236 }
237
238 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
239 next.messageDelivered(context, messageReference);
240 }
241
242 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
243 next.messageDiscarded(context, sub, messageReference);
244 }
245
246 public void slowConsumer(ConnectionContext context, Subscription subs) {
247 next.slowConsumer(context, subs);
248 }
249
250 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
251 next.messageExpired(context, subs, node);
252 }
253
254 public int getMaxBrowsePageSize() {
255 return next.getMaxBrowsePageSize();
256 }
257
258 public void setMaxBrowsePageSize(int maxPageSize) {
259 next.setMaxBrowsePageSize(maxPageSize);
260 }
261
262 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
263 next.processDispatchNotification(messageDispatchNotification);
264 }
265
266 public int getCursorMemoryHighWaterMark() {
267 return next.getCursorMemoryHighWaterMark();
268 }
269
270 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
271 next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
272 }
273
274 public boolean isPrioritizedMessages() {
275 return next.isPrioritizedMessages();
276 }
277
278 public SlowConsumerStrategy getSlowConsumerStrategy() {
279 return next.getSlowConsumerStrategy();
280 }
281 }