001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.state;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.Map;
023 import java.util.Vector;
024 import java.util.Map.Entry;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import javax.jms.TransactionRolledBackException;
028 import javax.transaction.xa.XAResource;
029
030 import org.apache.activemq.command.Command;
031 import org.apache.activemq.command.ConnectionId;
032 import org.apache.activemq.command.ConnectionInfo;
033 import org.apache.activemq.command.ConsumerControl;
034 import org.apache.activemq.command.ConsumerId;
035 import org.apache.activemq.command.ConsumerInfo;
036 import org.apache.activemq.command.DestinationInfo;
037 import org.apache.activemq.command.ExceptionResponse;
038 import org.apache.activemq.command.IntegerResponse;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageId;
041 import org.apache.activemq.command.MessagePull;
042 import org.apache.activemq.command.ProducerId;
043 import org.apache.activemq.command.ProducerInfo;
044 import org.apache.activemq.command.Response;
045 import org.apache.activemq.command.SessionId;
046 import org.apache.activemq.command.SessionInfo;
047 import org.apache.activemq.command.TransactionInfo;
048 import org.apache.activemq.transport.Transport;
049 import org.apache.activemq.util.IOExceptionSupport;
050 import org.slf4j.Logger;
051 import org.slf4j.LoggerFactory;
052
053 /**
054 * Tracks the state of a connection so a newly established transport can be
055 * re-initialized to the state that was tracked.
056 *
057 *
058 */
059 public class ConnectionStateTracker extends CommandVisitorAdapter {
060 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
061
062 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
063
064 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
065
066 private boolean trackTransactions;
067 private boolean restoreSessions = true;
068 private boolean restoreConsumers = true;
069 private boolean restoreProducers = true;
070 private boolean restoreTransaction = true;
071 private boolean trackMessages = true;
072 private boolean trackTransactionProducers = true;
073 private int maxCacheSize = 128 * 1024;
074 private int currentCacheSize;
075 private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
076 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
077 boolean result = currentCacheSize > maxCacheSize;
078 if (result) {
079 if (eldest.getValue() instanceof Message) {
080 currentCacheSize -= ((Message)eldest.getValue()).getSize();
081 }
082 }
083 return result;
084 }
085 };
086
087 private class RemoveTransactionAction implements ResponseHandler {
088 private final TransactionInfo info;
089
090 public RemoveTransactionAction(TransactionInfo info) {
091 this.info = info;
092 }
093
094 public void onResponse(Command response) {
095 ConnectionId connectionId = info.getConnectionId();
096 ConnectionState cs = connectionStates.get(connectionId);
097 cs.removeTransactionState(info.getTransactionId());
098 }
099 }
100
101 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
102
103 public PrepareReadonlyTransactionAction(TransactionInfo info) {
104 super(info);
105 }
106
107 public void onResponse(Command command) {
108 IntegerResponse response = (IntegerResponse) command;
109 if (XAResource.XA_RDONLY == response.getResult()) {
110 // all done, no commit or rollback from TM
111 super.onResponse(command);
112 }
113 }
114 }
115
116 /**
117 *
118 *
119 * @param command
120 * @return null if the command is not state tracked.
121 * @throws IOException
122 */
123 public Tracked track(Command command) throws IOException {
124 try {
125 return (Tracked)command.visit(this);
126 } catch (IOException e) {
127 throw e;
128 } catch (Throwable e) {
129 throw IOExceptionSupport.create(e);
130 }
131 }
132
133 public void trackBack(Command command) {
134 if (command != null) {
135 if (trackMessages && command.isMessage()) {
136 Message message = (Message) command;
137 if (message.getTransactionId()==null) {
138 currentCacheSize = currentCacheSize + message.getSize();
139 }
140 } else if (command instanceof MessagePull) {
141 // just needs to be a rough estimate of size, ~4 identifiers
142 currentCacheSize += 400;
143 }
144 }
145 }
146
147 public void restore(Transport transport) throws IOException {
148 // Restore the connections.
149 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
150 ConnectionState connectionState = iter.next();
151 connectionState.getInfo().setFailoverReconnect(true);
152 if (LOG.isDebugEnabled()) {
153 LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
154 }
155 transport.oneway(connectionState.getInfo());
156 restoreTempDestinations(transport, connectionState);
157
158 if (restoreSessions) {
159 restoreSessions(transport, connectionState);
160 }
161
162 if (restoreTransaction) {
163 restoreTransactions(transport, connectionState);
164 }
165 }
166 //now flush messages
167 for (Command msg:messageCache.values()) {
168 if (LOG.isDebugEnabled()) {
169 LOG.debug("command: " + msg.getCommandId());
170 }
171 transport.oneway(msg);
172 }
173 }
174
175 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
176 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
177 for (TransactionState transactionState : connectionState.getTransactionStates()) {
178 if (LOG.isDebugEnabled()) {
179 LOG.debug("tx: " + transactionState.getId());
180 }
181
182 // rollback any completed transactions - no way to know if commit got there
183 // or if reply went missing
184 //
185 if (!transactionState.getCommands().isEmpty()) {
186 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
187 if (lastCommand instanceof TransactionInfo) {
188 TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
189 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
190 if (LOG.isDebugEnabled()) {
191 LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
192 }
193 toRollback.add(transactionInfo);
194 continue;
195 }
196 }
197 }
198
199 // replay short lived producers that may have been involved in the transaction
200 for (ProducerState producerState : transactionState.getProducerStates().values()) {
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("tx replay producer :" + producerState.getInfo());
203 }
204 transport.oneway(producerState.getInfo());
205 }
206
207 for (Command command : transactionState.getCommands()) {
208 if (LOG.isDebugEnabled()) {
209 LOG.debug("tx replay: " + command);
210 }
211 transport.oneway(command);
212 }
213
214 for (ProducerState producerState : transactionState.getProducerStates().values()) {
215 if (LOG.isDebugEnabled()) {
216 LOG.debug("tx remove replayed producer :" + producerState.getInfo());
217 }
218 transport.oneway(producerState.getInfo().createRemoveCommand());
219 }
220 }
221
222 for (TransactionInfo command: toRollback) {
223 // respond to the outstanding commit
224 ExceptionResponse response = new ExceptionResponse();
225 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
226 response.setCorrelationId(command.getCommandId());
227 transport.getTransportListener().onCommand(response);
228 }
229 }
230
231 /**
232 * @param transport
233 * @param connectionState
234 * @throws IOException
235 */
236 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
237 // Restore the connection's sessions
238 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
239 SessionState sessionState = (SessionState)iter2.next();
240 if (LOG.isDebugEnabled()) {
241 LOG.debug("session: " + sessionState.getInfo().getSessionId());
242 }
243 transport.oneway(sessionState.getInfo());
244
245 if (restoreProducers) {
246 restoreProducers(transport, sessionState);
247 }
248
249 if (restoreConsumers) {
250 restoreConsumers(transport, sessionState);
251 }
252 }
253 }
254
255 /**
256 * @param transport
257 * @param sessionState
258 * @throws IOException
259 */
260 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
261 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
262 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
263 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
264 for (ConsumerState consumerState : sessionState.getConsumerStates()) {
265 ConsumerInfo infoToSend = consumerState.getInfo();
266 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
267 infoToSend = consumerState.getInfo().copy();
268 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
269 infoToSend.setPrefetchSize(0);
270 if (LOG.isDebugEnabled()) {
271 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
272 }
273 }
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("restore consumer: " + infoToSend.getConsumerId());
276 }
277 transport.oneway(infoToSend);
278 }
279 }
280
281 /**
282 * @param transport
283 * @param sessionState
284 * @throws IOException
285 */
286 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
287 // Restore the session's producers
288 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
289 ProducerState producerState = (ProducerState)iter3.next();
290 if (LOG.isDebugEnabled()) {
291 LOG.debug("producer: " + producerState.getInfo().getProducerId());
292 }
293 transport.oneway(producerState.getInfo());
294 }
295 }
296
297 /**
298 * @param transport
299 * @param connectionState
300 * @throws IOException
301 */
302 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
303 throws IOException {
304 // Restore the connection's temp destinations.
305 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
306 transport.oneway((DestinationInfo)iter2.next());
307 }
308 }
309
310 public Response processAddDestination(DestinationInfo info) {
311 if (info != null) {
312 ConnectionState cs = connectionStates.get(info.getConnectionId());
313 if (cs != null && info.getDestination().isTemporary()) {
314 cs.addTempDestination(info);
315 }
316 }
317 return TRACKED_RESPONSE_MARKER;
318 }
319
320 public Response processRemoveDestination(DestinationInfo info) {
321 if (info != null) {
322 ConnectionState cs = connectionStates.get(info.getConnectionId());
323 if (cs != null && info.getDestination().isTemporary()) {
324 cs.removeTempDestination(info.getDestination());
325 }
326 }
327 return TRACKED_RESPONSE_MARKER;
328 }
329
330 public Response processAddProducer(ProducerInfo info) {
331 if (info != null && info.getProducerId() != null) {
332 SessionId sessionId = info.getProducerId().getParentId();
333 if (sessionId != null) {
334 ConnectionId connectionId = sessionId.getParentId();
335 if (connectionId != null) {
336 ConnectionState cs = connectionStates.get(connectionId);
337 if (cs != null) {
338 SessionState ss = cs.getSessionState(sessionId);
339 if (ss != null) {
340 ss.addProducer(info);
341 }
342 }
343 }
344 }
345 }
346 return TRACKED_RESPONSE_MARKER;
347 }
348
349 public Response processRemoveProducer(ProducerId id) {
350 if (id != null) {
351 SessionId sessionId = id.getParentId();
352 if (sessionId != null) {
353 ConnectionId connectionId = sessionId.getParentId();
354 if (connectionId != null) {
355 ConnectionState cs = connectionStates.get(connectionId);
356 if (cs != null) {
357 SessionState ss = cs.getSessionState(sessionId);
358 if (ss != null) {
359 ss.removeProducer(id);
360 }
361 }
362 }
363 }
364 }
365 return TRACKED_RESPONSE_MARKER;
366 }
367
368 public Response processAddConsumer(ConsumerInfo info) {
369 if (info != null) {
370 SessionId sessionId = info.getConsumerId().getParentId();
371 if (sessionId != null) {
372 ConnectionId connectionId = sessionId.getParentId();
373 if (connectionId != null) {
374 ConnectionState cs = connectionStates.get(connectionId);
375 if (cs != null) {
376 SessionState ss = cs.getSessionState(sessionId);
377 if (ss != null) {
378 ss.addConsumer(info);
379 }
380 }
381 }
382 }
383 }
384 return TRACKED_RESPONSE_MARKER;
385 }
386
387 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
388 if (id != null) {
389 SessionId sessionId = id.getParentId();
390 if (sessionId != null) {
391 ConnectionId connectionId = sessionId.getParentId();
392 if (connectionId != null) {
393 ConnectionState cs = connectionStates.get(connectionId);
394 if (cs != null) {
395 SessionState ss = cs.getSessionState(sessionId);
396 if (ss != null) {
397 ss.removeConsumer(id);
398 }
399 }
400 }
401 }
402 }
403 return TRACKED_RESPONSE_MARKER;
404 }
405
406 public Response processAddSession(SessionInfo info) {
407 if (info != null) {
408 ConnectionId connectionId = info.getSessionId().getParentId();
409 if (connectionId != null) {
410 ConnectionState cs = connectionStates.get(connectionId);
411 if (cs != null) {
412 cs.addSession(info);
413 }
414 }
415 }
416 return TRACKED_RESPONSE_MARKER;
417 }
418
419 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
420 if (id != null) {
421 ConnectionId connectionId = id.getParentId();
422 if (connectionId != null) {
423 ConnectionState cs = connectionStates.get(connectionId);
424 if (cs != null) {
425 cs.removeSession(id);
426 }
427 }
428 }
429 return TRACKED_RESPONSE_MARKER;
430 }
431
432 public Response processAddConnection(ConnectionInfo info) {
433 if (info != null) {
434 connectionStates.put(info.getConnectionId(), new ConnectionState(info));
435 }
436 return TRACKED_RESPONSE_MARKER;
437 }
438
439 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
440 if (id != null) {
441 connectionStates.remove(id);
442 }
443 return TRACKED_RESPONSE_MARKER;
444 }
445
446 public Response processMessage(Message send) throws Exception {
447 if (send != null) {
448 if (trackTransactions && send.getTransactionId() != null) {
449 ProducerId producerId = send.getProducerId();
450 ConnectionId connectionId = producerId.getParentId().getParentId();
451 if (connectionId != null) {
452 ConnectionState cs = connectionStates.get(connectionId);
453 if (cs != null) {
454 TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
455 if (transactionState != null) {
456 transactionState.addCommand(send);
457
458 if (trackTransactionProducers) {
459 // for jmstemplate, track the producer in case it is closed before commit
460 // and needs to be replayed
461 SessionState ss = cs.getSessionState(producerId.getParentId());
462 ProducerState producerState = ss.getProducerState(producerId);
463 producerState.setTransactionState(transactionState);
464 }
465 }
466 }
467 }
468 return TRACKED_RESPONSE_MARKER;
469 }else if (trackMessages) {
470 messageCache.put(send.getMessageId(), send.copy());
471 }
472 }
473 return null;
474 }
475
476 public Response processBeginTransaction(TransactionInfo info) {
477 if (trackTransactions && info != null && info.getTransactionId() != null) {
478 ConnectionId connectionId = info.getConnectionId();
479 if (connectionId != null) {
480 ConnectionState cs = connectionStates.get(connectionId);
481 if (cs != null) {
482 cs.addTransactionState(info.getTransactionId());
483 TransactionState state = cs.getTransactionState(info.getTransactionId());
484 state.addCommand(info);
485 }
486 }
487 return TRACKED_RESPONSE_MARKER;
488 }
489 return null;
490 }
491
492 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
493 if (trackTransactions && info != null) {
494 ConnectionId connectionId = info.getConnectionId();
495 if (connectionId != null) {
496 ConnectionState cs = connectionStates.get(connectionId);
497 if (cs != null) {
498 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
499 if (transactionState != null) {
500 transactionState.addCommand(info);
501 return new Tracked(new PrepareReadonlyTransactionAction(info));
502 }
503 }
504 }
505 }
506 return null;
507 }
508
509 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
510 if (trackTransactions && info != null) {
511 ConnectionId connectionId = info.getConnectionId();
512 if (connectionId != null) {
513 ConnectionState cs = connectionStates.get(connectionId);
514 if (cs != null) {
515 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
516 if (transactionState != null) {
517 transactionState.addCommand(info);
518 return new Tracked(new RemoveTransactionAction(info));
519 }
520 }
521 }
522 }
523 return null;
524 }
525
526 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
527 if (trackTransactions && info != null) {
528 ConnectionId connectionId = info.getConnectionId();
529 if (connectionId != null) {
530 ConnectionState cs = connectionStates.get(connectionId);
531 if (cs != null) {
532 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
533 if (transactionState != null) {
534 transactionState.addCommand(info);
535 return new Tracked(new RemoveTransactionAction(info));
536 }
537 }
538 }
539 }
540 return null;
541 }
542
543 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
544 if (trackTransactions && info != null) {
545 ConnectionId connectionId = info.getConnectionId();
546 if (connectionId != null) {
547 ConnectionState cs = connectionStates.get(connectionId);
548 if (cs != null) {
549 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
550 if (transactionState != null) {
551 transactionState.addCommand(info);
552 return new Tracked(new RemoveTransactionAction(info));
553 }
554 }
555 }
556 }
557 return null;
558 }
559
560 public Response processEndTransaction(TransactionInfo info) throws Exception {
561 if (trackTransactions && info != null) {
562 ConnectionId connectionId = info.getConnectionId();
563 if (connectionId != null) {
564 ConnectionState cs = connectionStates.get(connectionId);
565 if (cs != null) {
566 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
567 if (transactionState != null) {
568 transactionState.addCommand(info);
569 }
570 }
571 }
572 return TRACKED_RESPONSE_MARKER;
573 }
574 return null;
575 }
576
577 @Override
578 public Response processMessagePull(MessagePull pull) throws Exception {
579 if (pull != null) {
580 // leave a single instance in the cache
581 final String id = pull.getDestination() + "::" + pull.getConsumerId();
582 messageCache.put(id.intern(), pull);
583 }
584 return null;
585 }
586
587 public boolean isRestoreConsumers() {
588 return restoreConsumers;
589 }
590
591 public void setRestoreConsumers(boolean restoreConsumers) {
592 this.restoreConsumers = restoreConsumers;
593 }
594
595 public boolean isRestoreProducers() {
596 return restoreProducers;
597 }
598
599 public void setRestoreProducers(boolean restoreProducers) {
600 this.restoreProducers = restoreProducers;
601 }
602
603 public boolean isRestoreSessions() {
604 return restoreSessions;
605 }
606
607 public void setRestoreSessions(boolean restoreSessions) {
608 this.restoreSessions = restoreSessions;
609 }
610
611 public boolean isTrackTransactions() {
612 return trackTransactions;
613 }
614
615 public void setTrackTransactions(boolean trackTransactions) {
616 this.trackTransactions = trackTransactions;
617 }
618
619 public boolean isTrackTransactionProducers() {
620 return this.trackTransactionProducers;
621 }
622
623 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
624 this.trackTransactionProducers = trackTransactionProducers;
625 }
626
627 public boolean isRestoreTransaction() {
628 return restoreTransaction;
629 }
630
631 public void setRestoreTransaction(boolean restoreTransaction) {
632 this.restoreTransaction = restoreTransaction;
633 }
634
635 public boolean isTrackMessages() {
636 return trackMessages;
637 }
638
639 public void setTrackMessages(boolean trackMessages) {
640 this.trackMessages = trackMessages;
641 }
642
643 public int getMaxCacheSize() {
644 return maxCacheSize;
645 }
646
647 public void setMaxCacheSize(int maxCacheSize) {
648 this.maxCacheSize = maxCacheSize;
649 }
650
651 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
652 ConnectionState connectionState = connectionStates.get(connectionId);
653 if (connectionState != null) {
654 connectionState.setConnectionInterruptProcessingComplete(true);
655 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
656 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
657 ConsumerControl control = new ConsumerControl();
658 control.setConsumerId(entry.getKey());
659 control.setPrefetch(entry.getValue().getPrefetchSize());
660 control.setDestination(entry.getValue().getDestination());
661 try {
662 if (LOG.isDebugEnabled()) {
663 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
664 }
665 transport.oneway(control);
666 } catch (Exception ex) {
667 if (LOG.isDebugEnabled()) {
668 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
669 + " with: " + control.getPrefetch(), ex);
670 }
671 }
672 }
673 stalledConsumers.clear();
674 }
675 }
676
677 public void transportInterrupted(ConnectionId connectionId) {
678 ConnectionState connectionState = connectionStates.get(connectionId);
679 if (connectionState != null) {
680 connectionState.setConnectionInterruptProcessingComplete(false);
681 }
682 }
683 }