001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.Iterator;
020import java.util.Set;
021
022import javax.jms.JMSException;
023
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.policy.PolicyEntry;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConsumerInfo;
028import org.apache.activemq.command.MessageDispatchNotification;
029import org.apache.activemq.thread.TaskRunnerFactory;
030import org.apache.activemq.usage.SystemUsage;
031
032/**
033 * 
034 * 
035 */
036public class QueueRegion extends AbstractRegion {
037
038    public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
039                       SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
040                       DestinationFactory destinationFactory) {
041        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
042    }
043
044    public String toString() {
045        return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
046               + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
047    }
048
049    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
050        throws JMSException {
051        ActiveMQDestination destination = info.getDestination();
052        PolicyEntry entry = null;
053        if (destination != null && broker.getDestinationPolicy() != null) {
054            entry = broker.getDestinationPolicy().getEntryFor(destination);
055            
056        }
057        if (info.isBrowser()) {
058            QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
059            if (entry != null) {
060                entry.configure(broker, usageManager, sub);
061            }
062            return sub;
063        } else {
064            QueueSubscription sub =   new QueueSubscription(broker, usageManager,context, info);
065            if (entry != null) {
066                entry.configure(broker, usageManager, sub);
067            }
068            return sub;
069        }
070    }
071
072    protected Set<ActiveMQDestination> getInactiveDestinations() {
073        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
074        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
075            ActiveMQDestination dest = iter.next();
076            if (!dest.isQueue()) {
077                iter.remove();
078            }
079        }
080        return inactiveDestinations;
081    }
082    
083    /*
084     * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till 
085     * the notification to ensure that the subscription chosen by the master is used.
086     * 
087     * (non-Javadoc)
088     * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
089     */
090    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
091        processDispatchNotificationViaDestination(messageDispatchNotification);
092    }
093}