001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Hashtable;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CopyOnWriteArraySet;
030    import java.util.concurrent.ThreadPoolExecutor;
031    
032    import javax.management.InstanceNotFoundException;
033    import javax.management.MalformedObjectNameException;
034    import javax.management.ObjectName;
035    import javax.management.openmbean.CompositeData;
036    import javax.management.openmbean.CompositeDataSupport;
037    import javax.management.openmbean.CompositeType;
038    import javax.management.openmbean.OpenDataException;
039    import javax.management.openmbean.TabularData;
040    import javax.management.openmbean.TabularDataSupport;
041    import javax.management.openmbean.TabularType;
042    
043    import org.apache.activemq.broker.Broker;
044    import org.apache.activemq.broker.BrokerService;
045    import org.apache.activemq.broker.ConnectionContext;
046    import org.apache.activemq.broker.ProducerBrokerExchange;
047    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048    import org.apache.activemq.broker.region.Destination;
049    import org.apache.activemq.broker.region.DestinationFactory;
050    import org.apache.activemq.broker.region.DestinationFactoryImpl;
051    import org.apache.activemq.broker.region.DestinationInterceptor;
052    import org.apache.activemq.broker.region.Queue;
053    import org.apache.activemq.broker.region.Region;
054    import org.apache.activemq.broker.region.RegionBroker;
055    import org.apache.activemq.broker.region.Subscription;
056    import org.apache.activemq.broker.region.Topic;
057    import org.apache.activemq.broker.region.TopicRegion;
058    import org.apache.activemq.broker.region.TopicSubscription;
059    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
060    import org.apache.activemq.command.ActiveMQDestination;
061    import org.apache.activemq.command.ActiveMQMessage;
062    import org.apache.activemq.command.ActiveMQTopic;
063    import org.apache.activemq.command.ConsumerInfo;
064    import org.apache.activemq.command.Message;
065    import org.apache.activemq.command.MessageId;
066    import org.apache.activemq.command.ProducerInfo;
067    import org.apache.activemq.command.SubscriptionInfo;
068    import org.apache.activemq.store.MessageRecoveryListener;
069    import org.apache.activemq.store.PersistenceAdapter;
070    import org.apache.activemq.store.TopicMessageStore;
071    import org.apache.activemq.thread.Scheduler;
072    import org.apache.activemq.thread.TaskRunnerFactory;
073    import org.apache.activemq.transaction.XATransaction;
074    import org.apache.activemq.usage.SystemUsage;
075    import org.apache.activemq.util.JMXSupport;
076    import org.apache.activemq.util.ServiceStopper;
077    import org.apache.activemq.util.SubscriptionKey;
078    import org.slf4j.Logger;
079    import org.slf4j.LoggerFactory;
080    
081    public class ManagedRegionBroker extends RegionBroker {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    // Used for every MBean registration and unregistration performed by this broker.
    private final ManagementContext managementContext;
    // JMX name of the owning broker; its domain and BrokerName key seed subscription object names.
    private final ObjectName brokerObjectName;
    // Destination views keyed by MBean name, split by destination kind (see registerDestination).
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
    // Subscription views keyed by MBean name, split by destination kind and durability
    // (see registerSubscription and addInactiveSubscription).
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    // Producer views keyed by MBean name; producers without a fixed destination go to
    // dynamicDestinationProducers (see registerProducer).
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    // Subscription key -> MBean name recorded when an inactive durable subscription view is added.
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    // Subscription -> MBean name recorded by registerSubscription(ConnectionContext, Subscription).
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    // Names of MBeans registered through this broker; whatever is left here is unregistered in doStop.
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;
105    
106        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
107                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
108            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
109            this.managementContext = context;
110            this.brokerObjectName = brokerObjectName;
111        }
112    
    /**
     * Starts the underlying region broker and then calls
     * {@link #buildExistingSubscriptions()} so that durable subscriptions
     * reported by the destination factory get an inactive-subscription MBean.
     *
     * @throws Exception if the superclass start or the subscription scan fails
     */
    @Override
    public void start() throws Exception {
        super.start();
        // build all existing durable subscriptions
        buildExistingSubscriptions();
    }
