001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.LinkedList;
023 import java.util.Map;
024 import java.util.Map.Entry;
025
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.command.ActiveMQBytesMessage;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQMessage;
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageDispatch;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.command.TransactionId;
036
037 /**
038 * Keeps track of the STOMP subscription so that acking is correctly done.
039 *
040 * @author <a href="http://hiramchirino.com">chirino</a>
041 */
042 public class StompSubscription {
043
044 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046 public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047
048 private final ProtocolConverter protocolConverter;
049 private final String subscriptionId;
050 private final ConsumerInfo consumerInfo;
051
052 private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053 private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054
055 private String ackMode = AUTO_ACK;
056 private ActiveMQDestination destination;
057 private String transformation;
058
059
060 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
061 this.protocolConverter = stompTransport;
062 this.subscriptionId = subscriptionId;
063 this.consumerInfo = consumerInfo;
064 this.transformation = transformation;
065 }
066
067 void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
068 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
069 if (ackMode == CLIENT_ACK) {
070 synchronized (this) {
071 dispatchedMessage.put(message.getMessageId(), md);
072 }
073 } else if (ackMode == INDIVIDUAL_ACK) {
074 synchronized (this) {
075 dispatchedMessage.put(message.getMessageId(), md);
076 }
077 } else if (ackMode == AUTO_ACK) {
078 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
079 protocolConverter.getStompTransport().sendToActiveMQ(ack);
080 }
081
082 boolean ignoreTransformation = false;
083
084 if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
085 message.setReadOnlyProperties(false);
086 message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
087 } else {
088 if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
089 ignoreTransformation = true;
090 }
091 }
092
093 StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
094
095 command.setAction(Stomp.Responses.MESSAGE);
096 if (subscriptionId != null) {
097 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
098 }
099
100 protocolConverter.getStompTransport().sendToStomp(command);
101 }
102
103 synchronized void onStompAbort(TransactionId transactionId) {
104 unconsumedMessage.clear();
105 }
106
107 synchronized void onStompCommit(TransactionId transactionId) {
108 for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
109 Map.Entry entry = (Entry)iter.next();
110 MessageId id = (MessageId)entry.getKey();
111 MessageDispatch msg = (MessageDispatch)entry.getValue();
112 if (unconsumedMessage.contains(msg)) {
113 iter.remove();
114 }
115 }
116 unconsumedMessage.clear();
117 }
118
119 synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
120
121 MessageId msgId = new MessageId(messageId);
122
123 if (!dispatchedMessage.containsKey(msgId)) {
124 return null;
125 }
126
127 MessageAck ack = new MessageAck();
128 ack.setDestination(consumerInfo.getDestination());
129 ack.setConsumerId(consumerInfo.getConsumerId());
130
131 if (ackMode == CLIENT_ACK) {
132 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
133 int count = 0;
134 for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135
136 Map.Entry entry = (Entry)iter.next();
137 MessageId id = (MessageId)entry.getKey();
138 MessageDispatch msg = (MessageDispatch)entry.getValue();
139
140 if (ack.getFirstMessageId() == null) {
141 ack.setFirstMessageId(id);
142 }
143
144 if (transactionId != null) {
145 if (!unconsumedMessage.contains(msg)) {
146 unconsumedMessage.add(msg);
147 }
148 } else {
149 iter.remove();
150 }
151
152
153 count++;
154
155 if (id.equals(msgId)) {
156 ack.setLastMessageId(id);
157 break;
158 }
159
160 }
161 ack.setMessageCount(count);
162 if (transactionId != null) {
163 ack.setTransactionId(transactionId);
164 }
165 }
166 else if (ackMode == INDIVIDUAL_ACK) {
167 ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
168 ack.setMessageID(msgId);
169 if (transactionId != null) {
170 unconsumedMessage.add(dispatchedMessage.get(msgId));
171 ack.setTransactionId(transactionId);
172 }
173 dispatchedMessage.remove(msgId);
174 }
175 return ack;
176 }
177
178 public String getAckMode() {
179 return ackMode;
180 }
181
182 public void setAckMode(String ackMode) {
183 this.ackMode = ackMode;
184 }
185
186 public String getSubscriptionId() {
187 return subscriptionId;
188 }
189
190 public void setDestination(ActiveMQDestination destination) {
191 this.destination = destination;
192 }
193
194 public ActiveMQDestination getDestination() {
195 return destination;
196 }
197
198 public ConsumerInfo getConsumerInfo() {
199 return consumerInfo;
200 }
201
202 }