001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import javax.management.openmbean.CompositeData;
020import javax.management.openmbean.OpenDataException;
021import javax.jms.JMSException;
022
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.Queue;
025import org.apache.activemq.broker.region.QueueMessageReference;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.util.BrokerSupport;
029
030/**
031 * Provides a JMX Management view of a Queue.
032 */
033public class QueueView extends DestinationView implements QueueViewMBean {
034    public QueueView(ManagedRegionBroker broker, Queue destination) {
035        super(broker, destination);
036    }
037
038    public CompositeData getMessage(String messageId) throws OpenDataException {
039        QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
040        Message rc = ref.getMessage();
041        if (rc == null) {
042            return null;
043        }
044        return OpenTypeSupport.convert(rc);
045    }
046
047    public void purge() throws Exception {
048        ((Queue)destination).purge();
049    }
050
051    public boolean removeMessage(String messageId) throws Exception {
052        return ((Queue)destination).removeMessage(messageId);
053    }
054
055    public int removeMatchingMessages(String selector) throws Exception {
056        return ((Queue)destination).removeMatchingMessages(selector);
057    }
058
059    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
060        return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
061    }
062
063    public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
064        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
065        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
066        return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
067    }
068
069    public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
070        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
071        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
072        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
073    }
074
075    public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
076        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
077        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
078        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
079    }
080
081    public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
082        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
083        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
084        return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
085    }
086
087    public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
088        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
089        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
090        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
091    }
092
093    public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
094        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
095        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
096        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
097    }
098
099    /**
100     * Moves a message back to its original destination
101     */
102    public boolean retryMessage(String messageId) throws Exception {
103        Queue queue = (Queue) destination;
104        QueueMessageReference ref = queue.getMessage(messageId);
105        Message rc = ref.getMessage();
106        if (rc != null) {
107            ActiveMQDestination originalDestination = rc.getOriginalDestination();
108            if (originalDestination != null) {
109                ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
110                return queue.moveMessageTo(context, ref, originalDestination);
111            }
112            else {
113                throw new JMSException("No original destination for message: "+ messageId);
114            }
115        }
116        else {
117            throw new JMSException("Could not find message: "+ messageId);
118        }
119    }
120    
121    public int cursorSize() {
122        Queue queue = (Queue) destination;
123        if (queue.getMessages() != null){
124            return queue.getMessages().size();
125        }
126        return 0;
127    }
128
129   
130    public boolean doesCursorHaveMessagesBuffered() {
131       Queue queue = (Queue) destination;
132       if (queue.getMessages() != null){
133           return queue.getMessages().hasMessagesBufferedToDeliver();
134       }
135       return false;
136
137    }
138
139    
140    public boolean doesCursorHaveSpace() {
141        Queue queue = (Queue) destination;
142        if (queue.getMessages() != null){
143            return queue.getMessages().hasSpace();
144        }
145        return false;
146    }
147
148    
149    public long getCursorMemoryUsage() {
150        Queue queue = (Queue) destination;
151        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
152            return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
153        }
154        return 0;
155    }
156
157    public int getCursorPercentUsage() {
158        Queue queue = (Queue) destination;
159        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
160            return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
161        }
162        return 0;
163    }
164
165    public boolean isCursorFull() {
166        Queue queue = (Queue) destination;
167        if (queue.getMessages() != null){
168            return queue.getMessages().isFull();
169        }
170        return false;
171    }
172
173    public boolean isCacheEnabled() {
174        Queue queue = (Queue) destination;
175        if (queue.getMessages() != null){
176            return queue.getMessages().isCacheEnabled();
177        }
178        return false;
179    }
180}