119    
120        @Override
121        protected void doStop(ServiceStopper stopper) {
122            super.doStop(stopper);
123            // lets remove any mbeans not yet removed
124            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
125                ObjectName name = iter.next();
126                try {
127                    managementContext.unregisterMBean(name);
128                } catch (InstanceNotFoundException e) {
129                    LOG.warn("The MBean: " + name + " is no longer registered with JMX");
130                } catch (Exception e) {
131                    stopper.onException(this, e);
132                }
133            }
134            registeredMBeans.clear();
135        }
136    
    /**
     * Overrides the superclass factory to return a {@link ManagedQueueRegion}
     * that is handed a reference to this broker.
     *
     * @param memoryManager      passed through to the region
     * @param taskRunnerFactory  passed through to the region
     * @param destinationFactory passed through to the region
     * @return the newly created queue region
     */
    @Override
    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }
141    
    /**
     * Overrides the superclass factory to return a {@link ManagedTempQueueRegion}
     * that is handed a reference to this broker and to the broker service.
     *
     * @param memoryManager      passed through to the region
     * @param taskRunnerFactory  passed through to the region
     * @param destinationFactory passed through to the region
     * @return the newly created temporary queue region
     */
    @Override
    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }
146    
    /**
     * Overrides the superclass factory to return a {@link ManagedTempTopicRegion}
     * that is handed a reference to this broker.
     *
     * @param memoryManager      passed through to the region
     * @param taskRunnerFactory  passed through to the region
     * @param destinationFactory passed through to the region
     * @return the newly created temporary topic region
     */
    @Override
    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }
151    
    /**
     * Overrides the superclass factory to return a {@link ManagedTopicRegion}
     * that is handed a reference to this broker.
     *
     * @param memoryManager      passed through to the region
     * @param taskRunnerFactory  passed through to the region
     * @param destinationFactory passed through to the region
     * @return the newly created topic region
     */
    @Override
    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }
