001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import javax.management.openmbean.CompositeData;
020    import javax.management.openmbean.OpenDataException;
021    import javax.jms.JMSException;
022    
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.Queue;
025    import org.apache.activemq.broker.region.QueueMessageReference;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.util.BrokerSupport;
029    
030    /**
031     * Provides a JMX Management view of a Queue.
032     */
033    public class QueueView extends DestinationView implements QueueViewMBean {
034        public QueueView(ManagedRegionBroker broker, Queue destination) {
035            super(broker, destination);
036        }
037    
038        public CompositeData getMessage(String messageId) throws OpenDataException {
039            QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
040            Message rc = ref.getMessage();
041            if (rc == null) {
042                return null;
043            }
044            return OpenTypeSupport.convert(rc);
045        }
046    
047        public void purge() throws Exception {
048            ((Queue)destination).purge();
049        }
050    
051        public boolean removeMessage(String messageId) throws Exception {
052            return ((Queue)destination).removeMessage(messageId);
053        }
054    
055        public int removeMatchingMessages(String selector) throws Exception {
056            return ((Queue)destination).removeMatchingMessages(selector);
057        }
058    
059        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
060            return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
061        }
062    
063        public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
064            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
065            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
066            return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
067        }
068    
069        public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
070            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
071            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
072            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
073        }
074    
075        public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
076            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
077            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
078            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
079        }
080    
081        public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
082            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
083            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
084            return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
085        }
086    
087        public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
088            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
089            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
090            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
091        }
092    
093        public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
094            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
095            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
096            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
097        }
098    
099        /**
100         * Moves a message back to its original destination
101         */
102        public boolean retryMessage(String messageId) throws Exception {
103            Queue queue = (Queue) destination;
104            QueueMessageReference ref = queue.getMessage(messageId);
105            Message rc = ref.getMessage();
106            if (rc != null) {
107                ActiveMQDestination originalDestination = rc.getOriginalDestination();
108                if (originalDestination != null) {
109                    ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
110                    return queue.moveMessageTo(context, ref, originalDestination);
111                }
112                else {
113                    throw new JMSException("No original destination for message: "+ messageId);
114                }
115            }
116            else {
117                throw new JMSException("Could not find message: "+ messageId);
118            }
119        }
120        
121        public int cursorSize() {
122            Queue queue = (Queue) destination;
123            if (queue.getMessages() != null){
124                return queue.getMessages().size();
125            }
126            return 0;
127        }
128    
129       
130        public boolean doesCursorHaveMessagesBuffered() {
131           Queue queue = (Queue) destination;
132           if (queue.getMessages() != null){
133               return queue.getMessages().hasMessagesBufferedToDeliver();
134           }
135           return false;
136    
137        }
138    
139        
140        public boolean doesCursorHaveSpace() {
141            Queue queue = (Queue) destination;
142            if (queue.getMessages() != null){
143                return queue.getMessages().hasSpace();
144            }
145            return false;
146        }
147    
148        
149        public long getCursorMemoryUsage() {
150            Queue queue = (Queue) destination;
151            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
152                return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
153            }
154            return 0;
155        }
156    
157        public int getCursorPercentUsage() {
158            Queue queue = (Queue) destination;
159            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
160                return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
161            }
162            return 0;
163        }
164    
165        public boolean isCursorFull() {
166            Queue queue = (Queue) destination;
167            if (queue.getMessages() != null){
168                return queue.getMessages().isFull();
169            }
170            return false;
171        }
172    
173        public boolean isCacheEnabled() {
174            Queue queue = (Queue) destination;
175            if (queue.getMessages() != null){
176                return queue.getMessages().isCacheEnabled();
177            }
178            return false;
179        }
180    }