001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.concurrent.ExecutorService;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.TimeUnit;
030 import java.util.concurrent.atomic.AtomicBoolean;
031 import java.util.concurrent.atomic.AtomicReference;
032 import javax.jms.IllegalStateException;
033 import javax.jms.InvalidDestinationException;
034 import javax.jms.JMSException;
035 import javax.jms.Message;
036 import javax.jms.MessageConsumer;
037 import javax.jms.MessageListener;
038 import javax.jms.TransactionRolledBackException;
039 import org.apache.activemq.blob.BlobDownloader;
040 import org.apache.activemq.command.ActiveMQBlobMessage;
041 import org.apache.activemq.command.ActiveMQDestination;
042 import org.apache.activemq.command.ActiveMQMessage;
043 import org.apache.activemq.command.ActiveMQTempDestination;
044 import org.apache.activemq.command.CommandTypes;
045 import org.apache.activemq.command.ConsumerId;
046 import org.apache.activemq.command.ConsumerInfo;
047 import org.apache.activemq.command.MessageAck;
048 import org.apache.activemq.command.MessageDispatch;
049 import org.apache.activemq.command.MessageId;
050 import org.apache.activemq.command.MessagePull;
051 import org.apache.activemq.command.RemoveInfo;
052 import org.apache.activemq.command.TransactionId;
053 import org.apache.activemq.management.JMSConsumerStatsImpl;
054 import org.apache.activemq.management.StatsCapable;
055 import org.apache.activemq.management.StatsImpl;
056 import org.apache.activemq.selector.SelectorParser;
057 import org.apache.activemq.thread.Scheduler;
058 import org.apache.activemq.transaction.Synchronization;
059 import org.apache.activemq.util.Callback;
060 import org.apache.activemq.util.IntrospectionSupport;
061 import org.apache.activemq.util.JMSExceptionSupport;
062 import org.slf4j.Logger;
063 import org.slf4j.LoggerFactory;
064
065 /**
066 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
067 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
068 * passing a <CODE>Destination</CODE> object to a message-consumer creation
069 * method supplied by a session.
070 * <P>
071 * <CODE>MessageConsumer</CODE> is the parent interface for all message
072 * consumers.
073 * <P>
074 * A message consumer can be created with a message selector. A message selector
075 * allows the client to restrict the messages delivered to the message consumer
076 * to those that match the selector.
077 * <P>
078 * A client may either synchronously receive a message consumer's messages or
079 * have the consumer asynchronously deliver them as they arrive.
080 * <P>
081 * For synchronous receipt, a client can request the next message from a message
082 * consumer using one of its <CODE> receive</CODE> methods. There are several
083 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
084 * the next message.
085 * <P>
086 * For asynchronous delivery, a client can register a
087 * <CODE>MessageListener</CODE> object with a message consumer. As messages
088 * arrive at the message consumer, it delivers them by calling the
089 * <CODE>MessageListener</CODE>'s<CODE>
090 * onMessage</CODE> method.
091 * <P>
092 * It is a client programming error for a <CODE>MessageListener</CODE> to
093 * throw an exception.
094 *
095 *
096 * @see javax.jms.MessageConsumer
097 * @see javax.jms.QueueReceiver
098 * @see javax.jms.TopicSubscriber
099 * @see javax.jms.Session
100 */
101 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
102
103 @SuppressWarnings("serial")
104 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
105 final TransactionId transactionId;
106 public PreviouslyDeliveredMap(TransactionId transactionId) {
107 this.transactionId = transactionId;
108 }
109 }
110
111 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
112 protected final Scheduler scheduler;
113 protected final ActiveMQSession session;
114 protected final ConsumerInfo info;
115
116 // These are the messages waiting to be delivered to the client
117 protected final MessageDispatchChannel unconsumedMessages;
118
// These are the messages that were delivered to the consumer but that have
// not been acknowledged. The list is kept in reverse order since we
// always walk it in reverse order.
122 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
123 // track duplicate deliveries in a transaction such that the tx integrity can be validated
124 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
125 private int deliveredCounter;
126 private int additionalWindowSize;
127 private long redeliveryDelay;
128 private int ackCounter;
129 private int dispatchedCount;
130 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
131 private final JMSConsumerStatsImpl stats;
132
133 private final String selector;
134 private boolean synchronizationRegistered;
135 private final AtomicBoolean started = new AtomicBoolean(false);
136
137 private MessageAvailableListener availableListener;
138
139 private RedeliveryPolicy redeliveryPolicy;
140 private boolean optimizeAcknowledge;
141 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
142 private ExecutorService executorService;
143 private MessageTransformer transformer;
144 private boolean clearDispatchList;
145 boolean inProgressClearRequiredFlag;
146
147 private MessageAck pendingAck;
148 private long lastDeliveredSequenceId;
149
150 private IOException failureError;
151
152 private long optimizeAckTimestamp = System.currentTimeMillis();
153 private final long optimizeAckTimeout = 300;
154 private long failoverRedeliveryWaitPeriod = 0;
155
156 /**
157 * Create a MessageConsumer
158 *
159 * @param session
160 * @param dest
161 * @param name
162 * @param selector
163 * @param prefetch
164 * @param maximumPendingMessageCount
165 * @param noLocal
166 * @param browser
167 * @param dispatchAsync
168 * @param messageListener
169 * @throws JMSException
170 */
171 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
172 String name, String selector, int prefetch,
173 int maximumPendingMessageCount, boolean noLocal, boolean browser,
174 boolean dispatchAsync, MessageListener messageListener) throws JMSException {
175 if (dest == null) {
176 throw new InvalidDestinationException("Don't understand null destinations");
177 } else if (dest.getPhysicalName() == null) {
178 throw new InvalidDestinationException("The destination object was not given a physical name.");
179 } else if (dest.isTemporary()) {
180 String physicalName = dest.getPhysicalName();
181
182 if (physicalName == null) {
183 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
184 }
185
186 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
187
188 if (physicalName.indexOf(connectionID) < 0) {
189 throw new InvalidDestinationException(
190 "Cannot use a Temporary destination from another Connection");
191 }
192
193 if (session.connection.isDeleted(dest)) {
194 throw new InvalidDestinationException(
195 "Cannot use a Temporary destination that has been deleted");
196 }
197 if (prefetch < 0) {
198 throw new JMSException("Cannot have a prefetch size less than zero");
199 }
200 }
201 if (session.connection.isMessagePrioritySupported()) {
202 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
203 }else {
204 this.unconsumedMessages = new FifoMessageDispatchChannel();
205 }
206
207 this.session = session;
208 this.scheduler = session.getScheduler();
209 this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
210 setTransformer(session.getTransformer());
211
212 this.info = new ConsumerInfo(consumerId);
213 this.info.setExclusive(this.session.connection.isExclusiveConsumer());
214 this.info.setSubscriptionName(name);
215 this.info.setPrefetchSize(prefetch);
216 this.info.setCurrentPrefetchSize(prefetch);
217 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
218 this.info.setNoLocal(noLocal);
219 this.info.setDispatchAsync(dispatchAsync);
220 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
221 this.info.setSelector(null);
222
223 // Allows the options on the destination to configure the consumerInfo
224 if (dest.getOptions() != null) {
225 Map<String, String> options = new HashMap<String, String>(dest.getOptions());
226 IntrospectionSupport.setProperties(this.info, options, "consumer.");
227 }
228
229 this.info.setDestination(dest);
230 this.info.setBrowser(browser);
231 if (selector != null && selector.trim().length() != 0) {
232 // Validate the selector
233 SelectorParser.parse(selector);
234 this.info.setSelector(selector);
235 this.selector = selector;
236 } else if (info.getSelector() != null) {
237 // Validate the selector
238 SelectorParser.parse(this.info.getSelector());
239 this.selector = this.info.getSelector();
240 } else {
241 this.selector = null;
242 }
243
244 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
245 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
246 && !info.isBrowser();
247 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
248 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
249 if (messageListener != null) {
250 setMessageListener(messageListener);
251 }
252 try {
253 this.session.addConsumer(this);
254 this.session.syncSendPacket(info);
255 } catch (JMSException e) {
256 this.session.removeConsumer(this);
257 throw e;
258 }
259
260 if (session.connection.isStarted()) {
261 start();
262 }
263 }
264
265 private boolean isAutoAcknowledgeEach() {
266 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
267 }
268
269 private boolean isAutoAcknowledgeBatch() {
270 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
271 }
272
273 public StatsImpl getStats() {
274 return stats;
275 }
276
277 public JMSConsumerStatsImpl getConsumerStats() {
278 return stats;
279 }
280
281 public RedeliveryPolicy getRedeliveryPolicy() {
282 return redeliveryPolicy;
283 }
284
285 /**
286 * Sets the redelivery policy used when messages are redelivered
287 */
288 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
289 this.redeliveryPolicy = redeliveryPolicy;
290 }
291
292 public MessageTransformer getTransformer() {
293 return transformer;
294 }
295
296 /**
297 * Sets the transformer used to transform messages before they are sent on
298 * to the JMS bus
299 */
300 public void setTransformer(MessageTransformer transformer) {
301 this.transformer = transformer;
302 }
303
304 /**
305 * @return Returns the value.
306 */
307 public ConsumerId getConsumerId() {
308 return info.getConsumerId();
309 }
310
311 /**
312 * @return the consumer name - used for durable consumers
313 */
314 public String getConsumerName() {
315 return this.info.getSubscriptionName();
316 }
317
318 /**
319 * @return true if this consumer does not accept locally produced messages
320 */
321 protected boolean isNoLocal() {
322 return info.isNoLocal();
323 }
324
325 /**
326 * Retrieve is a browser
327 *
328 * @return true if a browser
329 */
330 protected boolean isBrowser() {
331 return info.isBrowser();
332 }
333
334 /**
335 * @return ActiveMQDestination
336 */
337 protected ActiveMQDestination getDestination() {
338 return info.getDestination();
339 }
340
341 /**
342 * @return Returns the prefetchNumber.
343 */
344 public int getPrefetchNumber() {
345 return info.getPrefetchSize();
346 }
347
348 /**
349 * @return true if this is a durable topic subscriber
350 */
351 public boolean isDurableSubscriber() {
352 return info.getSubscriptionName() != null && info.getDestination().isTopic();
353 }
354
355 /**
356 * Gets this message consumer's message selector expression.
357 *
358 * @return this message consumer's message selector, or null if no message
359 * selector exists for the message consumer (that is, if the message
360 * selector was not set or was set to null or the empty string)
361 * @throws JMSException if the JMS provider fails to receive the next
362 * message due to some internal error.
363 */
364 public String getMessageSelector() throws JMSException {
365 checkClosed();
366 return selector;
367 }
368
369 /**
370 * Gets the message consumer's <CODE>MessageListener</CODE>.
371 *
372 * @return the listener for the message consumer, or null if no listener is
373 * set
374 * @throws JMSException if the JMS provider fails to get the message
375 * listener due to some internal error.
376 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
377 */
378 public MessageListener getMessageListener() throws JMSException {
379 checkClosed();
380 return this.messageListener.get();
381 }
382
383 /**
384 * Sets the message consumer's <CODE>MessageListener</CODE>.
385 * <P>
386 * Setting the message listener to null is the equivalent of unsetting the
387 * message listener for the message consumer.
388 * <P>
389 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
390 * while messages are being consumed by an existing listener or the consumer
391 * is being used to consume messages synchronously is undefined.
392 *
393 * @param listener the listener to which the messages are to be delivered
394 * @throws JMSException if the JMS provider fails to receive the next
395 * message due to some internal error.
396 * @see javax.jms.MessageConsumer#getMessageListener
397 */
398 public void setMessageListener(MessageListener listener) throws JMSException {
399 checkClosed();
400 if (info.getPrefetchSize() == 0) {
401 throw new JMSException(
402 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
403 }
404 if (listener != null) {
405 boolean wasRunning = session.isRunning();
406 if (wasRunning) {
407 session.stop();
408 }
409
410 this.messageListener.set(listener);
411 session.redispatch(this, unconsumedMessages);
412
413 if (wasRunning) {
414 session.start();
415 }
416 } else {
417 this.messageListener.set(null);
418 }
419 }
420
421 public MessageAvailableListener getAvailableListener() {
422 return availableListener;
423 }
424
425 /**
426 * Sets the listener used to notify synchronous consumers that there is a
427 * message available so that the {@link MessageConsumer#receiveNoWait()} can
428 * be called.
429 */
430 public void setAvailableListener(MessageAvailableListener availableListener) {
431 this.availableListener = availableListener;
432 }
433
434 /**
435 * Used to get an enqueued message from the unconsumedMessages list. The
436 * amount of time this method blocks is based on the timeout value. - if
437 * timeout==-1 then it blocks until a message is received. - if timeout==0
438 * then it it tries to not block at all, it returns a message if it is
439 * available - if timeout>0 then it blocks up to timeout amount of time.
440 * Expired messages will consumed by this method.
441 *
442 * @throws JMSException
443 * @return null if we timeout or if the consumer is closed.
444 */
445 private MessageDispatch dequeue(long timeout) throws JMSException {
446 try {
447 long deadline = 0;
448 if (timeout > 0) {
449 deadline = System.currentTimeMillis() + timeout;
450 }
451 while (true) {
452 MessageDispatch md = unconsumedMessages.dequeue(timeout);
453 if (md == null) {
454 if (timeout > 0 && !unconsumedMessages.isClosed()) {
455 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
456 } else {
457 if (failureError != null) {
458 throw JMSExceptionSupport.create(failureError);
459 } else {
460 return null;
461 }
462 }
463 } else if (md.getMessage() == null) {
464 return null;
465 } else if (md.getMessage().isExpired()) {
466 if (LOG.isDebugEnabled()) {
467 LOG.debug(getConsumerId() + " received expired message: " + md);
468 }
469 beforeMessageIsConsumed(md);
470 afterMessageIsConsumed(md, true);
471 if (timeout > 0) {
472 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
473 }
474 } else {
475 if (LOG.isTraceEnabled()) {
476 LOG.trace(getConsumerId() + " received message: " + md);
477 }
478 return md;
479 }
480 }
481 } catch (InterruptedException e) {
482 Thread.currentThread().interrupt();
483 throw JMSExceptionSupport.create(e);
484 }
485 }
486
487 /**
488 * Receives the next message produced for this message consumer.
489 * <P>
490 * This call blocks indefinitely until a message is produced or until this
491 * message consumer is closed.
492 * <P>
493 * If this <CODE>receive</CODE> is done within a transaction, the consumer
494 * retains the message until the transaction commits.
495 *
496 * @return the next message produced for this message consumer, or null if
497 * this message consumer is concurrently closed
498 */
499 public Message receive() throws JMSException {
500 checkClosed();
501 checkMessageListener();
502
503 sendPullCommand(0);
504 MessageDispatch md = dequeue(-1);
505 if (md == null) {
506 return null;
507 }
508
509 beforeMessageIsConsumed(md);
510 afterMessageIsConsumed(md, false);
511
512 return createActiveMQMessage(md);
513 }
514
515 /**
516 * @param md
517 * @return
518 */
519 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
520 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
521 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
522 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
523 }
524 if (transformer != null) {
525 Message transformedMessage = transformer.consumerTransform(session, this, m);
526 if (transformedMessage != null) {
527 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
528 }
529 }
530 if (session.isClientAcknowledge()) {
531 m.setAcknowledgeCallback(new Callback() {
532 public void execute() throws Exception {
533 session.checkClosed();
534 session.acknowledge();
535 }
536 });
537 }else if (session.isIndividualAcknowledge()) {
538 m.setAcknowledgeCallback(new Callback() {
539 public void execute() throws Exception {
540 session.checkClosed();
541 acknowledge(md);
542 }
543 });
544 }
545 return m;
546 }
547
548 /**
549 * Receives the next message that arrives within the specified timeout
550 * interval.
551 * <P>
552 * This call blocks until a message arrives, the timeout expires, or this
553 * message consumer is closed. A <CODE>timeout</CODE> of zero never
554 * expires, and the call blocks indefinitely.
555 *
556 * @param timeout the timeout value (in milliseconds), a time out of zero
557 * never expires.
558 * @return the next message produced for this message consumer, or null if
559 * the timeout expires or this message consumer is concurrently
560 * closed
561 */
562 public Message receive(long timeout) throws JMSException {
563 checkClosed();
564 checkMessageListener();
565 if (timeout == 0) {
566 return this.receive();
567
568 }
569
570 sendPullCommand(timeout);
571 while (timeout > 0) {
572
573 MessageDispatch md;
574 if (info.getPrefetchSize() == 0) {
575 md = dequeue(-1); // We let the broker let us know when we timeout.
576 } else {
577 md = dequeue(timeout);
578 }
579
580 if (md == null) {
581 return null;
582 }
583
584 beforeMessageIsConsumed(md);
585 afterMessageIsConsumed(md, false);
586 return createActiveMQMessage(md);
587 }
588 return null;
589 }
590
591 /**
592 * Receives the next message if one is immediately available.
593 *
594 * @return the next message produced for this message consumer, or null if
595 * one is not available
596 * @throws JMSException if the JMS provider fails to receive the next
597 * message due to some internal error.
598 */
599 public Message receiveNoWait() throws JMSException {
600 checkClosed();
601 checkMessageListener();
602 sendPullCommand(-1);
603
604 MessageDispatch md;
605 if (info.getPrefetchSize() == 0) {
606 md = dequeue(-1); // We let the broker let us know when we
607 // timeout.
608 } else {
609 md = dequeue(0);
610 }
611
612 if (md == null) {
613 return null;
614 }
615
616 beforeMessageIsConsumed(md);
617 afterMessageIsConsumed(md, false);
618 return createActiveMQMessage(md);
619 }
620
621 /**
622 * Closes the message consumer.
623 * <P>
624 * Since a provider may allocate some resources on behalf of a <CODE>
625 * MessageConsumer</CODE>
626 * outside the Java virtual machine, clients should close them when they are
627 * not needed. Relying on garbage collection to eventually reclaim these
628 * resources may not be timely enough.
629 * <P>
630 * This call blocks until a <CODE>receive</CODE> or message listener in
631 * progress has completed. A blocked message consumer <CODE>receive </CODE>
632 * call returns null when this message consumer is closed.
633 *
634 * @throws JMSException if the JMS provider fails to close the consumer due
635 * to some internal error.
636 */
637 public void close() throws JMSException {
638 if (!unconsumedMessages.isClosed()) {
639 if (session.getTransactionContext().isInTransaction()) {
640 session.getTransactionContext().addSynchronization(new Synchronization() {
641 @Override
642 public void afterCommit() throws Exception {
643 doClose();
644 }
645
646 @Override
647 public void afterRollback() throws Exception {
648 doClose();
649 }
650 });
651 } else {
652 doClose();
653 }
654 }
655 }
656
657 void doClose() throws JMSException {
658 dispose();
659 RemoveInfo removeCommand = info.createRemoveCommand();
660 if (LOG.isDebugEnabled()) {
661 LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
662 }
663 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
664 this.session.asyncSendPacket(removeCommand);
665 }
666
667 void inProgressClearRequired() {
668 inProgressClearRequiredFlag = true;
669 // deal with delivered messages async to avoid lock contention with in progress acks
670 clearDispatchList = true;
671 }
672
673 void clearMessagesInProgress() {
674 if (inProgressClearRequiredFlag) {
675 synchronized (unconsumedMessages.getMutex()) {
676 if (inProgressClearRequiredFlag) {
677 if (LOG.isDebugEnabled()) {
678 LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
679 }
680 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
681 List<MessageDispatch> list = unconsumedMessages.removeAll();
682 if (!this.info.isBrowser()) {
683 for (MessageDispatch old : list) {
684 session.connection.rollbackDuplicate(this, old.getMessage());
685 }
686 }
687 // allow dispatch on this connection to resume
688 session.connection.transportInterruptionProcessingComplete();
689 inProgressClearRequiredFlag = false;
690 }
691 }
692 }
693 }
694
695 void deliverAcks() {
696 MessageAck ack = null;
697 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
698 if (isAutoAcknowledgeEach()) {
699 synchronized(deliveredMessages) {
700 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
701 if (ack != null) {
702 deliveredMessages.clear();
703 ackCounter = 0;
704 } else {
705 ack = pendingAck;
706 pendingAck = null;
707 }
708 }
709 } else if (pendingAck != null && pendingAck.isStandardAck()) {
710 ack = pendingAck;
711 pendingAck = null;
712 }
713 if (ack != null) {
714 final MessageAck ackToSend = ack;
715
716 if (executorService == null) {
717 executorService = Executors.newSingleThreadExecutor();
718 }
719 executorService.submit(new Runnable() {
720 public void run() {
721 try {
722 session.sendAck(ackToSend,true);
723 } catch (JMSException e) {
724 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
725 } finally {
726 deliveryingAcknowledgements.set(false);
727 }
728 }
729 });
730 } else {
731 deliveryingAcknowledgements.set(false);
732 }
733 }
734 }
735
736 public void dispose() throws JMSException {
737 if (!unconsumedMessages.isClosed()) {
738
739 // Do we have any acks we need to send out before closing?
740 // Ack any delivered messages now.
741 if (!session.getTransacted()) {
742 deliverAcks();
743 if (isAutoAcknowledgeBatch()) {
744 acknowledge();
745 }
746 }
747 if (executorService != null) {
748 executorService.shutdown();
749 try {
750 executorService.awaitTermination(60, TimeUnit.SECONDS);
751 } catch (InterruptedException e) {
752 Thread.currentThread().interrupt();
753 }
754 }
755
756 if (session.isClientAcknowledge()) {
757 if (!this.info.isBrowser()) {
758 // rollback duplicates that aren't acknowledged
759 List<MessageDispatch> tmp = null;
760 synchronized (this.deliveredMessages) {
761 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
762 }
763 for (MessageDispatch old : tmp) {
764 this.session.connection.rollbackDuplicate(this, old.getMessage());
765 }
766 tmp.clear();
767 }
768 }
769 if (!session.isTransacted()) {
770 synchronized(deliveredMessages) {
771 deliveredMessages.clear();
772 }
773 }
774 unconsumedMessages.close();
775 this.session.removeConsumer(this);
776 List<MessageDispatch> list = unconsumedMessages.removeAll();
777 if (!this.info.isBrowser()) {
778 for (MessageDispatch old : list) {
779 // ensure we don't filter this as a duplicate
780 session.connection.rollbackDuplicate(this, old.getMessage());
781 }
782 }
783 }
784 }
785
786 /**
787 * @throws IllegalStateException
788 */
789 protected void checkClosed() throws IllegalStateException {
790 if (unconsumedMessages.isClosed()) {
791 throw new IllegalStateException("The Consumer is closed");
792 }
793 }
794
795 /**
796 * If we have a zero prefetch specified then send a pull command to the
797 * broker to pull a message we are about to receive
798 */
799 protected void sendPullCommand(long timeout) throws JMSException {
800 clearDispatchList();
801 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
802 MessagePull messagePull = new MessagePull();
803 messagePull.configure(info);
804 messagePull.setTimeout(timeout);
805 session.asyncSendPacket(messagePull);
806 }
807 }
808
809 protected void checkMessageListener() throws JMSException {
810 session.checkMessageListener();
811 }
812
813 protected void setOptimizeAcknowledge(boolean value) {
814 if (optimizeAcknowledge && !value) {
815 deliverAcks();
816 }
817 optimizeAcknowledge = value;
818 }
819
820 protected void setPrefetchSize(int prefetch) {
821 deliverAcks();
822 this.info.setCurrentPrefetchSize(prefetch);
823 }
824
825 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
826 md.setDeliverySequenceId(session.getNextDeliveryId());
827 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
828 if (!isAutoAcknowledgeBatch()) {
829 synchronized(deliveredMessages) {
830 deliveredMessages.addFirst(md);
831 }
832 if (session.getTransacted()) {
833 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
834 }
835 }
836 }
837
838 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
839 if (unconsumedMessages.isClosed()) {
840 return;
841 }
842 if (messageExpired) {
843 synchronized (deliveredMessages) {
844 deliveredMessages.remove(md);
845 }
846 stats.getExpiredMessageCount().increment();
847 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
848 } else {
849 stats.onMessage();
850 if (session.getTransacted()) {
851 // Do nothing.
852 } else if (isAutoAcknowledgeEach()) {
853 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
854 synchronized (deliveredMessages) {
855 if (!deliveredMessages.isEmpty()) {
856 if (optimizeAcknowledge) {
857 ackCounter++;
858 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
859 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
860 if (ack != null) {
861 deliveredMessages.clear();
862 ackCounter = 0;
863 session.sendAck(ack);
864 optimizeAckTimestamp = System.currentTimeMillis();
865 }
866 }
867 } else {
868 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
869 if (ack!=null) {
870 deliveredMessages.clear();
871 session.sendAck(ack);
872 }
873 }
874 }
875 }
876 deliveryingAcknowledgements.set(false);
877 }
878 } else if (isAutoAcknowledgeBatch()) {
879 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
880 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
881 boolean messageUnackedByConsumer = false;
882 synchronized (deliveredMessages) {
883 messageUnackedByConsumer = deliveredMessages.contains(md);
884 }
885 if (messageUnackedByConsumer) {
886 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
887 }
888 }
889 else {
890 throw new IllegalStateException("Invalid session state.");
891 }
892 }
893 }
894
895 /**
896 * Creates a MessageAck for all messages contained in deliveredMessages.
897 * Caller should hold the lock for deliveredMessages.
898 *
899 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
900 * @return <code>null</code> if nothing to ack.
901 */
902 private MessageAck makeAckForAllDeliveredMessages(byte type) {
903 synchronized (deliveredMessages) {
904 if (deliveredMessages.isEmpty())
905 return null;
906
907 MessageDispatch md = deliveredMessages.getFirst();
908 MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
909 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
910 return ack;
911 }
912 }
913
914 private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
915
916 // Don't acknowledge now, but we may need to let the broker know the
917 // consumer got the message to expand the pre-fetch window
918 if (session.getTransacted()) {
919 session.doStartTransaction();
920 if (!synchronizationRegistered) {
921 synchronizationRegistered = true;
922 session.getTransactionContext().addSynchronization(new Synchronization() {
923 @Override
924 public void beforeEnd() throws Exception {
925 acknowledge();
926 synchronizationRegistered = false;
927 }
928
929 @Override
930 public void afterCommit() throws Exception {
931 commit();
932 synchronizationRegistered = false;
933 }
934
935 @Override
936 public void afterRollback() throws Exception {
937 rollback();
938 synchronizationRegistered = false;
939 }
940 });
941 }
942 }
943
944 deliveredCounter++;
945
946 MessageAck oldPendingAck = pendingAck;
947 pendingAck = new MessageAck(md, ackType, deliveredCounter);
948 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
949 if( oldPendingAck==null ) {
950 pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
951 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
952 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
953 } else {
954 // old pending ack being superseded by ack of another type, if is is not a delivered
955 // ack and hence important, send it now so it is not lost.
956 if ( !oldPendingAck.isDeliveredAck()) {
957 if (LOG.isDebugEnabled()) {
958 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
959 }
960 session.sendAck(oldPendingAck);
961 } else {
962 if (LOG.isDebugEnabled()) {
963 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
964 }
965 }
966 }
967
968 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
969 session.sendAck(pendingAck);
970 pendingAck=null;
971 deliveredCounter = 0;
972 additionalWindowSize = 0;
973 }
974 }
975
976 /**
977 * Acknowledge all the messages that have been delivered to the client up to
978 * this point.
979 *
980 * @throws JMSException
981 */
982 public void acknowledge() throws JMSException {
983 clearDispatchList();
984 waitForRedeliveries();
985 synchronized(deliveredMessages) {
986 // Acknowledge all messages so far.
987 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
988 if (ack == null)
989 return; // no msgs
990
991 if (session.getTransacted()) {
992 rollbackOnFailedRecoveryRedelivery();
993 session.doStartTransaction();
994 ack.setTransactionId(session.getTransactionContext().getTransactionId());
995 }
996 session.sendAck(ack);
997 pendingAck = null;
998
999 // Adjust the counters
1000 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1001 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1002
1003 if (!session.getTransacted()) {
1004 deliveredMessages.clear();
1005 }
1006 }
1007 }
1008
1009 private void waitForRedeliveries() {
1010 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1011 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1012 int numberNotReplayed;
1013 do {
1014 numberNotReplayed = 0;
1015 synchronized(deliveredMessages) {
1016 if (previouslyDeliveredMessages != null) {
1017 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1018 if (!entry.getValue()) {
1019 numberNotReplayed++;
1020 }
1021 }
1022 }
1023 }
1024 if (numberNotReplayed > 0) {
1025 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1026 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId());
1027 try {
1028 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1029 } catch (InterruptedException outOfhere) {
1030 break;
1031 }
1032 }
1033 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1034 }
1035 }
1036
1037 /*
1038 * called with deliveredMessages locked
1039 */
1040 private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1041 if (previouslyDeliveredMessages != null) {
1042 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1043 // as messages have been dispatched else where.
1044 int numberNotReplayed = 0;
1045 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1046 if (!entry.getValue()) {
1047 numberNotReplayed++;
1048 if (LOG.isDebugEnabled()) {
1049 LOG.debug("previously delivered message has not been replayed in transaction: "
1050 + previouslyDeliveredMessages.transactionId
1051 + " , messageId: " + entry.getKey());
1052 }
1053 }
1054 }
1055 if (numberNotReplayed > 0) {
1056 String message = "rolling back transaction ("
1057 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1058 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1059 LOG.warn(message);
1060 throw new TransactionRolledBackException(message);
1061 }
1062 }
1063 }
1064
1065 void acknowledge(MessageDispatch md) throws JMSException {
1066 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1067 session.sendAck(ack);
1068 synchronized(deliveredMessages){
1069 deliveredMessages.remove(md);
1070 }
1071 }
1072
1073 public void commit() throws JMSException {
1074 synchronized (deliveredMessages) {
1075 deliveredMessages.clear();
1076 clearPreviouslyDelivered();
1077 }
1078 redeliveryDelay = 0;
1079 }
1080
/**
 * Rolls back everything currently held in deliveredMessages.
 * <p>
 * Under the unconsumedMessages mutex and the deliveredMessages lock this
 * method: drops entries that were tracked as previously delivered but never
 * replayed, recomputes redeliveryDelay from the redelivery policy, notifies
 * each message that it was rolled back and clears it from duplicate
 * tracking, and then either
 * <ul>
 * <li>sends a poison ack when the redelivery counter exceeds the policy
 * maximum, or</li>
 * <li>puts the messages back at the front of unconsumedMessages and
 * restarts delivery, immediately or after redeliveryDelay.</li>
 * </ul>
 * Finally, if a message listener is set, the session is asked to redispatch
 * the unconsumed messages.
 *
 * @throws JMSException if sending an ack or restarting delivery fails
 */
