001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import org.apache.activemq.broker.region.Destination;
020 import org.apache.activemq.broker.region.Region;
021 import org.apache.activemq.command.Message;
022 import org.apache.activemq.state.ProducerState;
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025
026 /**
027 * Holds internal state in the broker for a MessageProducer
028 *
029 *
030 */
031 public class ProducerBrokerExchange {
032
033 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
034 private ConnectionContext connectionContext;
035 private Destination regionDestination;
036 private Region region;
037 private ProducerState producerState;
038 private boolean mutable = true;
039 private long lastSendSequenceNumber = -1;
040
041 public ProducerBrokerExchange() {
042 }
043
044 public ProducerBrokerExchange copy() {
045 ProducerBrokerExchange rc = new ProducerBrokerExchange();
046 rc.connectionContext = connectionContext.copy();
047 rc.regionDestination = regionDestination;
048 rc.region = region;
049 rc.producerState = producerState;
050 rc.mutable = mutable;
051 return rc;
052 }
053
054
055 /**
056 * @return the connectionContext
057 */
058 public ConnectionContext getConnectionContext() {
059 return this.connectionContext;
060 }
061
062 /**
063 * @param connectionContext the connectionContext to set
064 */
065 public void setConnectionContext(ConnectionContext connectionContext) {
066 this.connectionContext = connectionContext;
067 }
068
069 /**
070 * @return the mutable
071 */
072 public boolean isMutable() {
073 return this.mutable;
074 }
075
076 /**
077 * @param mutable the mutable to set
078 */
079 public void setMutable(boolean mutable) {
080 this.mutable = mutable;
081 }
082
083 /**
084 * @return the regionDestination
085 */
086 public Destination getRegionDestination() {
087 return this.regionDestination;
088 }
089
090 /**
091 * @param regionDestination the regionDestination to set
092 */
093 public void setRegionDestination(Destination regionDestination) {
094 this.regionDestination = regionDestination;
095 }
096
097 /**
098 * @return the region
099 */
100 public Region getRegion() {
101 return this.region;
102 }
103
104 /**
105 * @param region the region to set
106 */
107 public void setRegion(Region region) {
108 this.region = region;
109 }
110
111 /**
112 * @return the producerState
113 */
114 public ProducerState getProducerState() {
115 return this.producerState;
116 }
117
118 /**
119 * @param producerState the producerState to set
120 */
121 public void setProducerState(ProducerState producerState) {
122 this.producerState = producerState;
123 }
124
125 /**
126 * Enforce duplicate suppression using info from persistence adapter
127 * @param messageSend
128 * @return false if message should be ignored as a duplicate
129 */
130 public boolean canDispatch(Message messageSend) {
131 boolean canDispatch = true;
132 if (lastSendSequenceNumber > 0) {
133 if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
134 canDispatch = false;
135 LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
136 + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber);
137 }
138 }
139 return canDispatch;
140 }
141
142 public void setLastStoredSequenceId(long l) {
143 lastSendSequenceNumber = l;
144 LOG.debug("last stored sequence id set: " + l);
145 }
146 }