001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.util.Set;
020import java.util.concurrent.CopyOnWriteArraySet;
021import java.util.concurrent.atomic.AtomicBoolean;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.activemq.command.ConsumerId;
025import org.apache.activemq.command.ConsumerInfo;
026import org.apache.activemq.command.NetworkBridgeFilter;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Represents a network bridge interface
032 * 
033 * 
034 */
035public class DemandSubscription {
036    private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
037
038    private final ConsumerInfo remoteInfo;
039    private final ConsumerInfo localInfo;
040    private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
041
042    private AtomicInteger dispatched = new AtomicInteger(0);
043    private AtomicBoolean activeWaiter = new AtomicBoolean();
044    private NetworkBridgeFilter networkBridgeFilter;
045
046    DemandSubscription(ConsumerInfo info) {
047        remoteInfo = info;
048        localInfo = info.copy();
049        localInfo.setNetworkSubscription(true);
050        remoteSubsIds.add(info.getConsumerId());
051    }
052
053    /**
054     * Increment the consumers associated with this subscription
055     * 
056     * @param id
057     * @return true if added
058     */
059    public boolean add(ConsumerId id) {
060        return remoteSubsIds.add(id);
061    }
062
063    /**
064     * Increment the consumers associated with this subscription
065     * 
066     * @param id
067     * @return true if removed
068     */
069    public boolean remove(ConsumerId id) {
070        return remoteSubsIds.remove(id);
071    }
072
073    /**
074     * @return true if there are no interested consumers
075     */
076    public boolean isEmpty() {
077        return remoteSubsIds.isEmpty();
078    }
079
080    public int size() {
081        return remoteSubsIds.size();
082    }
083    /**
084     * @return Returns the localInfo.
085     */
086    public ConsumerInfo getLocalInfo() {
087        return localInfo;
088    }
089
090    /**
091     * @return Returns the remoteInfo.
092     */
093    public ConsumerInfo getRemoteInfo() {
094        return remoteInfo;
095    }
096
097    public void waitForCompletion() {
098        if (dispatched.get() > 0) {
099            if (LOG.isDebugEnabled()) {
100                LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
101            }
102            activeWaiter.set(true);
103            if (dispatched.get() > 0) {
104                synchronized (activeWaiter) {
105                    try {
106                        activeWaiter.wait();
107                    } catch (InterruptedException ignored) {
108                    }
109                }
110                if (this.dispatched.get() > 0) {
111                    LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
112                }
113            }
114        }
115    }
116
117    public void decrementOutstandingResponses() {
118        if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
119            synchronized (activeWaiter) {
120                activeWaiter.notifyAll();
121            }
122        }
123    }
124
125    public boolean incrementOutstandingResponses() {
126        dispatched.incrementAndGet();
127        if (activeWaiter.get()) {
128            decrementOutstandingResponses();
129            return false;
130        }
131        return true;
132    }
133
134    public NetworkBridgeFilter getNetworkBridgeFilter() {
135        return networkBridgeFilter;
136    }
137
138    public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
139        this.networkBridgeFilter = networkBridgeFilter;
140    }
141}