001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Hashtable; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.Set; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.CopyOnWriteArraySet; 030 import java.util.concurrent.ThreadPoolExecutor; 031 032 import javax.management.InstanceNotFoundException; 033 import javax.management.MalformedObjectNameException; 034 import javax.management.ObjectName; 035 import javax.management.openmbean.CompositeData; 036 import javax.management.openmbean.CompositeDataSupport; 037 import javax.management.openmbean.CompositeType; 038 import javax.management.openmbean.OpenDataException; 039 import javax.management.openmbean.TabularData; 040 import javax.management.openmbean.TabularDataSupport; 041 import javax.management.openmbean.TabularType; 042 043 import org.apache.activemq.broker.Broker; 044 import org.apache.activemq.broker.BrokerService; 045 import org.apache.activemq.broker.ConnectionContext; 046 import org.apache.activemq.broker.ProducerBrokerExchange; 047 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 048 import org.apache.activemq.broker.region.Destination; 049 import org.apache.activemq.broker.region.DestinationFactory; 050 import org.apache.activemq.broker.region.DestinationFactoryImpl; 051 import org.apache.activemq.broker.region.DestinationInterceptor; 052 import org.apache.activemq.broker.region.Queue; 053 import org.apache.activemq.broker.region.Region; 054 import org.apache.activemq.broker.region.RegionBroker; 055 import org.apache.activemq.broker.region.Subscription; 056 import org.apache.activemq.broker.region.Topic; 057 import org.apache.activemq.broker.region.TopicRegion; 058 import org.apache.activemq.broker.region.TopicSubscription; 059 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 060 import org.apache.activemq.command.ActiveMQDestination; 061 import org.apache.activemq.command.ActiveMQMessage; 062 import org.apache.activemq.command.ActiveMQTopic; 063 import org.apache.activemq.command.ConsumerInfo; 064 import org.apache.activemq.command.Message; 065 import org.apache.activemq.command.MessageId; 066 import org.apache.activemq.command.ProducerInfo; 067 import org.apache.activemq.command.SubscriptionInfo; 068 import org.apache.activemq.store.MessageRecoveryListener; 069 import org.apache.activemq.store.PersistenceAdapter; 070 import org.apache.activemq.store.TopicMessageStore; 071 import org.apache.activemq.thread.Scheduler; 072 import org.apache.activemq.thread.TaskRunnerFactory; 073 import org.apache.activemq.transaction.XATransaction; 074 import org.apache.activemq.usage.SystemUsage; 075 import org.apache.activemq.util.JMXSupport; 076 import org.apache.activemq.util.ServiceStopper; 077 import org.apache.activemq.util.SubscriptionKey; 078 import org.slf4j.Logger; 079 import org.slf4j.LoggerFactory; 080 081 public class ManagedRegionBroker extends RegionBroker { 082 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); 083 private final ManagementContext managementContext; 084 private final ObjectName brokerObjectName; 085 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); 086 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); 087 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); 088 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); 089 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 090 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 091 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 092 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 093 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 094 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 095 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 096 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 097 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 098 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 099 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 100 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); 101 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); 102 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); 103 /* This is the first broker in the broker interceptor chain. */ 104 private Broker contextBroker; 105 106 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, 107 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 108 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); 109 this.managementContext = context; 110 this.brokerObjectName = brokerObjectName; 111 } 112 113 @Override 114 public void start() throws Exception { 115 super.start(); 116 // build all existing durable subscriptions 117 buildExistingSubscriptions(); 118 } 119 120 @Override 121 protected void doStop(ServiceStopper stopper) { 122 super.doStop(stopper); 123 // lets remove any mbeans not yet removed 124 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) { 125 ObjectName name = iter.next(); 126 try { 127 managementContext.unregisterMBean(name); 128 } catch (InstanceNotFoundException e) { 129 LOG.warn("The MBean: " + name + " is no longer registered with JMX"); 130 } catch (Exception e) { 131 stopper.onException(this, e); 132 } 133 } 134 registeredMBeans.clear(); 135 } 136 137 @Override 138 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 139 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 140 } 141 142 @Override 143 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 144 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 145 } 146 147 @Override 148 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 149 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 150 } 151 152 @Override 153 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 154 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 155 } 156 157 public void register(ActiveMQDestination destName, Destination destination) { 158 // TODO refactor to allow views for custom destinations 159 try { 160 ObjectName objectName = createObjectName(destName); 161 DestinationView view; 162 if (destination instanceof Queue) { 163 view = new QueueView(this, (Queue)destination); 164 } else if (destination instanceof Topic) { 165 view = new TopicView(this, (Topic)destination); 166 } else { 167 view = null; 168 LOG.warn("JMX View is not supported for custom destination: " + destination); 169 } 170 if (view != null) { 171 registerDestination(objectName, destName, view); 172 } 173 } catch (Exception e) { 174 LOG.error("Failed to register destination " + destName, e); 175 } 176 } 177 178 public void unregister(ActiveMQDestination destName) { 179 try { 180 ObjectName objectName = createObjectName(destName); 181 unregisterDestination(objectName); 182 } catch (Exception e) { 183 LOG.error("Failed to unregister " + destName, e); 184 } 185 } 186 187 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { 188 String connectionClientId = context.getClientId(); 189 ObjectName brokerJmxObjectName = brokerObjectName; 190 String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName); 191 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); 192 try { 193 ObjectName objectName = new ObjectName(objectNameStr); 194 SubscriptionView view; 195 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) { 196 // add offline subscribers to inactive list 197 SubscriptionInfo info = new SubscriptionInfo(); 198 info.setClientId(context.getClientId()); 199 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName()); 200 info.setDestination(sub.getConsumerInfo().getDestination()); 201 info.setSelector(sub.getSelector()); 202 addInactiveSubscription(key, info, sub); 203 } else { 204 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 205 if (sub.getConsumerInfo().isDurable()) { 206 view = new DurableSubscriptionView(this, context.getClientId(), userName, sub); 207 } else { 208 if (sub instanceof TopicSubscription) { 209 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub); 210 } else { 211 view = new SubscriptionView(context.getClientId(), userName, sub); 212 } 213 } 214 registerSubscription(objectName, sub.getConsumerInfo(), key, view); 215 } 216 subscriptionMap.put(sub, objectName); 217 return objectName; 218 } catch (Exception e) { 219 LOG.error("Failed to register subscription " + sub, e); 220 return null; 221 } 222 } 223 224 public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) { 225 Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList(); 226 String brokerDomain = brokerJmxObjectName.getDomain(); 227 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; 228 String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString(); 229 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName()); 230 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); 231 String persistentMode = "persistentMode="; 232 String consumerId = ""; 233 if (info.isDurable()) { 234 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName()); 235 } else { 236 persistentMode += "Non-Durable"; 237 if (info.getConsumerId() != null) { 238 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString()); 239 } 240 } 241 objectNameStr += persistentMode + ","; 242 objectNameStr += destinationType + ","; 243 objectNameStr += destinationName + ","; 244 objectNameStr += clientId; 245 objectNameStr += consumerId; 246 return objectNameStr; 247 } 248 249 @Override 250 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 251 Subscription sub = super.addConsumer(context, info); 252 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 253 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 254 if (inactiveName != null) { 255 // if it was inactive, register it 256 registerSubscription(context, sub); 257 } 258 return sub; 259 } 260 261 @Override 262 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 263 for (Subscription sub : subscriptionMap.keySet()) { 264 if (sub.getConsumerInfo().equals(info)) { 265 // unregister all consumer subs 266 unregisterSubscription(subscriptionMap.get(sub), true); 267 } 268 } 269 super.removeConsumer(context, info); 270 } 271 272 @Override 273 public void addProducer(ConnectionContext context, ProducerInfo info) 274 throws Exception { 275 super.addProducer(context, info); 276 String connectionClientId = context.getClientId(); 277 ObjectName objectName = createObjectName(info, connectionClientId); 278 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 279 ProducerView view = new ProducerView(info, connectionClientId, userName, this); 280 registerProducer(objectName, info.getDestination(), view); 281 } 282 283 @Override 284 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 285 ObjectName objectName = createObjectName(info, context.getClientId()); 286 unregisterProducer(objectName); 287 super.removeProducer(context, info); 288 } 289 290 @Override 291 public void send(ProducerBrokerExchange exchange, Message message) throws Exception { 292 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) { 293 ProducerInfo info = exchange.getProducerState().getInfo(); 294 if (info.getDestination() == null && info.getProducerId() != null) { 295 ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId()); 296 ProducerView view = this.dynamicDestinationProducers.get(objectName); 297 if (view != null) { 298 ActiveMQDestination dest = message.getDestination(); 299 if (dest != null) { 300 view.setLastUsedDestinationName(dest); 301 } 302 } 303 } 304 } 305 super.send(exchange, message); 306 } 307 308 public void unregisterSubscription(Subscription sub) { 309 ObjectName name = subscriptionMap.remove(sub); 310 if (name != null) { 311 try { 312 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 313 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 314 if (inactiveName != null) { 315 inactiveDurableTopicSubscribers.remove(inactiveName); 316 managementContext.unregisterMBean(inactiveName); 317 } 318 } catch (Exception e) { 319 LOG.error("Failed to unregister subscription " + sub, e); 320 } 321 } 322 } 323 324 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception { 325 if (dest.isQueue()) { 326 if (dest.isTemporary()) { 327 temporaryQueues.put(key, view); 328 } else { 329 queues.put(key, view); 330 } 331 } else { 332 if (dest.isTemporary()) { 333 temporaryTopics.put(key, view); 334 } else { 335 topics.put(key, view); 336 } 337 } 338 try { 339 AnnotatedMBean.registerMBean(managementContext, view, key); 340 registeredMBeans.add(key); 341 } catch (Throwable e) { 342 LOG.warn("Failed to register MBean: " + key); 343 LOG.debug("Failure reason: " + e, e); 344 } 345 } 346 347 protected void unregisterDestination(ObjectName key) throws Exception { 348 349 DestinationView view = removeAndRemember(topics, key, null); 350 view = removeAndRemember(queues, key, view); 351 view = removeAndRemember(temporaryQueues, key, view); 352 view = removeAndRemember(temporaryTopics, key, view); 353 if (registeredMBeans.remove(key)) { 354 try { 355 managementContext.unregisterMBean(key); 356 } catch (Throwable e) { 357 LOG.warn("Failed to unregister MBean: " + key); 358 LOG.debug("Failure reason: " + e, e); 359 } 360 } 361 if (view != null) { 362 key = view.getSlowConsumerStrategy(); 363 if (key!= null && registeredMBeans.remove(key)) { 364 try { 365 managementContext.unregisterMBean(key); 366 } catch (Throwable e) { 367 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key); 368 LOG.debug("Failure reason: " + e, e); 369 } 370 } 371 } 372 } 373 374 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception { 375 376 if (dest != null) { 377 if (dest.isQueue()) { 378 if (dest.isTemporary()) { 379 temporaryQueueProducers.put(key, view); 380 } else { 381 queueProducers.put(key, view); 382 } 383 } else { 384 if (dest.isTemporary()) { 385 temporaryTopicProducers.put(key, view); 386 } else { 387 topicProducers.put(key, view); 388 } 389 } 390 } else { 391 dynamicDestinationProducers.put(key, view); 392 } 393 394 try { 395 AnnotatedMBean.registerMBean(managementContext, view, key); 396 registeredMBeans.add(key); 397 } catch (Throwable e) { 398 LOG.warn("Failed to register MBean: " + key); 399 LOG.debug("Failure reason: " + e, e); 400 } 401 } 402 403 protected void unregisterProducer(ObjectName key) throws Exception { 404 queueProducers.remove(key); 405 topicProducers.remove(key); 406 temporaryQueueProducers.remove(key); 407 temporaryTopicProducers.remove(key); 408 dynamicDestinationProducers.remove(key); 409 if (registeredMBeans.remove(key)) { 410 try { 411 managementContext.unregisterMBean(key); 412 } catch (Throwable e) { 413 LOG.warn("Failed to unregister MBean: " + key); 414 LOG.debug("Failure reason: " + e, e); 415 } 416 } 417 } 418 419 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) { 420 DestinationView candidate = map.remove(key); 421 if (candidate != null && view == null) { 422 view = candidate; 423 } 424 return candidate != null ? candidate : view; 425 } 426 427 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { 428 ActiveMQDestination dest = info.getDestination(); 429 if (dest.isQueue()) { 430 if (dest.isTemporary()) { 431 temporaryQueueSubscribers.put(key, view); 432 } else { 433 queueSubscribers.put(key, view); 434 } 435 } else { 436 if (dest.isTemporary()) { 437 temporaryTopicSubscribers.put(key, view); 438 } else { 439 if (info.isDurable()) { 440 durableTopicSubscribers.put(key, view); 441 // unregister any inactive durable subs 442 try { 443 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 444 if (inactiveName != null) { 445 inactiveDurableTopicSubscribers.remove(inactiveName); 446 registeredMBeans.remove(inactiveName); 447 managementContext.unregisterMBean(inactiveName); 448 } 449 } catch (Throwable e) { 450 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e); 451 } 452 } else { 453 topicSubscribers.put(key, view); 454 } 455 } 456 } 457 458 try { 459 AnnotatedMBean.registerMBean(managementContext, view, key); 460 registeredMBeans.add(key); 461 } catch (Throwable e) { 462 LOG.warn("Failed to register MBean: " + key); 463 LOG.debug("Failure reason: " + e, e); 464 } 465 466 } 467 468 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception { 469 queueSubscribers.remove(key); 470 topicSubscribers.remove(key); 471 temporaryQueueSubscribers.remove(key); 472 temporaryTopicSubscribers.remove(key); 473 if (registeredMBeans.remove(key)) { 474 try { 475 managementContext.unregisterMBean(key); 476 } catch (Throwable e) { 477 LOG.warn("Failed to unregister MBean: " + key); 478 LOG.debug("Failure reason: " + e, e); 479 } 480 } 481 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key); 482 if (view != null) { 483 // need to put this back in the inactive list 484 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName()); 485 if (addToInactive) { 486 SubscriptionInfo info = new SubscriptionInfo(); 487 info.setClientId(subscriptionKey.getClientId()); 488 info.setSubscriptionName(subscriptionKey.getSubscriptionName()); 489 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 490 info.setSelector(view.getSelector()); 491 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null)); 492 } 493 } 494 } 495 496 protected void buildExistingSubscriptions() throws Exception { 497 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>(); 498 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 499 if (destinations != null) { 500 for (ActiveMQDestination dest : destinations) { 501 if (dest.isTopic()) { 502 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest); 503 if (infos != null) { 504 for (int i = 0; i < infos.length; i++) { 505 SubscriptionInfo info = infos[i]; 506 SubscriptionKey key = new SubscriptionKey(info); 507 if (!alreadyKnown(key)) { 508 LOG.debug("Restoring durable subscription mbean: " + info); 509 subscriptions.put(key, info); 510 } 511 } 512 } 513 } 514 } 515 } 516 517 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) { 518 addInactiveSubscription(entry.getKey(), entry.getValue(), null); 519 } 520 } 521 522 private boolean alreadyKnown(SubscriptionKey key) { 523 boolean known = false; 524 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key); 525 if (LOG.isTraceEnabled()) { 526 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered"); 527 } 528 return known; 529 } 530 531 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) { 532 try { 533 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info); 534 ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName)); 535 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription); 536 537 try { 538 AnnotatedMBean.registerMBean(managementContext, view, objectName); 539 registeredMBeans.add(objectName); 540 } catch (Throwable e) { 541 LOG.warn("Failed to register MBean: " + key); 542 LOG.debug("Failure reason: " + e, e); 543 } 544 545 inactiveDurableTopicSubscribers.put(objectName, view); 546 subscriptionKeys.put(key, objectName); 547 } catch (Exception e) { 548 LOG.error("Failed to register subscription " + info, e); 549 } 550 } 551 552 public CompositeData[] browse(SubscriptionView view) throws OpenDataException { 553 List<Message> messages = getSubscriberMessages(view); 554 CompositeData c[] = new CompositeData[messages.size()]; 555 for (int i = 0; i < c.length; i++) { 556 try { 557 c[i] = OpenTypeSupport.convert(messages.get(i)); 558 } catch (Throwable e) { 559 LOG.error("failed to browse : " + view, e); 560 } 561 } 562 return c; 563 } 564 565 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 566 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 567 List<Message> messages = getSubscriberMessages(view); 568 CompositeType ct = factory.getCompositeType(); 569 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); 570 TabularDataSupport rc = new TabularDataSupport(tt); 571 for (int i = 0; i < messages.size(); i++) { 572 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i)))); 573 } 574 return rc; 575 } 576 577 protected List<Message> getSubscriberMessages(SubscriptionView view) { 578 // TODO It is very dangerous operation for big backlogs 579 if (!(destinationFactory instanceof DestinationFactoryImpl)) { 580 throw new RuntimeException("unsupported by " + destinationFactory); 581 } 582 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 583 final List<Message> result = new ArrayList<Message>(); 584 try { 585 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); 586 TopicMessageStore store = adapter.createTopicMessageStore(topic); 587 store.recover(new MessageRecoveryListener() { 588 public boolean recoverMessage(Message message) throws Exception { 589 result.add(message); 590 return true; 591 } 592 593 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 594 throw new RuntimeException("Should not be called."); 595 } 596 597 public boolean hasSpace() { 598 return true; 599 } 600 601 public boolean isDuplicate(MessageId id) { 602 return false; 603 } 604 }); 605 } catch (Throwable e) { 606 LOG.error("Failed to browse messages for Subscription " + view, e); 607 } 608 return result; 609 610 } 611 612 protected ObjectName[] getTopics() { 613 Set<ObjectName> set = topics.keySet(); 614 return set.toArray(new ObjectName[set.size()]); 615 } 616 617 protected ObjectName[] getQueues() { 618 Set<ObjectName> set = queues.keySet(); 619 return set.toArray(new ObjectName[set.size()]); 620 } 621 622 protected ObjectName[] getTemporaryTopics() { 623 Set<ObjectName> set = temporaryTopics.keySet(); 624 return set.toArray(new ObjectName[set.size()]); 625 } 626 627 protected ObjectName[] getTemporaryQueues() { 628 Set<ObjectName> set = temporaryQueues.keySet(); 629 return set.toArray(new ObjectName[set.size()]); 630 } 631 632 protected ObjectName[] getTopicSubscribers() { 633 Set<ObjectName> set = topicSubscribers.keySet(); 634 return set.toArray(new ObjectName[set.size()]); 635 } 636 637 protected ObjectName[] getDurableTopicSubscribers() { 638 Set<ObjectName> set = durableTopicSubscribers.keySet(); 639 return set.toArray(new ObjectName[set.size()]); 640 } 641 642 protected ObjectName[] getQueueSubscribers() { 643 Set<ObjectName> set = queueSubscribers.keySet(); 644 return set.toArray(new ObjectName[set.size()]); 645 } 646 647 protected ObjectName[] getTemporaryTopicSubscribers() { 648 Set<ObjectName> set = temporaryTopicSubscribers.keySet(); 649 return set.toArray(new ObjectName[set.size()]); 650 } 651 652 protected ObjectName[] getTemporaryQueueSubscribers() { 653 Set<ObjectName> set = temporaryQueueSubscribers.keySet(); 654 return set.toArray(new ObjectName[set.size()]); 655 } 656 657 protected ObjectName[] getInactiveDurableTopicSubscribers() { 658 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet(); 659 return set.toArray(new ObjectName[set.size()]); 660 } 661 662 protected ObjectName[] getTopicProducers() { 663 Set<ObjectName> set = topicProducers.keySet(); 664 return set.toArray(new ObjectName[set.size()]); 665 } 666 667 protected ObjectName[] getQueueProducers() { 668 Set<ObjectName> set = queueProducers.keySet(); 669 return set.toArray(new ObjectName[set.size()]); 670 } 671 672 protected ObjectName[] getTemporaryTopicProducers() { 673 Set<ObjectName> set = temporaryTopicProducers.keySet(); 674 return set.toArray(new ObjectName[set.size()]); 675 } 676 677 protected ObjectName[] getTemporaryQueueProducers() { 678 Set<ObjectName> set = temporaryQueueProducers.keySet(); 679 return set.toArray(new ObjectName[set.size()]); 680 } 681 682 protected ObjectName[] getDynamicDestinationProducers() { 683 Set<ObjectName> set = dynamicDestinationProducers.keySet(); 684 return set.toArray(new ObjectName[set.size()]); 685 } 686 687 public Broker getContextBroker() { 688 return contextBroker; 689 } 690 691 public void setContextBroker(Broker contextBroker) { 692 this.contextBroker = contextBroker; 693 } 694 695 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { 696 // Build the object name for the destination 697 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList(); 698 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=" 699 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination=" 700 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); 701 return objectName; 702 } 703 704 protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException { 705 // Build the object name for the producer info 706 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList(); 707 708 String destinationType = "destinationType="; 709 String destinationName = "destinationName="; 710 711 if (producerInfo.getDestination() == null) { 712 destinationType += "Dynamic"; 713 destinationName = null; 714 } else { 715 destinationType += producerInfo.getDestination().getDestinationTypeAsString(); 716 destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName()); 717 } 718 719 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); 720 String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString()); 721 722 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," 723 + "Type=Producer" + "," 724 + destinationType + "," 725 + (destinationName != null ? destinationName + "," : "") 726 + clientId + "," + producerId); 727 return objectName; 728 } 729 730 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { 731 ObjectName objectName = null; 732 try { 733 objectName = createObjectName(strategy); 734 if (!registeredMBeans.contains(objectName)) { 735 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy); 736 AnnotatedMBean.registerMBean(managementContext, view, objectName); 737 registeredMBeans.add(objectName); 738 } 739 } catch (Exception e) { 740 LOG.warn("Failed to register MBean: " + strategy); 741 LOG.debug("Failure reason: " + e, e); 742 } 743 return objectName; 744 } 745 746 protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException { 747 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList(); 748 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") 749 + "," + "Type=RecoveredXaTransaction" 750 + "," + "Xid=" 751 + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString())); 752 return objectName; 753 } 754 755 public void registerRecoveredTransactionMBean(XATransaction transaction) { 756 try { 757 ObjectName objectName = createObjectName(transaction); 758 if (!registeredMBeans.contains(objectName)) { 759 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction); 760 AnnotatedMBean.registerMBean(managementContext, view, objectName); 761 registeredMBeans.add(objectName); 762 } 763 } catch (Exception e) { 764 LOG.warn("Failed to register prepared transaction MBean: " + transaction); 765 LOG.debug("Failure reason: " + e, e); 766 } 767 } 768 769 public void unregister(XATransaction transaction) { 770 try { 771 ObjectName objectName = createObjectName(transaction); 772 if (registeredMBeans.remove(objectName)) { 773 try { 774 managementContext.unregisterMBean(objectName); 775 } catch (Throwable e) { 776 LOG.warn("Failed to unregister MBean: " + objectName); 777 LOG.debug("Failure reason: " + e, e); 778 } 779 } 780 } catch (Exception e) { 781 LOG.warn("Failed to create object name to unregister " + transaction, e); 782 } 783 } 784 785 private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{ 786 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList(); 787 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," 788 + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName())); 789 return objectName; 790 } 791 792 public ObjectName getSubscriberObjectName(Subscription key) { 793 return subscriptionMap.get(key); 794 } 795 796 public Subscription getSubscriber(ObjectName key) { 797 Subscription sub = null; 798 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) { 799 if (entry.getValue().equals(key)) { 800 sub = entry.getKey(); 801 break; 802 } 803 } 804 return sub; 805 } 806 }