001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.IOException;
020 import java.util.Arrays;
021
022 import javax.jms.JMSException;
023
024 import org.apache.activemq.filter.BooleanExpression;
025 import org.apache.activemq.filter.MessageEvaluationContext;
026 import org.apache.activemq.util.JMSExceptionSupport;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * @openwire:marshaller code="91"
032 *
033 */
034 public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
035
036 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
037 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
038
039 private BrokerId networkBrokerId;
040 private int networkTTL;
041
042 public NetworkBridgeFilter() {
043 }
044
045 public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
046 this.networkBrokerId = remoteBrokerPath;
047 this.networkTTL = networkTTL;
048 }
049
050 public byte getDataStructureType() {
051 return DATA_STRUCTURE_TYPE;
052 }
053
054 public boolean isMarshallAware() {
055 return false;
056 }
057
058 public boolean matches(MessageEvaluationContext mec) throws JMSException {
059 try {
060 // for Queues - the message can be acknowledged and dropped whilst
061 // still
062 // in the dispatch loop
063 // so need to get the reference to it
064 Message message = mec.getMessage();
065 return message != null && matchesForwardingFilter(message);
066 } catch (IOException e) {
067 throw JMSExceptionSupport.create(e);
068 }
069 }
070
071 public Object evaluate(MessageEvaluationContext message) throws JMSException {
072 return matches(message) ? Boolean.TRUE : Boolean.FALSE;
073 }
074
075 protected boolean matchesForwardingFilter(Message message) {
076
077 if (contains(message.getBrokerPath(), networkBrokerId)) {
078 if (LOG.isTraceEnabled()) {
079 LOG.trace("Message all ready routed once through this broker ("
080 + networkBrokerId + "), path: "
081 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
082 }
083 return false;
084 }
085
086 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
087
088 if (hops >= networkTTL) {
089 if (LOG.isTraceEnabled()) {
090 LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
091 }
092 return false;
093 }
094
095 // Don't propagate advisory messages about network subscriptions
096 if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
097 ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
098 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
099 if (hops >= networkTTL) {
100 if (LOG.isTraceEnabled()) {
101 LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
102 }
103 return false;
104 }
105 }
106 return true;
107 }
108
109 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
110 if (brokerPath != null && brokerId != null) {
111 for (int i = 0; i < brokerPath.length; i++) {
112 if (brokerId.equals(brokerPath[i])) {
113 return true;
114 }
115 }
116 }
117 return false;
118 }
119
120 /**
121 * @openwire:property version=1
122 */
123 public int getNetworkTTL() {
124 return networkTTL;
125 }
126
127 public void setNetworkTTL(int networkTTL) {
128 this.networkTTL = networkTTL;
129 }
130
131 /**
132 * @openwire:property version=1 cache=true
133 */
134 public BrokerId getNetworkBrokerId() {
135 return networkBrokerId;
136 }
137
138 public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
139 this.networkBrokerId = remoteBrokerPath;
140 }
141
142 }