001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Set;
021import org.apache.activemq.advisory.AdvisorySupport;
022import org.apache.activemq.broker.BrokerService;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.policy.PolicyEntry;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQQueue;
027import org.apache.activemq.command.ActiveMQTempDestination;
028import org.apache.activemq.command.ActiveMQTopic;
029import org.apache.activemq.command.SubscriptionInfo;
030import org.apache.activemq.store.MessageStore;
031import org.apache.activemq.store.PersistenceAdapter;
032import org.apache.activemq.store.TopicMessageStore;
033import org.apache.activemq.thread.TaskRunnerFactory;
034
035/**
036 * Creates standard ActiveMQ implementations of
037 * {@link org.apache.activemq.broker.region.Destination}.
038 * 
039 * @author fateev@amazon.com
040 * 
041 */
042public class DestinationFactoryImpl extends DestinationFactory {
043
044    protected final TaskRunnerFactory taskRunnerFactory;
045    protected final PersistenceAdapter persistenceAdapter;
046    protected RegionBroker broker;
047    private final BrokerService brokerService;
048
049    public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
050        this.brokerService = brokerService;
051        this.taskRunnerFactory = taskRunnerFactory;
052        if (persistenceAdapter == null) {
053            throw new IllegalArgumentException("null persistenceAdapter");
054        }
055        this.persistenceAdapter = persistenceAdapter;
056    }
057
058    @Override
059    public void setRegionBroker(RegionBroker broker) {
060        if (broker == null) {
061            throw new IllegalArgumentException("null broker");
062        }
063        this.broker = broker;
064    }
065
066    @Override
067    public Set<ActiveMQDestination> getDestinations() {
068        return persistenceAdapter.getDestinations();
069    }
070
071    /**
072     * @return instance of {@link Queue} or {@link Topic}
073     */
074    @Override
075    public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
076        if (destination.isQueue()) {
077            if (destination.isTemporary()) {
078                final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
079                Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
080                configureQueue(queue, destination);
081                queue.initialize();
082                return queue;
083            } else {
084                MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
085                Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
086                configureQueue(queue, destination);
087                queue.initialize();
088                return queue;
089            }
090        } else if (destination.isTemporary()) {
091            
092            Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
093            configureTopic(topic, destination);
094            topic.initialize();
095            return topic;
096        } else {
097            TopicMessageStore store = null;
098            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
099                store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
100            }
101            Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
102            configureTopic(topic, destination);
103            topic.initialize();
104            return topic;
105        }
106    }
107
108    @Override
109    public void removeDestination(Destination dest) {
110        ActiveMQDestination destination = dest.getActiveMQDestination();
111        if (!destination.isTemporary()) {
112            if (destination.isQueue()) {
113                persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination);
114            }
115            else if (!AdvisorySupport.isAdvisoryTopic(destination)) {
116                persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination);
117            }
118        }
119    }
120
121    protected void configureQueue(Queue queue, ActiveMQDestination destination) {
122        if (broker == null) {
123            throw new IllegalStateException("broker property is not set");
124        }
125        if (broker.getDestinationPolicy() != null) {
126            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
127            if (entry != null) {
128                entry.configure(broker,queue);
129            }
130        }
131    }
132
133    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
134        if (broker == null) {
135            throw new IllegalStateException("broker property is not set");
136        }
137        if (broker.getDestinationPolicy() != null) {
138            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
139            if (entry != null) {
140                entry.configure(broker,topic);
141            }
142        }
143    }
144
145    @Override
146    public long getLastMessageBrokerSequenceId() throws IOException {
147        return persistenceAdapter.getLastMessageBrokerSequenceId();
148    }
149
150    public PersistenceAdapter getPersistenceAdapter() {
151        return persistenceAdapter;
152    }
153
154    @Override
155    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
156        return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
157    }
158}