public void rollback() throws JMSException {
    synchronized (unconsumedMessages.getMutex()) {
        if (optimizeAcknowledge) {
            // remove messages read but not acked at the broker yet through
            // optimizeAcknowledge
            if (!this.info.isBrowser()) {
                synchronized(deliveredMessages) {
                    // NOTE(review): deliveredMessages.size() shrinks on every removeLast()
                    // while i grows, so this loop can stop before ackCounter entries
                    // have been removed - confirm that this is intended.
                    for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
                        // ensure we don't filter this as a duplicate
                        MessageDispatch md = deliveredMessages.removeLast();
                        session.connection.rollbackDuplicate(this, md.getMessage());
                    }
                }
            }
        }
        synchronized(deliveredMessages) {
            rollbackPreviouslyDeliveredAndNotRedelivered();
            if (deliveredMessages.isEmpty()) {
                // nothing left to roll back; note that the redispatch at the end
                // of the method is skipped on this path as well
                return;
            }

            // use initial delay for first redelivery
            // NOTE(review): despite its name, lastMd is the HEAD of the list -
            // presumably the list is kept newest-first; confirm against where
            // entries are added.
            MessageDispatch lastMd = deliveredMessages.getFirst();
            final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
            if (currentRedeliveryCount > 0) {
                redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
            } else {
                redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
            }
            // id of the entry at the TAIL of the list; used as the start of the ack range
            MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();

            for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                MessageDispatch md = iter.next();
                md.getMessage().onMessageRolledBack();
                // ensure we don't filter this as a duplicate
                session.connection.rollbackDuplicate(this, md.getMessage());
            }

            // The counter is re-read here, after onMessageRolledBack() has been
            // invoked on every message above.
            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
                // We need to NACK the messages so that they get sent to the
                // DLQ.
                // Acknowledge the last message.

                MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                ack.setPoisonCause(lastMd.getRollbackCause());
                ack.setFirstMessageId(firstMsgId);
                session.sendAck(ack,true);
                // Adjust the window size.
                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                redeliveryDelay = 0;
            } else {

                // only redelivery_ack after first delivery
                if (currentRedeliveryCount > 0) {
                    MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
                    ack.setFirstMessageId(firstMsgId);
                    session.sendAck(ack,true);
                }

                // stop the delivery of messages.
                unconsumedMessages.stop();

                // put every rolled back message back at the front of the unconsumed queue
                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                    MessageDispatch md = iter.next();
                    unconsumedMessages.enqueueFirst(md);
                }

                if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                    // Start up the delivery again a little later.
                    scheduler.executeAfterDelay(new Runnable() {
                        public void run() {
                            try {
                                // only restart if the consumer is still flagged as started
                                // when the delay expires
                                if (started.get()) {
                                    start();
                                }
                            } catch (JMSException e) {
                                session.connection.onAsyncException(e);
                            }
                        }
                    }, redeliveryDelay);
                } else {
                    start();
                }

            }
            // the rolled back messages no longer count as delivered
            deliveredCounter -= deliveredMessages.size();
            deliveredMessages.clear();
        }
    }
    // performed after both locks have been released
    if (messageListener.get() != null) {
        session.redispatch(this, unconsumedMessages);
    }
}
1175
1176 /*
1177 * called with unconsumedMessages && deliveredMessages locked
1178 * remove any message not re-delivered as they can't be replayed to this
1179 * consumer on rollback
1180 */
1181 private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1182 if (previouslyDeliveredMessages != null) {
1183 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1184 if (!entry.getValue()) {
1185 removeFromDeliveredMessages(entry.getKey());
1186 }
1187 }
1188 clearPreviouslyDelivered();
1189 }
1190 }
1191
1192 /*
1193 * called with deliveredMessages locked
1194 */
1195 private void removeFromDeliveredMessages(MessageId key) {
1196 Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1197 while (iterator.hasNext()) {
1198 MessageDispatch candidate = iterator.next();
1199 if (key.equals(candidate.getMessage().getMessageId())) {
1200 session.connection.rollbackDuplicate(this, candidate.getMessage());
1201 iterator.remove();
1202 break;
1203 }
1204 }
1205 }
1206 /*
1207 * called with deliveredMessages locked
1208 */
1209 private void clearPreviouslyDelivered() {
1210 if (previouslyDeliveredMessages != null) {
1211 previouslyDeliveredMessages.clear();
1212 previouslyDeliveredMessages = null;
1213 }
1214 }
1215
/**
 * Handles one dispatch handed to this consumer.
 * <p>
 * While holding the unconsumedMessages mutex, and only if the queue is not
 * closed:
 * <ul>
 * <li>a message that is not a duplicate (or any message for a browser) is
 * passed straight to the message listener when one is set and the queue is
 * running; otherwise it is enqueued and the availableListener, if any, is
 * notified;</li>
 * <li>a duplicate in a non-transacted session is acknowledged immediately
 * with a standard ack;</li>
 * <li>a duplicate in a transacted session is either marked as replayed in
 * previouslyDeliveredMessages (followed by a deferred delivered ack) or,
 * when no such tracking map exists, acknowledged as poison.</li>
 * </ul>
 * Any exception raised is reported through
 * session.connection.onClientInternalException rather than thrown.
 *
 * @param md the dispatch to process
 */
