001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import java.io.IOException; 020 import java.util.Set; 021 022 import javax.jms.InvalidSelectorException; 023 import javax.management.ObjectName; 024 025 import org.apache.activemq.broker.BrokerService; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.region.Subscription; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQQueue; 030 import org.apache.activemq.command.ActiveMQTopic; 031 import org.apache.activemq.command.ConsumerInfo; 032 import org.apache.activemq.filter.DestinationFilter; 033 import org.apache.activemq.util.IOExceptionSupport; 034 import org.apache.activemq.util.JMXSupport; 035 036 /** 037 * 038 */ 039 public class SubscriptionView implements SubscriptionViewMBean { 040 041 protected final Subscription subscription; 042 protected final String clientId; 043 protected final String userName; 044 045 /** 046 * Constructor 047 * 048 * @param subs 049 */ 050 public SubscriptionView(String clientId, String userName, Subscription subs) { 051 this.clientId = clientId; 052 this.subscription = subs; 053 this.userName = userName; 054 } 055 056 /** 057 * @return the clientId 058 */ 059 public String getClientId() { 060 return clientId; 061 } 062 063 /** 064 * @returns the ObjectName of the Connection that created this subscription 065 */ 066 public ObjectName getConnection() { 067 ObjectName result = null; 068 069 if (clientId != null && subscription != null) { 070 ConnectionContext ctx = subscription.getContext(); 071 if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) { 072 BrokerService service = ctx.getBroker().getBrokerService(); 073 ManagementContext managementCtx = service.getManagementContext(); 074 if (managementCtx != null) { 075 076 try { 077 ObjectName query = createConnectionQueury(managementCtx, service.getBrokerName()); 078 Set<ObjectName> names = managementCtx.queryNames(query, null); 079 if (names.size() == 1) { 080 result = names.iterator().next(); 081 } 082 } catch (Exception e) { 083 } 084 } 085 } 086 } 087 return result; 088 } 089 090 private ObjectName createConnectionQueury(ManagementContext ctx, String brokerName) throws IOException { 091 try { 092 return new ObjectName(ctx.getJmxDomainName() + ":" + "BrokerName=" 093 + JMXSupport.encodeObjectNamePart(brokerName) + "," 094 + "Type=Connection," + "ConnectorName=*," 095 + "Connection=" + JMXSupport.encodeObjectNamePart(clientId)); 096 } catch (Throwable e) { 097 throw IOExceptionSupport.create(e); 098 } 099 } 100 101 /** 102 * @return the id of the Connection the Subscription is on 103 */ 104 public String getConnectionId() { 105 ConsumerInfo info = getConsumerInfo(); 106 if (info != null) { 107 return info.getConsumerId().getConnectionId(); 108 } 109 return "NOTSET"; 110 } 111 112 /** 113 * @return the id of the Session the subscription is on 114 */ 115 public long getSessionId() { 116 ConsumerInfo info = getConsumerInfo(); 117 if (info != null) { 118 return info.getConsumerId().getSessionId(); 119 } 120 return 0; 121 } 122 123 /** 124 * @return the id of the Subscription 125 */ 126 public long getSubcriptionId() { 127 ConsumerInfo info = getConsumerInfo(); 128 if (info != null) { 129 return info.getConsumerId().getValue(); 130 } 131 return 0; 132 } 133 134 /** 135 * @return the destination name 136 */ 137 public String getDestinationName() { 138 ConsumerInfo info = getConsumerInfo(); 139 if (info != null) { 140 ActiveMQDestination dest = info.getDestination(); 141 return dest.getPhysicalName(); 142 } 143 return "NOTSET"; 144 } 145 146 public String getSelector() { 147 if (subscription != null) { 148 return subscription.getSelector(); 149 } 150 return null; 151 } 152 153 public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException { 154 if (subscription != null) { 155 subscription.setSelector(selector); 156 } else { 157 throw new UnsupportedOperationException("No subscription object"); 158 } 159 } 160 161 /** 162 * @return true if the destination is a Queue 163 */ 164 public boolean isDestinationQueue() { 165 ConsumerInfo info = getConsumerInfo(); 166 if (info != null) { 167 ActiveMQDestination dest = info.getDestination(); 168 return dest.isQueue(); 169 } 170 return false; 171 } 172 173 /** 174 * @return true of the destination is a Topic 175 */ 176 public boolean isDestinationTopic() { 177 ConsumerInfo info = getConsumerInfo(); 178 if (info != null) { 179 ActiveMQDestination dest = info.getDestination(); 180 return dest.isTopic(); 181 } 182 return false; 183 } 184 185 /** 186 * @return true if the destination is temporary 187 */ 188 public boolean isDestinationTemporary() { 189 ConsumerInfo info = getConsumerInfo(); 190 if (info != null) { 191 ActiveMQDestination dest = info.getDestination(); 192 return dest.isTemporary(); 193 } 194 return false; 195 } 196 197 /** 198 * @return true if the subscriber is active 199 */ 200 public boolean isActive() { 201 return true; 202 } 203 204 /** 205 * The subscription should release as may references as it can to help the 206 * garbage collector reclaim memory. 207 */ 208 public void gc() { 209 if (subscription != null) { 210 subscription.gc(); 211 } 212 } 213 214 /** 215 * @return whether or not the subscriber is retroactive or not 216 */ 217 public boolean isRetroactive() { 218 ConsumerInfo info = getConsumerInfo(); 219 return info != null ? info.isRetroactive() : false; 220 } 221 222 /** 223 * @return whether or not the subscriber is an exclusive consumer 224 */ 225 public boolean isExclusive() { 226 ConsumerInfo info = getConsumerInfo(); 227 return info != null ? info.isExclusive() : false; 228 } 229 230 /** 231 * @return whether or not the subscriber is durable (persistent) 232 */ 233 public boolean isDurable() { 234 ConsumerInfo info = getConsumerInfo(); 235 return info != null ? info.isDurable() : false; 236 } 237 238 /** 239 * @return whether or not the subscriber ignores local messages 240 */ 241 public boolean isNoLocal() { 242 ConsumerInfo info = getConsumerInfo(); 243 return info != null ? info.isNoLocal() : false; 244 } 245 246 /** 247 * @return the maximum number of pending messages allowed in addition to the 248 * prefetch size. If enabled to a non-zero value then this will 249 * perform eviction of messages for slow consumers on non-durable 250 * topics. 251 */ 252 public int getMaximumPendingMessageLimit() { 253 ConsumerInfo info = getConsumerInfo(); 254 return info != null ? info.getMaximumPendingMessageLimit() : 0; 255 } 256 257 /** 258 * @return the consumer priority 259 */ 260 public byte getPriority() { 261 ConsumerInfo info = getConsumerInfo(); 262 return info != null ? info.getPriority() : 0; 263 } 264 265 /** 266 * @return the name of the consumer which is only used for durable 267 * consumers. 268 */ 269 public String getSubcriptionName() { 270 ConsumerInfo info = getConsumerInfo(); 271 return info != null ? info.getSubscriptionName() : null; 272 } 273 274 /** 275 * @return number of messages pending delivery 276 */ 277 public int getPendingQueueSize() { 278 return subscription != null ? subscription.getPendingQueueSize() : 0; 279 } 280 281 /** 282 * @return number of messages dispatched 283 */ 284 public int getDispatchedQueueSize() { 285 return subscription != null ? subscription.getDispatchedQueueSize() : 0; 286 } 287 288 public int getMessageCountAwaitingAcknowledge() { 289 return getDispatchedQueueSize(); 290 } 291 292 /** 293 * @return number of messages that matched the subscription 294 */ 295 public long getDispatchedCounter() { 296 return subscription != null ? subscription.getDispatchedCounter() : 0; 297 } 298 299 /** 300 * @return number of messages that matched the subscription 301 */ 302 public long getEnqueueCounter() { 303 return subscription != null ? subscription.getEnqueueCounter() : 0; 304 } 305 306 /** 307 * @return number of messages queued by the client 308 */ 309 public long getDequeueCounter() { 310 return subscription != null ? subscription.getDequeueCounter() : 0; 311 } 312 313 protected ConsumerInfo getConsumerInfo() { 314 return subscription != null ? subscription.getConsumerInfo() : null; 315 } 316 317 /** 318 * @return pretty print 319 */ 320 public String toString() { 321 return "SubscriptionView: " + getClientId() + ":" + getConnectionId(); 322 } 323 324 /** 325 */ 326 public int getPrefetchSize() { 327 return subscription != null ? subscription.getPrefetchSize() : 0; 328 } 329 330 public boolean isMatchingQueue(String queueName) { 331 if (isDestinationQueue()) { 332 return matchesDestination(new ActiveMQQueue(queueName)); 333 } 334 return false; 335 } 336 337 public boolean isMatchingTopic(String topicName) { 338 if (isDestinationTopic()) { 339 return matchesDestination(new ActiveMQTopic(topicName)); 340 } 341 return false; 342 } 343 344 /** 345 * Return true if this subscription matches the given destination 346 * 347 * @param destination the destination to compare against 348 * @return true if this subscription matches the given destination 349 */ 350 public boolean matchesDestination(ActiveMQDestination destination) { 351 ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination(); 352 DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination); 353 return filter.matches(destination); 354 } 355 356 @Override 357 public boolean isSlowConsumer() { 358 return subscription.isSlowConsumer(); 359 } 360 361 @Override 362 public String getUserName() { 363 return userName; 364 } 365 }