001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Hashtable;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArraySet;
030import java.util.concurrent.ThreadPoolExecutor;
031
032import javax.management.InstanceNotFoundException;
033import javax.management.MalformedObjectNameException;
034import javax.management.ObjectName;
035import javax.management.openmbean.CompositeData;
036import javax.management.openmbean.CompositeDataSupport;
037import javax.management.openmbean.CompositeType;
038import javax.management.openmbean.OpenDataException;
039import javax.management.openmbean.TabularData;
040import javax.management.openmbean.TabularDataSupport;
041import javax.management.openmbean.TabularType;
042
043import org.apache.activemq.broker.Broker;
044import org.apache.activemq.broker.BrokerService;
045import org.apache.activemq.broker.ConnectionContext;
046import org.apache.activemq.broker.ProducerBrokerExchange;
047import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048import org.apache.activemq.broker.region.Destination;
049import org.apache.activemq.broker.region.DestinationFactory;
050import org.apache.activemq.broker.region.DestinationFactoryImpl;
051import org.apache.activemq.broker.region.DestinationInterceptor;
052import org.apache.activemq.broker.region.Queue;
053import org.apache.activemq.broker.region.Region;
054import org.apache.activemq.broker.region.RegionBroker;
055import org.apache.activemq.broker.region.Subscription;
056import org.apache.activemq.broker.region.Topic;
057import org.apache.activemq.broker.region.TopicRegion;
058import org.apache.activemq.broker.region.TopicSubscription;
059import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
060import org.apache.activemq.command.ActiveMQDestination;
061import org.apache.activemq.command.ActiveMQMessage;
062import org.apache.activemq.command.ActiveMQTopic;
063import org.apache.activemq.command.ConsumerInfo;
064import org.apache.activemq.command.Message;
065import org.apache.activemq.command.MessageId;
066import org.apache.activemq.command.ProducerInfo;
067import org.apache.activemq.command.SubscriptionInfo;
068import org.apache.activemq.store.MessageRecoveryListener;
069import org.apache.activemq.store.PersistenceAdapter;
070import org.apache.activemq.store.TopicMessageStore;
071import org.apache.activemq.thread.Scheduler;
072import org.apache.activemq.thread.TaskRunnerFactory;
073import org.apache.activemq.transaction.XATransaction;
074import org.apache.activemq.usage.SystemUsage;
075import org.apache.activemq.util.JMXSupport;
076import org.apache.activemq.util.ServiceStopper;
077import org.apache.activemq.util.SubscriptionKey;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081public class ManagedRegionBroker extends RegionBroker {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    // JMX context used to register and unregister every MBean created by this broker.
    private final ManagementContext managementContext;
    // Object name of the owning broker; its domain and BrokerName key are reused when
    // building subscription object names.
    private final ObjectName brokerObjectName;

    // Destination views, keyed by MBean object name and split by destination kind.
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();

    // Subscription views, keyed by MBean object name and split by destination kind and durability.
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();

    // Producer views, keyed by MBean object name; producers without a fixed destination
    // are kept in dynamicDestinationProducers.
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();

    // Maps a (client id, subscription name) key to the object name of its inactive durable subscription MBean.
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    // Maps a subscription to the object name it was registered under.
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    // Names of MBeans registered by this broker; whatever is left is unregistered in doStop().
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;
105
106    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
107                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
108        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
109        this.managementContext = context;
110        this.brokerObjectName = brokerObjectName;
111    }
112
113    @Override
114    public void start() throws Exception {
115        super.start();
116        // build all existing durable subscriptions
117        buildExistingSubscriptions();
118    }
119
120    @Override
121    protected void doStop(ServiceStopper stopper) {
122        super.doStop(stopper);
123        // lets remove any mbeans not yet removed
124        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
125            ObjectName name = iter.next();
126            try {
127                managementContext.unregisterMBean(name);
128            } catch (InstanceNotFoundException e) {
129                LOG.warn("The MBean: " + name + " is no longer registered with JMX");
130            } catch (Exception e) {
131                stopper.onException(this, e);
132            }
133        }
134        registeredMBeans.clear();
135    }
136
137    @Override
138    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
139        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
140    }
141
142    @Override
143    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
144        return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
145    }
146
147    @Override
148    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
149        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
150    }
151
152    @Override
153    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
154        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
155    }
156
157    public void register(ActiveMQDestination destName, Destination destination) {
158        // TODO refactor to allow views for custom destinations
159        try {
160            ObjectName objectName = createObjectName(destName);
161            DestinationView view;
162            if (destination instanceof Queue) {
163                view = new QueueView(this, (Queue)destination);
164            } else if (destination instanceof Topic) {
165                view = new TopicView(this, (Topic)destination);
166            } else {
167                view = null;
168                LOG.warn("JMX View is not supported for custom destination: " + destination);
169            }
170            if (view != null) {
171                registerDestination(objectName, destName, view);
172            }
173        } catch (Exception e) {
174            LOG.error("Failed to register destination " + destName, e);
175        }
176    }
177
178    public void unregister(ActiveMQDestination destName) {
179        try {
180            ObjectName objectName = createObjectName(destName);
181            unregisterDestination(objectName);
182        } catch (Exception e) {
183            LOG.error("Failed to unregister " + destName, e);
184        }
185    }
186
187    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
188        String connectionClientId = context.getClientId();
189        ObjectName brokerJmxObjectName = brokerObjectName;
190        String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
191        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
192        try {
193            ObjectName objectName = new ObjectName(objectNameStr);
194            SubscriptionView view;
195            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
196                // add offline subscribers to inactive list
197                SubscriptionInfo info = new SubscriptionInfo();
198                info.setClientId(context.getClientId());
199                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
200                info.setDestination(sub.getConsumerInfo().getDestination());
201                info.setSelector(sub.getSelector());
202                addInactiveSubscription(key, info, sub);
203            } else {
204                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
205                if (sub.getConsumerInfo().isDurable()) {
206                    view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
207                } else {
208                    if (sub instanceof TopicSubscription) {
209                        view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
210                    } else {
211                        view = new SubscriptionView(context.getClientId(), userName, sub);
212                    }
213                }
214                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
215            }
216            subscriptionMap.put(sub, objectName);
217            return objectName;
218        } catch (Exception e) {
219            LOG.error("Failed to register subscription " + sub, e);
220            return null;
221        }
222    }
223
224    public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
225        Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList();
226        String brokerDomain = brokerJmxObjectName.getDomain();
227        String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
228        String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
229        String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
230        String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
231        String persistentMode = "persistentMode=";
232        String consumerId = "";
233        if (info.isDurable()) {
234            persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
235        } else {
236            persistentMode += "Non-Durable";
237            if (info.getConsumerId() != null) {
238                consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
239            }
240        }
241        objectNameStr += persistentMode + ",";
242        objectNameStr += destinationType + ",";
243        objectNameStr += destinationName + ",";
244        objectNameStr += clientId;
245        objectNameStr += consumerId;
246        return objectNameStr;
247    }
248
249    @Override
250    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
251        Subscription sub = super.addConsumer(context, info);
252        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
253        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
254        if (inactiveName != null) {
255            // if it was inactive, register it
256            registerSubscription(context, sub);
257        }
258        return sub;
259    }
260
261    @Override
262    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
263        for (Subscription sub : subscriptionMap.keySet()) {
264            if (sub.getConsumerInfo().equals(info)) {
265               // unregister all consumer subs
266               unregisterSubscription(subscriptionMap.get(sub), true);
267            }
268        }
269        super.removeConsumer(context, info);
270    }
271
272    @Override
273    public void addProducer(ConnectionContext context, ProducerInfo info)
274            throws Exception {
275        super.addProducer(context, info);
276        String connectionClientId = context.getClientId();
277        ObjectName objectName = createObjectName(info, connectionClientId);
278        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
279        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
280        registerProducer(objectName, info.getDestination(), view);
281    }
282
283    @Override
284    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
285        ObjectName objectName = createObjectName(info, context.getClientId());
286        unregisterProducer(objectName);
287        super.removeProducer(context, info);
288    }
289
290    @Override
291    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
292        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
293            ProducerInfo info = exchange.getProducerState().getInfo();
294            if (info.getDestination() == null && info.getProducerId() != null) {
295                ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
296                ProducerView view = this.dynamicDestinationProducers.get(objectName);
297                if (view != null) {
298                    ActiveMQDestination dest = message.getDestination();
299                    if (dest != null) {
300                        view.setLastUsedDestinationName(dest);
301                    }
302                }
303            }
304         }
305        super.send(exchange, message);
306    }
307
308    public void unregisterSubscription(Subscription sub) {
309        ObjectName name = subscriptionMap.remove(sub);
310        if (name != null) {
311            try {
312                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
313                ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
314                if (inactiveName != null) {
315                    inactiveDurableTopicSubscribers.remove(inactiveName);
316                    managementContext.unregisterMBean(inactiveName);
317                }
318            } catch (Exception e) {
319                LOG.error("Failed to unregister subscription " + sub, e);
320            }
321        }
322    }
323
324    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
325        if (dest.isQueue()) {
326            if (dest.isTemporary()) {
327                temporaryQueues.put(key, view);
328            } else {
329                queues.put(key, view);
330            }
331        } else {
332            if (dest.isTemporary()) {
333                temporaryTopics.put(key, view);
334            } else {
335                topics.put(key, view);
336            }
337        }
338        try {
339            AnnotatedMBean.registerMBean(managementContext, view, key);
340            registeredMBeans.add(key);
341        } catch (Throwable e) {
342            LOG.warn("Failed to register MBean: " + key);
343            LOG.debug("Failure reason: " + e, e);
344        }
345    }
346
347    protected void unregisterDestination(ObjectName key) throws Exception {
348
349        DestinationView view = removeAndRemember(topics, key, null);
350        view = removeAndRemember(queues, key, view);
351        view = removeAndRemember(temporaryQueues, key, view);
352        view = removeAndRemember(temporaryTopics, key, view);
353        if (registeredMBeans.remove(key)) {
354            try {
355                managementContext.unregisterMBean(key);
356            } catch (Throwable e) {
357                LOG.warn("Failed to unregister MBean: " + key);
358                LOG.debug("Failure reason: " + e, e);
359            }
360        }
361        if (view != null) {
362            key = view.getSlowConsumerStrategy();
363            if (key!= null && registeredMBeans.remove(key)) {
364                try {
365                    managementContext.unregisterMBean(key);
366                } catch (Throwable e) {
367                    LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
368                    LOG.debug("Failure reason: " + e, e);
369                }
370            }
371        }
372    }
373
374    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
375
376        if (dest != null) {
377            if (dest.isQueue()) {
378                if (dest.isTemporary()) {
379                    temporaryQueueProducers.put(key, view);
380                } else {
381                    queueProducers.put(key, view);
382                }
383            } else {
384                if (dest.isTemporary()) {
385                    temporaryTopicProducers.put(key, view);
386                } else {
387                    topicProducers.put(key, view);
388                }
389            }
390        } else {
391            dynamicDestinationProducers.put(key, view);
392        }
393
394        try {
395            AnnotatedMBean.registerMBean(managementContext, view, key);
396            registeredMBeans.add(key);
397        } catch (Throwable e) {
398            LOG.warn("Failed to register MBean: " + key);
399            LOG.debug("Failure reason: " + e, e);
400        }
401    }
402
403    protected void unregisterProducer(ObjectName key) throws Exception {
404        queueProducers.remove(key);
405        topicProducers.remove(key);
406        temporaryQueueProducers.remove(key);
407        temporaryTopicProducers.remove(key);
408        dynamicDestinationProducers.remove(key);
409        if (registeredMBeans.remove(key)) {
410            try {
411                managementContext.unregisterMBean(key);
412            } catch (Throwable e) {
413                LOG.warn("Failed to unregister MBean: " + key);
414                LOG.debug("Failure reason: " + e, e);
415            }
416        }
417    }
418
419    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
420        DestinationView candidate = map.remove(key);
421        if (candidate != null && view == null) {
422            view = candidate;
423        }
424        return candidate != null ? candidate : view;
425    }
426
427    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
428        ActiveMQDestination dest = info.getDestination();
429        if (dest.isQueue()) {
430            if (dest.isTemporary()) {
431                temporaryQueueSubscribers.put(key, view);
432            } else {
433                queueSubscribers.put(key, view);
434            }
435        } else {
436            if (dest.isTemporary()) {
437                temporaryTopicSubscribers.put(key, view);
438            } else {
439                if (info.isDurable()) {
440                    durableTopicSubscribers.put(key, view);
441                    // unregister any inactive durable subs
442                    try {
443                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
444                        if (inactiveName != null) {
445                            inactiveDurableTopicSubscribers.remove(inactiveName);
446                            registeredMBeans.remove(inactiveName);
447                            managementContext.unregisterMBean(inactiveName);
448                        }
449                    } catch (Throwable e) {
450                        LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
451                    }
452                } else {
453                    topicSubscribers.put(key, view);
454                }
455            }
456        }
457
458        try {
459            AnnotatedMBean.registerMBean(managementContext, view, key);
460            registeredMBeans.add(key);
461        } catch (Throwable e) {
462            LOG.warn("Failed to register MBean: " + key);
463            LOG.debug("Failure reason: " + e, e);
464        }
465
466    }
467
468    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
469        queueSubscribers.remove(key);
470        topicSubscribers.remove(key);
471        temporaryQueueSubscribers.remove(key);
472        temporaryTopicSubscribers.remove(key);
473        if (registeredMBeans.remove(key)) {
474            try {
475                managementContext.unregisterMBean(key);
476            } catch (Throwable e) {
477                LOG.warn("Failed to unregister MBean: " + key);
478                LOG.debug("Failure reason: " + e, e);
479            }
480        }
481        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
482        if (view != null) {
483            // need to put this back in the inactive list
484            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
485            if (addToInactive) {
486                SubscriptionInfo info = new SubscriptionInfo();
487                info.setClientId(subscriptionKey.getClientId());
488                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
489                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
490                info.setSelector(view.getSelector());
491                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
492            }
493        }
494    }
495
496    protected void buildExistingSubscriptions() throws Exception {
497        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
498        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
499        if (destinations != null) {
500            for (ActiveMQDestination dest : destinations) {
501                if (dest.isTopic()) {
502                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
503                    if (infos != null) {
504                        for (int i = 0; i < infos.length; i++) {
505                            SubscriptionInfo info = infos[i];
506                            SubscriptionKey key = new SubscriptionKey(info);
507                            if (!alreadyKnown(key)) {
508                                LOG.debug("Restoring durable subscription mbean: " + info);
509                                subscriptions.put(key, info);
510                            }
511                        }
512                    }
513                }
514            }
515        }
516
517        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
518            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
519        }
520    }
521
522    private boolean alreadyKnown(SubscriptionKey key) {
523        boolean known = false;
524        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
525        if (LOG.isTraceEnabled()) {
526            LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
527        }
528        return known;
529    }
530
531    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
532        try {
533            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
534            ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
535            SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
536
537            try {
538                AnnotatedMBean.registerMBean(managementContext, view, objectName);
539                registeredMBeans.add(objectName);
540            } catch (Throwable e) {
541                LOG.warn("Failed to register MBean: " + key);
542                LOG.debug("Failure reason: " + e, e);
543            }
544
545            inactiveDurableTopicSubscribers.put(objectName, view);
546            subscriptionKeys.put(key, objectName);
547        } catch (Exception e) {
548            LOG.error("Failed to register subscription " + info, e);
549        }
550    }
551
552    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
553        List<Message> messages = getSubscriberMessages(view);
554        CompositeData c[] = new CompositeData[messages.size()];
555        for (int i = 0; i < c.length; i++) {
556            try {
557                c[i] = OpenTypeSupport.convert(messages.get(i));
558            } catch (Throwable e) {
559                LOG.error("failed to browse : " + view, e);
560            }
561        }
562        return c;
563    }
564
565    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
566        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
567        List<Message> messages = getSubscriberMessages(view);
568        CompositeType ct = factory.getCompositeType();
569        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
570        TabularDataSupport rc = new TabularDataSupport(tt);
571        for (int i = 0; i < messages.size(); i++) {
572            rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
573        }
574        return rc;
575    }
576
577    protected List<Message> getSubscriberMessages(SubscriptionView view) {
578        // TODO It is very dangerous operation for big backlogs
579        if (!(destinationFactory instanceof DestinationFactoryImpl)) {
580            throw new RuntimeException("unsupported by " + destinationFactory);
581        }
582        PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
583        final List<Message> result = new ArrayList<Message>();
584        try {
585            ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
586            TopicMessageStore store = adapter.createTopicMessageStore(topic);
587            store.recover(new MessageRecoveryListener() {
588                public boolean recoverMessage(Message message) throws Exception {
589                    result.add(message);
590                    return true;
591                }
592
593                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
594                    throw new RuntimeException("Should not be called.");
595                }
596
597                public boolean hasSpace() {
598                    return true;
599                }
600
601                public boolean isDuplicate(MessageId id) {
602                    return false;
603                }
604            });
605        } catch (Throwable e) {
606            LOG.error("Failed to browse messages for Subscription " + view, e);
607        }
608        return result;
609
610    }
611
612    protected ObjectName[] getTopics() {
613        Set<ObjectName> set = topics.keySet();
614        return set.toArray(new ObjectName[set.size()]);
615    }
616
617    protected ObjectName[] getQueues() {
618        Set<ObjectName> set = queues.keySet();
619        return set.toArray(new ObjectName[set.size()]);
620    }
621
622    protected ObjectName[] getTemporaryTopics() {
623        Set<ObjectName> set = temporaryTopics.keySet();
624        return set.toArray(new ObjectName[set.size()]);
625    }
626
627    protected ObjectName[] getTemporaryQueues() {
628        Set<ObjectName> set = temporaryQueues.keySet();
629        return set.toArray(new ObjectName[set.size()]);
630    }
631
632    protected ObjectName[] getTopicSubscribers() {
633        Set<ObjectName> set = topicSubscribers.keySet();
634        return set.toArray(new ObjectName[set.size()]);
635    }
636
637    protected ObjectName[] getDurableTopicSubscribers() {
638        Set<ObjectName> set = durableTopicSubscribers.keySet();
639        return set.toArray(new ObjectName[set.size()]);
640    }
641
642    protected ObjectName[] getQueueSubscribers() {
643        Set<ObjectName> set = queueSubscribers.keySet();
644        return set.toArray(new ObjectName[set.size()]);
645    }
646
647    protected ObjectName[] getTemporaryTopicSubscribers() {
648        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
649        return set.toArray(new ObjectName[set.size()]);
650    }
651
652    protected ObjectName[] getTemporaryQueueSubscribers() {
653        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
654        return set.toArray(new ObjectName[set.size()]);
655    }
656
657    protected ObjectName[] getInactiveDurableTopicSubscribers() {
658        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
659        return set.toArray(new ObjectName[set.size()]);
660    }
661
662    protected ObjectName[] getTopicProducers() {
663        Set<ObjectName> set = topicProducers.keySet();
664        return set.toArray(new ObjectName[set.size()]);
665    }
666
667    protected ObjectName[] getQueueProducers() {
668        Set<ObjectName> set = queueProducers.keySet();
669        return set.toArray(new ObjectName[set.size()]);
670    }
671
672    protected ObjectName[] getTemporaryTopicProducers() {
673        Set<ObjectName> set = temporaryTopicProducers.keySet();
674        return set.toArray(new ObjectName[set.size()]);
675    }
676
677    protected ObjectName[] getTemporaryQueueProducers() {
678        Set<ObjectName> set = temporaryQueueProducers.keySet();
679        return set.toArray(new ObjectName[set.size()]);
680    }
681
682    protected ObjectName[] getDynamicDestinationProducers() {
683        Set<ObjectName> set = dynamicDestinationProducers.keySet();
684        return set.toArray(new ObjectName[set.size()]);
685    }
686
687    public Broker getContextBroker() {
688        return contextBroker;
689    }
690
    /**
     * Replaces the broker held in the {@code contextBroker} field.
     *
     * @param contextBroker the broker reference to store; stored as given, without validation
     */
    public void setContextBroker(Broker contextBroker) {
        this.contextBroker = contextBroker;
    }
