001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.Arrays;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.concurrent.ConcurrentHashMap;
025
026 import org.apache.activemq.ActiveMQMessageAudit;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.command.ActiveMQTopic;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.MessageAck;
031 import org.apache.activemq.command.MessageId;
032 import org.apache.activemq.command.SubscriptionInfo;
033 import org.apache.activemq.store.MessageRecoveryListener;
034 import org.apache.activemq.store.TopicMessageStore;
035 import org.apache.activemq.util.ByteSequence;
036 import org.apache.activemq.util.IOExceptionSupport;
037 import org.apache.activemq.wireformat.WireFormat;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 /**
042 *
043 */
044 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
045
046 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
047 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
048
049 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
050 super(persistenceAdapter, adapter, wireFormat, topic, audit);
051 }
052
053 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
054 if (ack != null && ack.isUnmatchedAck()) {
055 if (LOG.isTraceEnabled()) {
056 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
057 }
058 return;
059 }
060 TransactionContext c = persistenceAdapter.getTransactionContext(context);
061 try {
062 long[] res = adapter.getStoreSequenceId(c, destination, messageId);
063 if (this.isPrioritizedMessages()) {
064 adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
065 } else {
066 adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
067 }
068 if (LOG.isTraceEnabled()) {
069 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
070 }
071 } catch (SQLException e) {
072 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
073 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
074 } finally {
075 c.close();
076 }
077 }
078
079 /**
080 * @throws Exception
081 */
082 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
083 TransactionContext c = persistenceAdapter.getTransactionContext();
084 try {
085 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
086 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
087 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
088 msg.getMessageId().setBrokerSequenceId(sequenceId);
089 return listener.recoverMessage(msg);
090 }
091
092 public boolean recoverMessageReference(String reference) throws Exception {
093 return listener.recoverMessageReference(new MessageId(reference));
094 }
095
096 });
097 } catch (SQLException e) {
098 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
099 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
100 } finally {
101 c.close();
102 }
103 }
104
105 private class LastRecovered implements Iterable<LastRecoveredEntry> {
106 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
107 LastRecovered() {
108 for (int i=0; i<perPriority.length; i++) {
109 perPriority[i] = new LastRecoveredEntry(i);
110 }
111 }
112
113 public void updateStored(long sequence, int priority) {
114 perPriority[priority].stored = sequence;
115 }
116
117 public LastRecoveredEntry defaultPriority() {
118 return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
119 }
120
121 public String toString() {
122 return Arrays.deepToString(perPriority);
123 }
124
125 public Iterator<LastRecoveredEntry> iterator() {
126 return new PriorityIterator();
127 }
128
129 class PriorityIterator implements Iterator<LastRecoveredEntry> {
130 int current = 9;
131 public boolean hasNext() {
132 for (int i=current; i>=0; i--) {
133 if (perPriority[i].hasMessages()) {
134 current = i;
135 return true;
136 }
137 }
138 return false;
139 }
140
141 public LastRecoveredEntry next() {
142 return perPriority[current];
143 }
144
145 public void remove() {
146 throw new RuntimeException("not implemented");
147 }
148 }
149 }
150
151 private class LastRecoveredEntry {
152 final int priority;
153 long recovered = 0;
154 long stored = Integer.MAX_VALUE;
155
156 public LastRecoveredEntry(int priority) {
157 this.priority = priority;
158 }
159
160 public String toString() {
161 return priority + "-" + stored + ":" + recovered;
162 }
163
164 public void exhausted() {
165 stored = recovered;
166 }
167
168 public boolean hasMessages() {
169 return stored > recovered;
170 }
171 }
172
173 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
174 final MessageRecoveryListener delegate;
175 final int maxMessages;
176 LastRecoveredEntry lastRecovered;
177 int recoveredCount;
178 int recoveredMarker;
179
180 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
181 this.delegate = delegate;
182 this.maxMessages = maxMessages;
183 }
184
185 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
186 if (delegate.hasSpace()) {
187 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
188 msg.getMessageId().setBrokerSequenceId(sequenceId);
189 if (delegate.recoverMessage(msg)) {
190 lastRecovered.recovered = sequenceId;
191 recoveredCount++;
192 return true;
193 }
194 }
195 return false;
196 }
197
198 public boolean recoverMessageReference(String reference) throws Exception {
199 return delegate.recoverMessageReference(new MessageId(reference));
200 }
201
202 public void setLastRecovered(LastRecoveredEntry lastRecovered) {
203 this.lastRecovered = lastRecovered;
204 recoveredMarker = recoveredCount;
205 }
206
207 public boolean complete() {
208 return !delegate.hasSpace() || recoveredCount == maxMessages;
209 }
210
211 public boolean stalled() {
212 return recoveredMarker == recoveredCount;
213 }
214 }
215
216 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
217 throws Exception {
218 //Duration duration = new Duration("recoverNextMessages");
219 TransactionContext c = persistenceAdapter.getTransactionContext();
220
221 String key = getSubscriptionKey(clientId, subscriptionName);
222 if (!subscriberLastRecoveredMap.containsKey(key)) {
223 subscriberLastRecoveredMap.put(key, new LastRecovered());
224 }
225 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
226 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
227 try {
228 if (LOG.isTraceEnabled()) {
229 LOG.trace(key + " existing last recovered: " + lastRecovered);
230 }
231 if (isPrioritizedMessages()) {
232 Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
233 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
234 LastRecoveredEntry entry = it.next();
235 recoveredAwareListener.setLastRecovered(entry);
236 //Duration microDuration = new Duration("recoverNextMessages:loop");
237 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
238 entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
239 //microDuration.end(entry);
240 if (recoveredAwareListener.stalled()) {
241 if (recoveredAwareListener.complete()) {
242 break;
243 } else {
244 entry.exhausted();
245 }
246 }
247 }
248 } else {
249 LastRecoveredEntry last = lastRecovered.defaultPriority();
250 recoveredAwareListener.setLastRecovered(last);
251 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
252 last.recovered, 0, maxReturned, recoveredAwareListener);
253 }
254 if (LOG.isTraceEnabled()) {
255 LOG.trace(key + " last recovered: " + lastRecovered);
256 }
257 //duration.end();
258 } catch (SQLException e) {
259 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
260 } finally {
261 c.close();
262 }
263 }
264
265 public void resetBatching(String clientId, String subscriptionName) {
266 subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
267 }
268
269 protected void onAdd(long sequenceId, byte priority) {
270 // update last recovered state
271 for (LastRecovered last : subscriberLastRecoveredMap.values()) {
272 last.updateStored(sequenceId, priority);
273 }
274 }
275
276
277 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
278 TransactionContext c = persistenceAdapter.getTransactionContext();
279 try {
280 c = persistenceAdapter.getTransactionContext();
281 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
282 } catch (SQLException e) {
283 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
284 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
285 } finally {
286 c.close();
287 }
288 }
289
290 /**
291 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
292 * String)
293 */
294 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
295 TransactionContext c = persistenceAdapter.getTransactionContext();
296 try {
297 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
298 } catch (SQLException e) {
299 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
300 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
301 } finally {
302 c.close();
303 }
304 }
305
306 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
307 TransactionContext c = persistenceAdapter.getTransactionContext();
308 try {
309 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
310 } catch (SQLException e) {
311 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
312 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
313 } finally {
314 c.close();
315 resetBatching(clientId, subscriptionName);
316 }
317 }
318
319 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
320 TransactionContext c = persistenceAdapter.getTransactionContext();
321 try {
322 return adapter.doGetAllSubscriptions(c, destination);
323 } catch (SQLException e) {
324 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
325 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
326 } finally {
327 c.close();
328 }
329 }
330
331 public int getMessageCount(String clientId, String subscriberName) throws IOException {
332 //Duration duration = new Duration("getMessageCount");
333 int result = 0;
334 TransactionContext c = persistenceAdapter.getTransactionContext();
335 try {
336 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
337 } catch (SQLException e) {
338 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
339 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
340 } finally {
341 c.close();
342 }
343 if (LOG.isTraceEnabled()) {
344 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
345 }
346 //duration.end();
347 return result;
348 }
349
350 protected String getSubscriptionKey(String clientId, String subscriberName) {
351 String result = clientId + ":";
352 result += subscriberName != null ? subscriberName : "NOT_SET";
353 return result;
354 }
355
356 }