001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.filter;
018    
019    import java.net.URI;
020    import java.util.Collections;
021    import java.util.Iterator;
022    import java.util.List;
023    
024    import javax.jms.Connection;
025    import javax.jms.ConnectionFactory;
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.jms.QueueBrowser;
029    import javax.jms.Session;
030    
031    import org.apache.activemq.ActiveMQConnectionFactory;
032    import org.apache.activemq.command.ActiveMQQueue;
033    import org.apache.activemq.command.ActiveMQTopic;
034    
035    public class AmqMessagesQueryFilter extends AbstractQueryFilter {
036    
037        private URI brokerUrl;
038        private Destination destination;
039    
040        private ConnectionFactory connectionFactory;
041    
042        /**
043         * Create a JMS message query filter
044         *
045         * @param brokerUrl   - broker url to connect to
046         * @param destination - JMS destination to query
047         */
048        public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
049            super(null);
050            this.brokerUrl = brokerUrl;
051            this.destination = destination;
052        }
053    
054        /**
055         * Create a JMS message query filter
056         *
057         * @param brokerUrl   - broker url to connect to
058         * @param destination - JMS destination to query
059         */
060        public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
061            super(null);
062            this.destination = destination;
063            this.connectionFactory = connectionFactory;
064        }
065    
066        /**
067         * Queries the specified destination using the message selector format query
068         *
069         * @param queries - message selector queries
070         * @return list messages that matches the selector
071         * @throws Exception
072         */
073        public List query(List queries) throws Exception {
074            String selector = "";
075    
076            // Convert to message selector
077            for (Iterator i = queries.iterator(); i.hasNext(); ) {
078                selector = selector + "(" + i.next().toString() + ") AND ";
079            }
080    
081            // Remove last AND
082            if (!selector.equals("")) {
083                selector = selector.substring(0, selector.length() - 5);
084            }
085    
086            if (destination instanceof ActiveMQQueue) {
087                return queryMessages((ActiveMQQueue) destination, selector);
088            } else {
089                return queryMessages((ActiveMQTopic) destination, selector);
090            }
091        }
092    
093        /**
094         * Query the messages of a queue destination using a queue browser
095         *
096         * @param queue    - queue destination
097         * @param selector - message selector
098         * @return list of messages that matches the selector
099         * @throws Exception
100         */
101        protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
102            Connection conn = createConnection();
103    
104            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
105            QueueBrowser browser = sess.createBrowser(queue, selector);
106    
107            List messages = Collections.list(browser.getEnumeration());
108    
109            conn.close();
110    
111            return messages;
112        }
113    
114        /**
115         * Query the messages of a topic destination using a message consumer
116         *
117         * @param topic    - topic destination
118         * @param selector - message selector
119         * @return list of messages that matches the selector
120         * @throws Exception
121         */
122        protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
123            // TODO: should we use a durable subscriber or a retroactive non-durable
124            // subscriber?
125            // TODO: if a durable subscriber is used, how do we manage it?
126            // subscribe/unsubscribe tasks?
127            return null;
128        }
129    
130        /**
131         * Create and start a JMS connection
132         *
133         * @param brokerUrl - broker url to connect to.
134         * @return JMS connection
135         * @throws JMSException
136         * @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created.
137         */
138        @Deprecated
139        protected Connection createConnection(URI brokerUrl) throws JMSException {
140            // maintain old behaviour, when called this way.
141            connectionFactory = (new ActiveMQConnectionFactory(brokerUrl));
142            return createConnection();
143        }
144    
145        /**
146         * Create and start a JMS connection
147         *
148         * @return JMS connection
149         * @throws JMSException
150         */
151        protected Connection createConnection() throws JMSException {
152            // maintain old behaviour, when called either way.
153            if (null == connectionFactory)
154                connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
155    
156            Connection conn = connectionFactory.createConnection();
157            conn.start();
158            return conn;
159        }
160    
161    
162        /**
163         * Get the broker url being used.
164         *
165         * @return broker url
166         */
167        public URI getBrokerUrl() {
168            return brokerUrl;
169        }
170    
171        /**
172         * Set the broker url to use.
173         *
174         * @param brokerUrl - broker url
175         */
176        public void setBrokerUrl(URI brokerUrl) {
177            this.brokerUrl = brokerUrl;
178        }
179    
180        /**
181         * Get the destination being used.
182         *
183         * @return - JMS destination
184         */
185        public Destination getDestination() {
186            return destination;
187        }
188    
189        /**
190         * Set the destination to use.
191         *
192         * @param destination - JMS destination
193         */
194        public void setDestination(Destination destination) {
195            this.destination = destination;
196        }
197    
198    }