156    
157        public void register(ActiveMQDestination destName, Destination destination) {
158            // TODO refactor to allow views for custom destinations
159            try {
160                ObjectName objectName = createObjectName(destName);
161                DestinationView view;
162                if (destination instanceof Queue) {
163                    view = new QueueView(this, (Queue)destination);
164                } else if (destination instanceof Topic) {
165                    view = new TopicView(this, (Topic)destination);
166                } else {
167                    view = null;
168                    LOG.warn("JMX View is not supported for custom destination: " + destination);
169                }
170                if (view != null) {
171                    registerDestination(objectName, destName, view);
172                }
173            } catch (Exception e) {
174                LOG.error("Failed to register destination " + destName, e);
175            }
176        }
177    
178        public void unregister(ActiveMQDestination destName) {
179            try {
180                ObjectName objectName = createObjectName(destName);
181                unregisterDestination(objectName);
182            } catch (Exception e) {
183                LOG.error("Failed to unregister " + destName, e);
184            }
185        }
186    
187        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
188            String connectionClientId = context.getClientId();
189            ObjectName brokerJmxObjectName = brokerObjectName;
190            String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
191            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
192            try {
193                ObjectName objectName = new ObjectName(objectNameStr);
194                SubscriptionView view;
195                if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
196                    // add offline subscribers to inactive list
197                    SubscriptionInfo info = new SubscriptionInfo();
198                    info.setClientId(context.getClientId());
199                    info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
200                    info.setDestination(sub.getConsumerInfo().getDestination());
201                    info.setSelector(sub.getSelector());
202                    addInactiveSubscription(key, info, sub);
203                } else {
204                    String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
205                    if (sub.getConsumerInfo().isDurable()) {
206                        view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
207                    } else {
208                        if (sub instanceof TopicSubscription) {
209                            view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
210                        } else {
211                            view = new SubscriptionView(context.getClientId(), userName, sub);
212                        }
213                    }
214                    registerSubscription(objectName, sub.getConsumerInfo(), key, view);
215                }
216                subscriptionMap.put(sub, objectName);
217                return objectName;
218            } catch (Exception e) {
219                LOG.error("Failed to register subscription " + sub, e);
220                return null;
221            }
222        }
223    
224        public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
225            Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList();
226            String brokerDomain = brokerJmxObjectName.getDomain();
227            String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
228            String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
229            String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
230            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
231            String persistentMode = "persistentMode=";
232            String consumerId = "";
233            if (info.isDurable()) {
234                persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
235            } else {
236                persistentMode += "Non-Durable";
237                if (info.getConsumerId() != null) {
238                    consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
239                }
240            }
241            objectNameStr += persistentMode + ",";
242            objectNameStr += destinationType + ",";
243            objectNameStr += destinationName + ",";
244            objectNameStr += clientId;
245            objectNameStr += consumerId;
246            return objectNameStr;
247        }
248    
249        @Override
250        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
251            Subscription sub = super.addConsumer(context, info);
252            SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
253            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
254            if (inactiveName != null) {
255                // if it was inactive, register it
256                registerSubscription(context, sub);
257            }
258            return sub;
259        }
260    
261        @Override
262        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
263            for (Subscription sub : subscriptionMap.keySet()) {
264                if (sub.getConsumerInfo().equals(info)) {
265                   // unregister all consumer subs
266                   unregisterSubscription(subscriptionMap.get(sub), true);
267                }
268            }
269            super.removeConsumer(context, info);
270        }
271    
272        @Override
273        public void addProducer(ConnectionContext context, ProducerInfo info)
274                throws Exception {
275            super.addProducer(context, info);
276            String connectionClientId = context.getClientId();
277            ObjectName objectName = createObjectName(info, connectionClientId);
278            String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
279            ProducerView view = new ProducerView(info, connectionClientId, userName, this);
280            registerProducer(objectName, info.getDestination(), view);
281        }
282    
283        @Override
284        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
285            ObjectName objectName = createObjectName(info, context.getClientId());
286            unregisterProducer(objectName);
287            super.removeProducer(context, info);
288        }
289    
290        @Override
291        public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
292            if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
293                ProducerInfo info = exchange.getProducerState().getInfo();
294                if (info.getDestination() == null && info.getProducerId() != null) {
295                    ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
296                    ProducerView view = this.dynamicDestinationProducers.get(objectName);
297                    if (view != null) {
298                        ActiveMQDestination dest = message.getDestination();
299                        if (dest != null) {
300                            view.setLastUsedDestinationName(dest);
301                        }
302                    }
303                }
304             }
305            super.send(exchange, message);
306        }
307    
308        public void unregisterSubscription(Subscription sub) {
309            ObjectName name = subscriptionMap.remove(sub);
310            if (name != null) {
311                try {
312                    SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
313                    ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
314                    if (inactiveName != null) {
315                        inactiveDurableTopicSubscribers.remove(inactiveName);
316                        managementContext.unregisterMBean(inactiveName);
317                    }
318                } catch (Exception e) {
319                    LOG.error("Failed to unregister subscription " + sub, e);
320                }
321            }
322        }
323    
324        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
325            if (dest.isQueue()) {
326                if (dest.isTemporary()) {
327                    temporaryQueues.put(key, view);
328                } else {
329                    queues.put(key, view);
330                }
331            } else {
332                if (dest.isTemporary()) {
333                    temporaryTopics.put(key, view);
334                } else {
335                    topics.put(key, view);
336                }
337            }
338            try {
339                AnnotatedMBean.registerMBean(managementContext, view, key);
340                registeredMBeans.add(key);
341            } catch (Throwable e) {
342                LOG.warn("Failed to register MBean: " + key);
343                LOG.debug("Failure reason: " + e, e);
344            }
345        }
346    
347        protected void unregisterDestination(ObjectName key) throws Exception {
348    
349            DestinationView view = removeAndRemember(topics, key, null);
350            view = removeAndRemember(queues, key, view);
351            view = removeAndRemember(temporaryQueues, key, view);
352            view = removeAndRemember(temporaryTopics, key, view);
353            if (registeredMBeans.remove(key)) {
354                try {
355                    managementContext.unregisterMBean(key);
356                } catch (Throwable e) {
357                    LOG.warn("Failed to unregister MBean: " + key);
358                    LOG.debug("Failure reason: " + e, e);
359                }
360            }
361            if (view != null) {
362                key = view.getSlowConsumerStrategy();
363                if (key!= null && registeredMBeans.remove(key)) {
364                    try {
365                        managementContext.unregisterMBean(key);
366                    } catch (Throwable e) {
367                        LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
368                        LOG.debug("Failure reason: " + e, e);
369                    }
370                }
371            }
372        }
373    
374        protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
375    
376            if (dest != null) {
377                if (dest.isQueue()) {
378                    if (dest.isTemporary()) {
379                        temporaryQueueProducers.put(key, view);
380                    } else {
381                        queueProducers.put(key, view);
382                    }
383                } else {
384                    if (dest.isTemporary()) {
385                        temporaryTopicProducers.put(key, view);
386                    } else {
387                        topicProducers.put(key, view);
388                    }
389                }
390            } else {
391                dynamicDestinationProducers.put(key, view);
392            }
393    
394            try {
395                AnnotatedMBean.registerMBean(managementContext, view, key);
396                registeredMBeans.add(key);
397            } catch (Throwable e) {
398                LOG.warn("Failed to register MBean: " + key);
399                LOG.debug("Failure reason: " + e, e);
400            }
401        }
402    
403        protected void unregisterProducer(ObjectName key) throws Exception {
404            queueProducers.remove(key);
405            topicProducers.remove(key);
406            temporaryQueueProducers.remove(key);
407            temporaryTopicProducers.remove(key);
408            dynamicDestinationProducers.remove(key);
409            if (registeredMBeans.remove(key)) {
410                try {
411                    managementContext.unregisterMBean(key);
412                } catch (Throwable e) {
413                    LOG.warn("Failed to unregister MBean: " + key);
414                    LOG.debug("Failure reason: " + e, e);
415                }
416            }
417        }
418    
419        private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
420            DestinationView candidate = map.remove(key);
421            if (candidate != null && view == null) {
422                view = candidate;
423            }
424            return candidate != null ? candidate : view;
425        }
426    
427        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
428            ActiveMQDestination dest = info.getDestination();
429            if (dest.isQueue()) {
430                if (dest.isTemporary()) {
431                    temporaryQueueSubscribers.put(key, view);
432                } else {
433                    queueSubscribers.put(key, view);
434                }
435            } else {
436                if (dest.isTemporary()) {
437                    temporaryTopicSubscribers.put(key, view);
438                } else {
439                    if (info.isDurable()) {
440                        durableTopicSubscribers.put(key, view);
441                        // unregister any inactive durable subs
442                        try {
443                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
444                            if (inactiveName != null) {
445                                inactiveDurableTopicSubscribers.remove(inactiveName);
446                                registeredMBeans.remove(inactiveName);
447                                managementContext.unregisterMBean(inactiveName);
448                            }
449                        } catch (Throwable e) {
450                            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
451                        }
452                    } else {
453                        topicSubscribers.put(key, view);
454                    }
455                }
456            }
457    
458            try {
459                AnnotatedMBean.registerMBean(managementContext, view, key);
460                registeredMBeans.add(key);
461            } catch (Throwable e) {
462                LOG.warn("Failed to register MBean: " + key);
463                LOG.debug("Failure reason: " + e, e);
464            }
465    
466        }
467    
468        protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
469            queueSubscribers.remove(key);
470            topicSubscribers.remove(key);
471            temporaryQueueSubscribers.remove(key);
472            temporaryTopicSubscribers.remove(key);
473            if (registeredMBeans.remove(key)) {
474                try {
475                    managementContext.unregisterMBean(key);
476                } catch (Throwable e) {
477                    LOG.warn("Failed to unregister MBean: " + key);
478                    LOG.debug("Failure reason: " + e, e);
479                }
480            }
481            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
482            if (view != null) {
483                // need to put this back in the inactive list
484                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
485                if (addToInactive) {
486                    SubscriptionInfo info = new SubscriptionInfo();
487                    info.setClientId(subscriptionKey.getClientId());
488                    info.setSubscriptionName(subscriptionKey.getSubscriptionName());
489                    info.setDestination(new ActiveMQTopic(view.getDestinationName()));
490                    info.setSelector(view.getSelector());
491                    addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
492                }
493            }
494        }
495    
496        protected void buildExistingSubscriptions() throws Exception {
497            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
498            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
499            if (destinations != null) {
500                for (ActiveMQDestination dest : destinations) {
501                    if (dest.isTopic()) {
502                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
503                        if (infos != null) {
504                            for (int i = 0; i < infos.length; i++) {
505                                SubscriptionInfo info = infos[i];
506                                SubscriptionKey key = new SubscriptionKey(info);
507                                if (!alreadyKnown(key)) {
508                                    LOG.debug("Restoring durable subscription mbean: " + info);
509                                    subscriptions.put(key, info);
510                                }
511                            }
512                        }
513                    }
514                }
515            }
516    
517            for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
518                addInactiveSubscription(entry.getKey(), entry.getValue(), null);
519            }
520        }
521    
522        private boolean alreadyKnown(SubscriptionKey key) {
523            boolean known = false;
524            known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
525            if (LOG.isTraceEnabled()) {
526                LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
527            }
528            return known;
529        }
530    
531        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
532            try {
533                ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
534                ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
535                SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
536    
537                try {
538                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
539                    registeredMBeans.add(objectName);
540                } catch (Throwable e) {
541                    LOG.warn("Failed to register MBean: " + key);
542                    LOG.debug("Failure reason: " + e, e);
543                }
544    
545                inactiveDurableTopicSubscribers.put(objectName, view);
546                subscriptionKeys.put(key, objectName);
547            } catch (Exception e) {
548                LOG.error("Failed to register subscription " + info, e);
549            }
550        }
551    
552        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
553            List<Message> messages = getSubscriberMessages(view);
554            CompositeData c[] = new CompositeData[messages.size()];
555            for (int i = 0; i < c.length; i++) {
556                try {
557                    c[i] = OpenTypeSupport.convert(messages.get(i));
558                } catch (Throwable e) {
559                    LOG.error("failed to browse : " + view, e);
560                }
561            }
562            return c;
563        }
564    
565        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
566            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
567            List<Message> messages = getSubscriberMessages(view);
568            CompositeType ct = factory.getCompositeType();
569            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
570            TabularDataSupport rc = new TabularDataSupport(tt);
571            for (int i = 0; i < messages.size(); i++) {
572                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
573            }
574            return rc;
575        }
576    
577        protected List<Message> getSubscriberMessages(SubscriptionView view) {
578            // TODO It is very dangerous operation for big backlogs
579            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
580                throw new RuntimeException("unsupported by " + destinationFactory);
581            }
582            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
583            final List<Message> result = new ArrayList<Message>();
584            try {
585                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
586                TopicMessageStore store = adapter.createTopicMessageStore(topic);
587                store.recover(new MessageRecoveryListener() {
588                    public boolean recoverMessage(Message message) throws Exception {
589                        result.add(message);
590                        return true;
591                    }
592    
593                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
594                        throw new RuntimeException("Should not be called.");
595                    }
596    
597                    public boolean hasSpace() {
598                        return true;
599                    }
600    
601                    public boolean isDuplicate(MessageId id) {
602                        return false;
603                    }
604                });
605            } catch (Throwable e) {
606                LOG.error("Failed to browse messages for Subscription " + view, e);
607            }
608            return result;
609    
610        }
611    
612        protected ObjectName[] getTopics() {
613            Set<ObjectName> set = topics.keySet();
614            return set.toArray(new ObjectName[set.size()]);
615        }
616    
617        protected ObjectName[] getQueues() {
618            Set<ObjectName> set = queues.keySet();
619            return set.toArray(new ObjectName[set.size()]);
620        }
621    
622        protected ObjectName[] getTemporaryTopics() {
623            Set<ObjectName> set = temporaryTopics.keySet();
624            return set.toArray(new ObjectName[set.size()]);
625        }
626    
627        protected ObjectName[] getTemporaryQueues() {
628            Set<ObjectName> set = temporaryQueues.keySet();
629            return set.toArray(new ObjectName[set.size()]);
630        }
631    
632        protected ObjectName[] getTopicSubscribers() {
633            Set<ObjectName> set = topicSubscribers.keySet();
634            return set.toArray(new ObjectName[set.size()]);
635        }
636    
637        protected ObjectName[] getDurableTopicSubscribers() {
638            Set<ObjectName> set = durableTopicSubscribers.keySet();
639            return set.toArray(new ObjectName[set.size()]);
640        }
641    
642        protected ObjectName[] getQueueSubscribers() {
643            Set<ObjectName> set = queueSubscribers.keySet();
644            return set.toArray(new ObjectName[set.size()]);
645        }
646    
647        protected ObjectName[] getTemporaryTopicSubscribers() {
648            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
649            return set.toArray(new ObjectName[set.size()]);
650        }
651    
652        protected ObjectName[] getTemporaryQueueSubscribers() {
653            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
654            return set.toArray(new ObjectName[set.size()]);
655        }
656    
657        protected ObjectName[] getInactiveDurableTopicSubscribers() {
658            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
659            return set.toArray(new ObjectName[set.size()]);
660        }
661    
662        protected ObjectName[] getTopicProducers() {
663            Set<ObjectName> set = topicProducers.keySet();
664            return set.toArray(new ObjectName[set.size()]);
665        }
666    
667        protected ObjectName[] getQueueProducers() {
668            Set<ObjectName> set = queueProducers.keySet();
669            return set.toArray(new ObjectName[set.size()]);
670        }
671    
672        protected ObjectName[] getTemporaryTopicProducers() {
673            Set<ObjectName> set = temporaryTopicProducers.keySet();
674            return set.toArray(new ObjectName[set.size()]);
675        }
676    
677        protected ObjectName[] getTemporaryQueueProducers() {
678            Set<ObjectName> set = temporaryQueueProducers.keySet();
679            return set.toArray(new ObjectName[set.size()]);
680        }
681    
682        protected ObjectName[] getDynamicDestinationProducers() {
683            Set<ObjectName> set = dynamicDestinationProducers.keySet();
684            return set.toArray(new ObjectName[set.size()]);
685        }
686    
687        public Broker getContextBroker() {
688            return contextBroker;
689        }
690    
691        public void setContextBroker(Broker contextBroker) {
692            this.contextBroker = contextBroker;
693        }
694    
695        protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
696            // Build the object name for the destination
697            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
698            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
699                                                   + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
700                                                   + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
701            return objectName;
702        }
703    
704        protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
705            // Build the object name for the producer info
706            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
707    
708            String destinationType = "destinationType=";
709            String destinationName = "destinationName=";
710    
711            if (producerInfo.getDestination() == null) {
712                destinationType += "Dynamic";
713                destinationName = null;
714            } else {
715                destinationType += producerInfo.getDestination().getDestinationTypeAsString();
716                destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName());
717            }
718    
719            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
720            String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
721    
722            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
723                                                   + "Type=Producer" + ","
724                                                   + destinationType + ","
725                                                   + (destinationName != null ? destinationName + "," : "")
726                                                   + clientId + "," + producerId);
727            return objectName;
728        }
729    
730        public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
731            ObjectName objectName = null;
732            try {
733                objectName = createObjectName(strategy);
734                if (!registeredMBeans.contains(objectName))  {
735                    AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
736                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
737                    registeredMBeans.add(objectName);
738                }
739            } catch (Exception e) {
740                LOG.warn("Failed to register MBean: " + strategy);
741                LOG.debug("Failure reason: " + e, e);
742            }
743            return objectName;
744        }
745    
746        protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
747            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
748            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
749                                                   + "," + "Type=RecoveredXaTransaction"
750                                                   + "," + "Xid="
751                                                   + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
752            return objectName;
753        }
754    
755        public void registerRecoveredTransactionMBean(XATransaction transaction) {
756            try {
757                ObjectName objectName = createObjectName(transaction);
758                if (!registeredMBeans.contains(objectName))  {
759                    RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
760                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
761                    registeredMBeans.add(objectName);
762                }
763            } catch (Exception e) {
764                LOG.warn("Failed to register prepared transaction MBean: " + transaction);
765                LOG.debug("Failure reason: " + e, e);
766            }
767        }
768    
769        public void unregister(XATransaction transaction) {
770            try {
771                ObjectName objectName = createObjectName(transaction);
772                if (registeredMBeans.remove(objectName)) {
773                    try {
774                        managementContext.unregisterMBean(objectName);
775                    } catch (Throwable e) {
776                        LOG.warn("Failed to unregister MBean: " + objectName);
777                        LOG.debug("Failure reason: " + e, e);
778                    }
779                }
780            } catch (Exception e) {
781                LOG.warn("Failed to create object name to unregister " + transaction, e);
782            }
783        }
784    
785        private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
786            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
787            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
788                                + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
789            return objectName;
790        }
791    
792        public ObjectName getSubscriberObjectName(Subscription key) {
793            return subscriptionMap.get(key);
794        }
795    
796        public Subscription getSubscriber(ObjectName key) {
797            Subscription sub = null;
798            for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
799                if (entry.getValue().equals(key)) {
800                    sub = entry.getKey();
801                    break;
802                }
803            }
804            return sub;
805        }
806    }