001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.Set;
020import java.util.concurrent.CopyOnWriteArraySet;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import javax.jms.Connection;
024import javax.jms.JMSException;
025import javax.jms.Message;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageListener;
028import javax.jms.Session;
029
030import org.apache.activemq.Service;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQMessage;
033import org.apache.activemq.command.ActiveMQQueue;
034import org.apache.activemq.command.ActiveMQTempQueue;
035import org.apache.activemq.command.ActiveMQTempTopic;
036import org.apache.activemq.command.ActiveMQTopic;
037import org.apache.activemq.command.DestinationInfo;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
043 * being created or deleted.
044 *
045 * 
046 */
047public class DestinationSource implements MessageListener {
048    private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
049    private AtomicBoolean started = new AtomicBoolean(false);
050    private final Connection connection;
051    private Session session;
052    private MessageConsumer queueConsumer;
053    private MessageConsumer topicConsumer;
054    private MessageConsumer tempTopicConsumer;
055    private MessageConsumer tempQueueConsumer;
056    private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
057    private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
058    private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
059    private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
060    private DestinationListener listener;
061
062    public DestinationSource(Connection connection) throws JMSException {
063        this.connection = connection;
064    }
065
066    public DestinationListener getListener() {
067        return listener;
068    }
069
070    public void setDestinationListener(DestinationListener listener) {
071        this.listener = listener;
072    }
073
074    /**
075     * Returns the current queues available on the broker
076     */
077    public Set<ActiveMQQueue> getQueues() {
078        return queues;
079    }
080
081    /**
082     * Returns the current topics on the broker
083     */
084    public Set<ActiveMQTopic> getTopics() {
085        return topics;
086    }
087
088    /**
089     * Returns the current temporary topics available on the broker
090     */
091    public Set<ActiveMQTempQueue> getTemporaryQueues() {
092        return temporaryQueues;
093    }
094
095    /**
096     * Returns the current temporary queues available on the broker
097     */
098    public Set<ActiveMQTempTopic> getTemporaryTopics() {
099        return temporaryTopics;
100    }
101
102    public void start() throws JMSException {
103        if (started.compareAndSet(false, true)) {
104            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
105            queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
106            queueConsumer.setMessageListener(this);
107
108            topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
109            topicConsumer.setMessageListener(this);
110
111            tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
112            tempQueueConsumer.setMessageListener(this);
113
114            tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
115            tempTopicConsumer.setMessageListener(this);
116        }
117    }
118
119    public void stop() throws JMSException {
120        if (started.compareAndSet(true, false)) {
121            if (session != null) {
122                session.close();
123            }
124        }
125    }
126
127    public void onMessage(Message message) {
128        if (message instanceof ActiveMQMessage) {
129            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
130            Object command = activeMessage.getDataStructure();
131            if (command instanceof DestinationInfo) {
132                DestinationInfo destinationInfo = (DestinationInfo) command;
133                DestinationEvent event = new DestinationEvent(this, destinationInfo);
134                fireDestinationEvent(event);
135            }
136            else {
137                LOG.warn("Unknown dataStructure: " + command);
138            }
139        }
140        else {
141            LOG.warn("Unknown message type: " + message + ". Message ignored");
142        }
143    }
144
145    protected void fireDestinationEvent(DestinationEvent event) {
146        // now lets update the data structures
147        ActiveMQDestination destination = event.getDestination();
148        boolean add = event.isAddOperation();
149        if (destination instanceof ActiveMQQueue) {
150            ActiveMQQueue queue = (ActiveMQQueue) destination;
151            if (add) {
152                queues.add(queue);
153            }
154            else {
155                queues.remove(queue);
156            }
157        }
158        else if (destination instanceof ActiveMQTopic) {
159            ActiveMQTopic topic = (ActiveMQTopic) destination;
160            if (add) {
161                topics.add(topic);
162            }
163            else {
164                topics.remove(topic);
165            }
166        }
167        else if (destination instanceof ActiveMQTempQueue) {
168            ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
169            if (add) {
170                temporaryQueues.add(queue);
171            }
172            else {
173                temporaryQueues.remove(queue);
174            }
175        }
176        else if (destination instanceof ActiveMQTempTopic) {
177            ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
178            if (add) {
179                temporaryTopics.add(topic);
180            }
181            else {
182                temporaryTopics.remove(topic);
183            }
184        }
185        else {
186            LOG.warn("Unknown destination type: " + destination);
187        }
188        if (listener != null) {
189            listener.onDestinationEvent(event);
190        }
191    }
192}