694
695    protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
696        // Build the object name for the destination
697        Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
698        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
699                                               + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
700                                               + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
701        return objectName;
702    }
703
704    protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
705        // Build the object name for the producer info
706        Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
707
708        String destinationType = "destinationType=";
709        String destinationName = "destinationName=";
710
711        if (producerInfo.getDestination() == null) {
712            destinationType += "Dynamic";
713            destinationName = null;
714        } else {
715            destinationType += producerInfo.getDestination().getDestinationTypeAsString();
716            destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName());
717        }
718
719        String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
720        String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
721
722        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
723                                               + "Type=Producer" + ","
724                                               + destinationType + ","
725                                               + (destinationName != null ? destinationName + "," : "")
726                                               + clientId + "," + producerId);
727        return objectName;
728    }
729
730    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
731        ObjectName objectName = null;
732        try {
733            objectName = createObjectName(strategy);
734            if (!registeredMBeans.contains(objectName))  {
735                AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
736                AnnotatedMBean.registerMBean(managementContext, view, objectName);
737                registeredMBeans.add(objectName);
738            }
739        } catch (Exception e) {
740            LOG.warn("Failed to register MBean: " + strategy);
741            LOG.debug("Failure reason: " + e, e);
742        }
743        return objectName;
744    }
745
746    protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
747        Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
748        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
749                                               + "," + "Type=RecoveredXaTransaction"
750                                               + "," + "Xid="
751                                               + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
752        return objectName;
753    }
754
755    public void registerRecoveredTransactionMBean(XATransaction transaction) {
756        try {
757            ObjectName objectName = createObjectName(transaction);
758            if (!registeredMBeans.contains(objectName))  {
759                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
760                AnnotatedMBean.registerMBean(managementContext, view, objectName);
761                registeredMBeans.add(objectName);
762            }
763        } catch (Exception e) {
764            LOG.warn("Failed to register prepared transaction MBean: " + transaction);
765            LOG.debug("Failure reason: " + e, e);
766        }
767    }
768
769    public void unregister(XATransaction transaction) {
770        try {
771            ObjectName objectName = createObjectName(transaction);
772            if (registeredMBeans.remove(objectName)) {
773                try {
774                    managementContext.unregisterMBean(objectName);
775                } catch (Throwable e) {
776                    LOG.warn("Failed to unregister MBean: " + objectName);
777                    LOG.debug("Failure reason: " + e, e);
778                }
779            }
780        } catch (Exception e) {
781            LOG.warn("Failed to create object name to unregister " + transaction, e);
782        }
783    }
784
785    private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
786        Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
787        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
788                            + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
789        return objectName;
790    }
791
792    public ObjectName getSubscriberObjectName(Subscription key) {
793        return subscriptionMap.get(key);
794    }
795
796    public Subscription getSubscriber(ObjectName key) {
797        Subscription sub = null;
798        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
799            if (entry.getValue().equals(key)) {
800                sub = entry.getKey();
801                break;
802            }
803        }
804        return sub;
805    }
806}