001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import org.apache.activemq.state.CommandVisitor;
020
021 /**
022 * @openwire:marshaller code="22"
023 *
024 */
025 public class MessageAck extends BaseCommand {
026
027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
028
029 /**
030 * Used to let the broker know that the message has been delivered to the
031 * client. Message will still be retained until an standard ack is received.
032 * This is used get the broker to send more messages past prefetch limits
033 * when an standard ack has not been sent.
034 */
035 public static final byte DELIVERED_ACK_TYPE = 0;
036
037 /**
038 * The standard ack case where a client wants the message to be discarded.
039 */
040 public static final byte STANDARD_ACK_TYPE = 2;
041
042 /**
043 * In case the client want's to explicitly let the broker know that a
044 * message was not processed and the message was considered a poison
045 * message.
046 */
047 public static final byte POSION_ACK_TYPE = 1;
048
049 /**
050 * In case the client want's to explicitly let the broker know that a
051 * message was not processed and it was re-delivered to the consumer
052 * but it was not yet considered to be a poison message. The messageCount
053 * field will hold the number of times the message was re-delivered.
054 */
055 public static final byte REDELIVERED_ACK_TYPE = 3;
056
057 /**
058 * The ack case where a client wants only an individual message to be discarded.
059 */
060 public static final byte INDIVIDUAL_ACK_TYPE = 4;
061
062 /**
063 * The ack case where a durable topic subscription does not match a selector.
064 */
065 public static final byte UNMATCHED_ACK_TYPE = 5;
066
067 protected byte ackType;
068 protected ConsumerId consumerId;
069 protected MessageId firstMessageId;
070 protected MessageId lastMessageId;
071 protected ActiveMQDestination destination;
072 protected TransactionId transactionId;
073 protected int messageCount;
074 protected Throwable poisonCause;
075
076 protected transient String consumerKey;
077
078 public MessageAck() {
079 }
080
081 public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
082 this.ackType = ackType;
083 this.consumerId = md.getConsumerId();
084 this.destination = md.getDestination();
085 this.lastMessageId = md.getMessage().getMessageId();
086 this.messageCount = messageCount;
087 }
088
089 public void copy(MessageAck copy) {
090 super.copy(copy);
091 copy.firstMessageId = firstMessageId;
092 copy.lastMessageId = lastMessageId;
093 copy.destination = destination;
094 copy.transactionId = transactionId;
095 copy.ackType = ackType;
096 copy.consumerId = consumerId;
097 }
098
099 public byte getDataStructureType() {
100 return DATA_STRUCTURE_TYPE;
101 }
102
103 public boolean isMessageAck() {
104 return true;
105 }
106
107 public boolean isPoisonAck() {
108 return ackType == POSION_ACK_TYPE;
109 }
110
111 public boolean isStandardAck() {
112 return ackType == STANDARD_ACK_TYPE;
113 }
114
115 public boolean isDeliveredAck() {
116 return ackType == DELIVERED_ACK_TYPE;
117 }
118
119 public boolean isRedeliveredAck() {
120 return ackType == REDELIVERED_ACK_TYPE;
121 }
122
123 public boolean isIndividualAck() {
124 return ackType == INDIVIDUAL_ACK_TYPE;
125 }
126
127 public boolean isUnmatchedAck() {
128 return ackType == UNMATCHED_ACK_TYPE;
129 }
130
131 /**
132 * @openwire:property version=1 cache=true
133 */
134 public ActiveMQDestination getDestination() {
135 return destination;
136 }
137
138 public void setDestination(ActiveMQDestination destination) {
139 this.destination = destination;
140 }
141
142 /**
143 * @openwire:property version=1 cache=true
144 */
145 public TransactionId getTransactionId() {
146 return transactionId;
147 }
148
149 public void setTransactionId(TransactionId transactionId) {
150 this.transactionId = transactionId;
151 }
152
153 public boolean isInTransaction() {
154 return transactionId != null;
155 }
156
157 /**
158 * @openwire:property version=1 cache=true
159 */
160 public ConsumerId getConsumerId() {
161 return consumerId;
162 }
163
164 public void setConsumerId(ConsumerId consumerId) {
165 this.consumerId = consumerId;
166 }
167
168 /**
169 * @openwire:property version=1
170 */
171 public byte getAckType() {
172 return ackType;
173 }
174
175 public void setAckType(byte ackType) {
176 this.ackType = ackType;
177 }
178
179 /**
180 * @openwire:property version=1
181 */
182 public MessageId getFirstMessageId() {
183 return firstMessageId;
184 }
185
186 public void setFirstMessageId(MessageId firstMessageId) {
187 this.firstMessageId = firstMessageId;
188 }
189
190 /**
191 * @openwire:property version=1
192 */
193 public MessageId getLastMessageId() {
194 return lastMessageId;
195 }
196
197 public void setLastMessageId(MessageId lastMessageId) {
198 this.lastMessageId = lastMessageId;
199 }
200
201 /**
202 * The number of messages being acknowledged in the range.
203 *
204 * @openwire:property version=1
205 */
206 public int getMessageCount() {
207 return messageCount;
208 }
209
210 public void setMessageCount(int messageCount) {
211 this.messageCount = messageCount;
212 }
213
214 /**
215 * The cause of a poison ack, if a message listener
216 * throws an exception it will be recorded here
217 *
218 * @openwire:property version=7
219 */
220 public Throwable getPoisonCause() {
221 return poisonCause;
222 }
223
224 public void setPoisonCause(Throwable poisonCause) {
225 this.poisonCause = poisonCause;
226 }
227
228 public Response visit(CommandVisitor visitor) throws Exception {
229 return visitor.processMessageAck(this);
230 }
231
232 /**
233 * A helper method to allow a single message ID to be acknowledged
234 */
235 public void setMessageID(MessageId messageID) {
236 setFirstMessageId(messageID);
237 setLastMessageId(messageID);
238 setMessageCount(1);
239 }
240
241 }