001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.util.Set;
020 import java.util.concurrent.CopyOnWriteArraySet;
021 import java.util.concurrent.atomic.AtomicBoolean;
022 import java.util.concurrent.atomic.AtomicInteger;
023
024 import org.apache.activemq.command.ConsumerId;
025 import org.apache.activemq.command.ConsumerInfo;
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 /**
030 * Represents a network bridge interface
031 *
032 *
033 */
034 public class DemandSubscription {
035 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
036
037 private final ConsumerInfo remoteInfo;
038 private final ConsumerInfo localInfo;
039 private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
040
041 private AtomicInteger dispatched = new AtomicInteger(0);
042 private AtomicBoolean activeWaiter = new AtomicBoolean();
043
044 DemandSubscription(ConsumerInfo info) {
045 remoteInfo = info;
046 localInfo = info.copy();
047 localInfo.setNetworkSubscription(true);
048 remoteSubsIds.add(info.getConsumerId());
049 }
050
051 /**
052 * Increment the consumers associated with this subscription
053 *
054 * @param id
055 * @return true if added
056 */
057 public boolean add(ConsumerId id) {
058 return remoteSubsIds.add(id);
059 }
060
061 /**
062 * Increment the consumers associated with this subscription
063 *
064 * @param id
065 * @return true if removed
066 */
067 public boolean remove(ConsumerId id) {
068 return remoteSubsIds.remove(id);
069 }
070
071 /**
072 * @return true if there are no interested consumers
073 */
074 public boolean isEmpty() {
075 return remoteSubsIds.isEmpty();
076 }
077
078 /**
079 * @return Returns the localInfo.
080 */
081 public ConsumerInfo getLocalInfo() {
082 return localInfo;
083 }
084
085 /**
086 * @return Returns the remoteInfo.
087 */
088 public ConsumerInfo getRemoteInfo() {
089 return remoteInfo;
090 }
091
092 public void waitForCompletion() {
093 if (dispatched.get() > 0) {
094 if (LOG.isDebugEnabled()) {
095 LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
096 }
097 activeWaiter.set(true);
098 if (dispatched.get() > 0) {
099 synchronized (activeWaiter) {
100 try {
101 activeWaiter.wait();
102 } catch (InterruptedException ignored) {
103 }
104 }
105 if (this.dispatched.get() > 0) {
106 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
107 }
108 }
109 }
110 }
111
112 public void decrementOutstandingResponses() {
113 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
114 synchronized (activeWaiter) {
115 activeWaiter.notifyAll();
116 }
117 }
118 }
119
120 public boolean incrementOutstandingResponses() {
121 dispatched.incrementAndGet();
122 if (activeWaiter.get()) {
123 decrementOutstandingResponses();
124 return false;
125 }
126 return true;
127 }
128 }