001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.util.Set;
020    import java.util.concurrent.CopyOnWriteArraySet;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    import java.util.concurrent.atomic.AtomicInteger;
023    
024    import org.apache.activemq.command.ConsumerId;
025    import org.apache.activemq.command.ConsumerInfo;
026    import org.apache.activemq.command.NetworkBridgeFilter;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * Represents a network bridge interface
032     * 
033     * 
034     */
035    public class DemandSubscription {
036        private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
037    
038        private final ConsumerInfo remoteInfo;
039        private final ConsumerInfo localInfo;
040        private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
041    
042        private AtomicInteger dispatched = new AtomicInteger(0);
043        private AtomicBoolean activeWaiter = new AtomicBoolean();
044        private NetworkBridgeFilter networkBridgeFilter;
045    
046        DemandSubscription(ConsumerInfo info) {
047            remoteInfo = info;
048            localInfo = info.copy();
049            localInfo.setNetworkSubscription(true);
050            remoteSubsIds.add(info.getConsumerId());
051        }
052    
053        /**
054         * Increment the consumers associated with this subscription
055         * 
056         * @param id
057         * @return true if added
058         */
059        public boolean add(ConsumerId id) {
060            return remoteSubsIds.add(id);
061        }
062    
063        /**
064         * Increment the consumers associated with this subscription
065         * 
066         * @param id
067         * @return true if removed
068         */
069        public boolean remove(ConsumerId id) {
070            return remoteSubsIds.remove(id);
071        }
072    
073        /**
074         * @return true if there are no interested consumers
075         */
076        public boolean isEmpty() {
077            return remoteSubsIds.isEmpty();
078        }
079    
080        public int size() {
081            return remoteSubsIds.size();
082        }
083        /**
084         * @return Returns the localInfo.
085         */
086        public ConsumerInfo getLocalInfo() {
087            return localInfo;
088        }
089    
090        /**
091         * @return Returns the remoteInfo.
092         */
093        public ConsumerInfo getRemoteInfo() {
094            return remoteInfo;
095        }
096    
097        public void waitForCompletion() {
098            if (dispatched.get() > 0) {
099                if (LOG.isDebugEnabled()) {
100                    LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
101                }
102                activeWaiter.set(true);
103                if (dispatched.get() > 0) {
104                    synchronized (activeWaiter) {
105                        try {
106                            activeWaiter.wait();
107                        } catch (InterruptedException ignored) {
108                        }
109                    }
110                    if (this.dispatched.get() > 0) {
111                        LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
112                    }
113                }
114            }
115        }
116    
117        public void decrementOutstandingResponses() {
118            if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
119                synchronized (activeWaiter) {
120                    activeWaiter.notifyAll();
121                }
122            }
123        }
124    
125        public boolean incrementOutstandingResponses() {
126            dispatched.incrementAndGet();
127            if (activeWaiter.get()) {
128                decrementOutstandingResponses();
129                return false;
130            }
131            return true;
132        }
133    
134        public NetworkBridgeFilter getNetworkBridgeFilter() {
135            return networkBridgeFilter;
136        }
137    
138        public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
139            this.networkBridgeFilter = networkBridgeFilter;
140        }
141    }