001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.Set;
021
022import javax.jms.InvalidSelectorException;
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQQueue;
030import org.apache.activemq.command.ActiveMQTopic;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.filter.DestinationFilter;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.util.JMXSupport;
035
036/**
037 *
038 */
039public class SubscriptionView implements SubscriptionViewMBean {
040
041    protected final Subscription subscription;
042    protected final String clientId;
043    protected final String userName;
044
045    /**
046     * Constructor
047     *
048     * @param subs
049     */
050    public SubscriptionView(String clientId, String userName, Subscription subs) {
051        this.clientId = clientId;
052        this.subscription = subs;
053        this.userName = userName;
054    }
055
056    /**
057     * @return the clientId
058     */
059    public String getClientId() {
060        return clientId;
061    }
062
063    /**
064     * @returns the ObjectName of the Connection that created this subscription
065     */
066    public ObjectName getConnection() {
067        ObjectName result = null;
068
069        if (clientId != null && subscription != null) {
070            ConnectionContext ctx = subscription.getContext();
071            if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
072                BrokerService service = ctx.getBroker().getBrokerService();
073                ManagementContext managementCtx = service.getManagementContext();
074                if (managementCtx != null) {
075
076                    try {
077                        ObjectName query = createConnectionQueury(managementCtx, service.getBrokerName());
078                        Set<ObjectName> names = managementCtx.queryNames(query, null);
079                        if (names.size() == 1) {
080                            result = names.iterator().next();
081                        }
082                    } catch (Exception e) {
083                    }
084                }
085            }
086        }
087        return result;
088    }
089
090    private ObjectName createConnectionQueury(ManagementContext ctx, String brokerName) throws IOException {
091        try {
092            return new ObjectName(ctx.getJmxDomainName() + ":" + "BrokerName="
093                                  + JMXSupport.encodeObjectNamePart(brokerName) + ","
094                                  + "Type=Connection," + "ConnectorName=*,"
095                                  + "Connection=" + JMXSupport.encodeObjectNamePart(clientId));
096        } catch (Throwable e) {
097            throw IOExceptionSupport.create(e);
098        }
099    }
100
101    /**
102     * @return the id of the Connection the Subscription is on
103     */
104    public String getConnectionId() {
105        ConsumerInfo info = getConsumerInfo();
106        if (info != null) {
107            return info.getConsumerId().getConnectionId();
108        }
109        return "NOTSET";
110    }
111
112    /**
113     * @return the id of the Session the subscription is on
114     */
115    public long getSessionId() {
116        ConsumerInfo info = getConsumerInfo();
117        if (info != null) {
118            return info.getConsumerId().getSessionId();
119        }
120        return 0;
121    }
122
123    /**
124     * @return the id of the Subscription
125     */
126    public long getSubcriptionId() {
127        ConsumerInfo info = getConsumerInfo();
128        if (info != null) {
129            return info.getConsumerId().getValue();
130        }
131        return 0;
132    }
133
134    /**
135     * @return the destination name
136     */
137    public String getDestinationName() {
138        ConsumerInfo info = getConsumerInfo();
139        if (info != null) {
140            ActiveMQDestination dest = info.getDestination();
141            return dest.getPhysicalName();
142        }
143        return "NOTSET";
144    }
145
146    public String getSelector() {
147        if (subscription != null) {
148            return subscription.getSelector();
149        }
150        return null;
151    }
152
153    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
154        if (subscription != null) {
155            subscription.setSelector(selector);
156        } else {
157            throw new UnsupportedOperationException("No subscription object");
158        }
159    }
160
161    /**
162     * @return true if the destination is a Queue
163     */
164    public boolean isDestinationQueue() {
165        ConsumerInfo info = getConsumerInfo();
166        if (info != null) {
167            ActiveMQDestination dest = info.getDestination();
168            return dest.isQueue();
169        }
170        return false;
171    }
172
173    /**
174     * @return true of the destination is a Topic
175     */
176    public boolean isDestinationTopic() {
177        ConsumerInfo info = getConsumerInfo();
178        if (info != null) {
179            ActiveMQDestination dest = info.getDestination();
180            return dest.isTopic();
181        }
182        return false;
183    }
184
185    /**
186     * @return true if the destination is temporary
187     */
188    public boolean isDestinationTemporary() {
189        ConsumerInfo info = getConsumerInfo();
190        if (info != null) {
191            ActiveMQDestination dest = info.getDestination();
192            return dest.isTemporary();
193        }
194        return false;
195    }
196
197    /**
198     * @return true if the subscriber is active
199     */
200    public boolean isActive() {
201        return true;
202    }
203
204    /**
205     * The subscription should release as may references as it can to help the
206     * garbage collector reclaim memory.
207     */
208    public void gc() {
209        if (subscription != null) {
210            subscription.gc();
211        }
212    }
213
214    /**
215     * @return whether or not the subscriber is retroactive or not
216     */
217    public boolean isRetroactive() {
218        ConsumerInfo info = getConsumerInfo();
219        return info != null ? info.isRetroactive() : false;
220    }
221
222    /**
223     * @return whether or not the subscriber is an exclusive consumer
224     */
225    public boolean isExclusive() {
226        ConsumerInfo info = getConsumerInfo();
227        return info != null ? info.isExclusive() : false;
228    }
229
230    /**
231     * @return whether or not the subscriber is durable (persistent)
232     */
233    public boolean isDurable() {
234        ConsumerInfo info = getConsumerInfo();
235        return info != null ? info.isDurable() : false;
236    }
237
238    /**
239     * @return whether or not the subscriber ignores local messages
240     */
241    public boolean isNoLocal() {
242        ConsumerInfo info = getConsumerInfo();
243        return info != null ? info.isNoLocal() : false;
244    }
245
246    /**
247     * @return the maximum number of pending messages allowed in addition to the
248     *         prefetch size. If enabled to a non-zero value then this will
249     *         perform eviction of messages for slow consumers on non-durable
250     *         topics.
251     */
252    public int getMaximumPendingMessageLimit() {
253        ConsumerInfo info = getConsumerInfo();
254        return info != null ? info.getMaximumPendingMessageLimit() : 0;
255    }
256
257    /**
258     * @return the consumer priority
259     */
260    public byte getPriority() {
261        ConsumerInfo info = getConsumerInfo();
262        return info != null ? info.getPriority() : 0;
263    }
264
265    /**
266     * @return the name of the consumer which is only used for durable
267     *         consumers.
268     */
269    public String getSubcriptionName() {
270        ConsumerInfo info = getConsumerInfo();
271        return info != null ? info.getSubscriptionName() : null;
272    }
273
274    /**
275     * @return number of messages pending delivery
276     */
277    public int getPendingQueueSize() {
278        return subscription != null ? subscription.getPendingQueueSize() : 0;
279    }
280
281    /**
282     * @return number of messages dispatched
283     */
284    public int getDispatchedQueueSize() {
285        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
286    }
287
288    public int getMessageCountAwaitingAcknowledge() {
289        return getDispatchedQueueSize();
290    }
291
292    /**
293     * @return number of messages that matched the subscription
294     */
295    public long getDispatchedCounter() {
296        return subscription != null ? subscription.getDispatchedCounter() : 0;
297    }
298
299    /**
300     * @return number of messages that matched the subscription
301     */
302    public long getEnqueueCounter() {
303        return subscription != null ? subscription.getEnqueueCounter() : 0;
304    }
305
306    /**
307     * @return number of messages queued by the client
308     */
309    public long getDequeueCounter() {
310        return subscription != null ? subscription.getDequeueCounter() : 0;
311    }
312
313    protected ConsumerInfo getConsumerInfo() {
314        return subscription != null ? subscription.getConsumerInfo() : null;
315    }
316
317    /**
318     * @return pretty print
319     */
320    public String toString() {
321        return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
322    }
323
324    /**
325     */
326    public int getPrefetchSize() {
327        return subscription != null ? subscription.getPrefetchSize() : 0;
328    }
329
330    public boolean isMatchingQueue(String queueName) {
331        if (isDestinationQueue()) {
332            return matchesDestination(new ActiveMQQueue(queueName));
333        }
334        return false;
335    }
336
337    public boolean isMatchingTopic(String topicName) {
338        if (isDestinationTopic()) {
339            return matchesDestination(new ActiveMQTopic(topicName));
340        }
341        return false;
342    }
343
344    /**
345     * Return true if this subscription matches the given destination
346     *
347     * @param destination the destination to compare against
348     * @return true if this subscription matches the given destination
349     */
350    public boolean matchesDestination(ActiveMQDestination destination) {
351        ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
352        DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
353        return filter.matches(destination);
354    }
355
356    @Override
357    public boolean isSlowConsumer() {
358        return subscription.isSlowConsumer();
359    }
360
361    @Override
362    public String getUserName() {
363        return userName;
364    }
365}