001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.Enumeration;
020    import java.util.concurrent.atomic.AtomicBoolean;
021    
022    import javax.jms.IllegalStateException;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.Queue;
026    import javax.jms.QueueBrowser;
027    
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ConsumerId;
030    import org.apache.activemq.command.MessageDispatch;
031    
032    /**
033     * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
034     * queue without removing them. <p/>
035     * <P>
036     * The <CODE>getEnumeration</CODE> method returns a <CODE>
037     * java.util.Enumeration</CODE>
038     * that is used to scan the queue's messages. It may be an enumeration of the
039     * entire content of a queue, or it may contain only the messages matching a
040     * message selector. <p/>
041     * <P>
042     * Messages may be arriving and expiring while the scan is done. The JMS API
043     * does not require the content of an enumeration to be a static snapshot of
044     * queue content. Whether these changes are visible or not depends on the JMS
045     * provider. <p/>
046     * <P>
047     * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048     * </CODE>
049     * or a <CODE>QueueSession</CODE>.
050     *
051     * @see javax.jms.Session#createBrowser
052     * @see javax.jms.QueueSession#createBrowser
053     * @see javax.jms.QueueBrowser
054     * @see javax.jms.QueueReceiver
055     */
056    
057    public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
058    
059        private final ActiveMQSession session;
060        private final ActiveMQDestination destination;
061        private final String selector;
062    
063        private ActiveMQMessageConsumer consumer;
064        private boolean closed;
065        private final ConsumerId consumerId;
066        private final AtomicBoolean browseDone = new AtomicBoolean(true);
067        private final boolean dispatchAsync;
068        private Object semaphore = new Object();
069    
070        /**
071         * Constructor for an ActiveMQQueueBrowser - used internally
072         *
073         * @param theSession
074         * @param dest
075         * @param selector
076         * @throws JMSException
077         */
078        protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
079            this.session = session;
080            this.consumerId = consumerId;
081            this.destination = destination;
082            this.selector = selector;
083            this.dispatchAsync = dispatchAsync;
084            this.consumer = createConsumer();
085        }
086    
087        /**
088         * @param session
089         * @param originalDestination
090         * @param selectorExpression
091         * @param cnum
092         * @return
093         * @throws JMSException
094         */
095        private ActiveMQMessageConsumer createConsumer() throws JMSException {
096            browseDone.set(false);
097            ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
098    
099            return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
100                .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
101                public void dispatch(MessageDispatch md) {
102                    if (md.getMessage() == null) {
103                        browseDone.set(true);
104                    } else {
105                        super.dispatch(md);
106                    }
107                    notifyMessageAvailable();
108                }
109            };
110        }
111    
112        private void destroyConsumer() {
113            if (consumer == null) {
114                return;
115            }
116            try {
117                if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
118                    session.commit();
119                }
120                consumer.close();
121                consumer = null;
122            } catch (JMSException e) {
123                e.printStackTrace();
124            }
125        }
126    
127        /**
128         * Gets an enumeration for browsing the current queue messages in the order
129         * they would be received.
130         *
131         * @return an enumeration for browsing the messages
132         * @throws JMSException if the JMS provider fails to get the enumeration for
133         *                 this browser due to some internal error.
134         */
135    
136        public Enumeration getEnumeration() throws JMSException {
137            checkClosed();
138            if (consumer == null) {
139                consumer = createConsumer();
140            }
141            return this;
142        }
143    
144        private void checkClosed() throws IllegalStateException {
145            if (closed) {
146                throw new IllegalStateException("The Consumer is closed");
147            }
148        }
149    
150        /**
151         * @return true if more messages to process
152         */
153        public boolean hasMoreElements() {
154            while (true) {
155    
156                synchronized (this) {
157                    if (consumer == null) {
158                        return false;
159                    }
160                }
161    
162                if (consumer.getMessageSize() > 0) {
163                    return true;
164                }
165    
166                if (browseDone.get() || !session.isRunning()) {
167                    destroyConsumer();
168                    return false;
169                }
170    
171                waitForMessage();
172            }
173        }
174    
175        /**
176         * @return the next message
177         */
178        public Object nextElement() {
179            while (true) {
180    
181                synchronized (this) {
182                    if (consumer == null) {
183                        return null;
184                    }
185                }
186    
187                try {
188                    Message answer = consumer.receiveNoWait();
189                    if (answer != null) {
190                        return answer;
191                    }
192                } catch (JMSException e) {
193                    this.session.connection.onClientInternalException(e);
194                    return null;
195                }
196    
197                if (browseDone.get() || !session.isRunning()) {
198                    destroyConsumer();
199                    return null;
200                }
201    
202                waitForMessage();
203            }
204        }
205    
206        public synchronized void close() throws JMSException {
207            destroyConsumer();
208            closed = true;
209        }
210    
211        /**
212         * Gets the queue associated with this queue browser.
213         *
214         * @return the queue
215         * @throws JMSException if the JMS provider fails to get the queue
216         *                 associated with this browser due to some internal error.
217         */
218    
219        public Queue getQueue() throws JMSException {
220            return (Queue)destination;
221        }
222    
223        public String getMessageSelector() throws JMSException {
224            return selector;
225        }
226    
227        // Implementation methods
228        // -------------------------------------------------------------------------
229    
230        /**
231         * Wait on a semaphore for a fixed amount of time for a message to come in.
232         * @throws JMSException
233         */
234        protected void waitForMessage() {
235            try {
236                consumer.sendPullCommand(-1);
237                synchronized (semaphore) {
238                    semaphore.wait(2000);
239                }
240            } catch (InterruptedException e) {
241                Thread.currentThread().interrupt();
242            } catch (JMSException e) {
243            }
244    
245        }
246    
247        protected void notifyMessageAvailable() {
248            synchronized (semaphore) {
249                semaphore.notifyAll();
250            }
251        }
252    
253        public String toString() {
254            return "ActiveMQQueueBrowser { value=" + consumerId + " }";
255        }
256    
257    }