public void dispatch(MessageDispatch md) {
    // listener is read once, before any lock is taken
    MessageListener listener = this.messageListener.get();
    try {
        clearMessagesInProgress();
        clearDispatchList();
        synchronized (unconsumedMessages.getMutex()) {
            if (!unconsumedMessages.isClosed()) {
                // browsers skip the duplicate check entirely
                if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                    if (listener != null && unconsumedMessages.isRunning()) {
                        ActiveMQMessage message = createActiveMQMessage(md);
                        beforeMessageIsConsumed(md);
                        try {
                            // expired messages are not handed to the listener, but are
                            // still passed to afterMessageIsConsumed with expired == true
                            boolean expired = message.isExpired();
                            if (!expired) {
                                // note: the listener runs while the unconsumedMessages mutex is held
                                listener.onMessage(message);
                            }
                            afterMessageIsConsumed(md, expired);
                        } catch (RuntimeException e) {
                            LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
                            if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                // schedule redelivery and possible dlq processing
                                md.setRollbackCause(e);
                                rollback();
                            } else {
                                // Transacted or Client ack: Deliver the
                                // next message.
                                afterMessageIsConsumed(md, false);
                            }
                        }
                    } else {
                        if (!unconsumedMessages.isRunning()) {
                            // delayed redelivery, ensure it can be re delivered
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                        unconsumedMessages.enqueue(md);
                        if (availableListener != null) {
                            availableListener.onMessageAvailable(this);
                        }
                    }
                } else {
                    // the connection flagged this message as a duplicate
                    if (!session.isTransacted()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage());
                        }
                        MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
                        session.sendAck(ack);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
                        }
                        // decided under the deliveredMessages lock, acted upon after it is released
                        boolean needsPoisonAck = false;
                        synchronized (deliveredMessages) {
                            if (previouslyDeliveredMessages != null) {
                                // mark this message as replayed
                                previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
                            } else {
                                // delivery while pending redelivery to another consumer on the same connection
                                // not waiting for redelivery will help here
                                needsPoisonAck = true;
                            }
                        }
                        if (needsPoisonAck) {
                            LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
                                    + " consumer on this connection, failoverRedeliveryWaitPeriod="
                                    + failoverRedeliveryWaitPeriod + ". Message: " + md);
                            MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                            poisonAck.setFirstMessageId(md.getMessage().getMessageId());
                            session.sendAck(poisonAck);
                        } else {
                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                        }
                    }
                }
            }
        }
        // every 1000th dispatch: reset the counter and yield the current thread
        if (++dispatchedCount % 1000 == 0) {
            dispatchedCount = 0;
            Thread.yield();
        }
    } catch (Exception e) {
        session.connection.onClientInternalException(e);
    }
}
1298
1299 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1300 private void clearDispatchList() {
1301 if (clearDispatchList) {
1302 synchronized (deliveredMessages) {
1303 if (clearDispatchList) {
1304 if (!deliveredMessages.isEmpty()) {
1305 if (session.isTransacted()) {
1306 if (LOG.isDebugEnabled()) {
1307 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1308 }
1309 if (previouslyDeliveredMessages == null) {
1310 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1311 }
1312 for (MessageDispatch delivered : deliveredMessages) {
1313 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1314 }
1315 } else {
1316 if (LOG.isDebugEnabled()) {
1317 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1318 }
1319 deliveredMessages.clear();
1320 pendingAck = null;
1321 }
1322 }
1323 clearDispatchList = false;
1324 }
1325 }
1326 }
1327 }
1328
1329 public int getMessageSize() {
1330 return unconsumedMessages.size();
1331 }
1332
1333 public void start() throws JMSException {
1334 if (unconsumedMessages.isClosed()) {
1335 return;
1336 }
1337 started.set(true);
1338 unconsumedMessages.start();
1339 session.executor.wakeup();
1340 }
1341
1342 public void stop() {
1343 started.set(false);
1344 unconsumedMessages.stop();
1345 }
1346
1347 @Override
1348 public String toString() {
1349 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1350 + " }";
1351 }
1352
1353 /**
1354 * Delivers a message to the message listener.
1355 *
1356 * @return
1357 * @throws JMSException
1358 */
1359 public boolean iterate() {
1360 MessageListener listener = this.messageListener.get();
1361 if (listener != null) {
1362 MessageDispatch md = unconsumedMessages.dequeueNoWait();
1363 if (md != null) {
1364 dispatch(md);
1365 return true;
1366 }
1367 }
1368 return false;
1369 }
1370
1371 public boolean isInUse(ActiveMQTempDestination destination) {
1372 return info.getDestination().equals(destination);
1373 }
1374
1375 public long getLastDeliveredSequenceId() {
1376 return lastDeliveredSequenceId;
1377 }
1378
1379 public IOException getFailureError() {
1380 return failureError;
1381 }
1382
1383 public void setFailureError(IOException failureError) {
1384 this.failureError = failureError;
1385 }
1386 }