001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import org.apache.activemq.advisory.AdvisorySupport;
020import org.apache.activemq.broker.Broker;
021import org.apache.activemq.broker.BrokerFilter;
022import org.apache.activemq.broker.BrokerService;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ProducerBrokerExchange;
025import org.apache.activemq.broker.jmx.BrokerViewMBean;
026import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.DestinationStatistics;
029import org.apache.activemq.broker.region.RegionBroker;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQMapMessage;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.ProducerId;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.state.ProducerState;
037import org.apache.activemq.usage.SystemUsage;
038import org.apache.activemq.util.IdGenerator;
039import org.apache.activemq.util.LongSequenceGenerator;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import javax.jms.JMSException;
044import javax.management.ObjectName;
045import java.io.File;
046import java.net.URI;
047import java.util.Set;
048/**
049 * A StatisticsBroker You can retrieve a Map Message for a Destination - or
050 * Broker containing statistics as key-value pairs The message must contain a
051 * replyTo Destination - else its ignored
052 * 
053 */
054public class StatisticsBroker extends BrokerFilter {
055    private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
056    static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
057    static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
058    static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
059    private static final IdGenerator ID_GENERATOR = new IdGenerator();
060    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
061    protected final ProducerId advisoryProducerId = new ProducerId();
062    protected BrokerViewMBean brokerView;
063
064    /**
065     * 
066     * Constructor
067     * 
068     * @param next
069     */
070    public StatisticsBroker(Broker next) {
071        super(next);
072        this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
073    }
074
075    /**
076     * Sets the persistence mode
077     * 
078     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
079     *      org.apache.activemq.command.Message)
080     */
081    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
082        ActiveMQDestination msgDest = messageSend.getDestination();
083        ActiveMQDestination replyTo = messageSend.getReplyTo();
084        if (replyTo != null) {
085            String physicalName = msgDest.getPhysicalName();
086            boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
087                    STATS_DESTINATION_PREFIX.length());
088            boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
089                    .length());
090            boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
091                    .length());
092            BrokerService brokerService = getBrokerService();
093            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
094            if (destStats) {
095                String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
096                ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
097                Set<Destination> set = getDestinations(queryDest);
098                for (Destination dest : set) {
099                    DestinationStatistics stats = dest.getDestinationStatistics();
100                    if (stats != null) {
101                        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
102                        statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
103                        statsMessage.setLong("size", stats.getMessages().getCount());
104                        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
105                        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
106                        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
107                        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
108                        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
109                        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
110                        statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
111                        statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
112                        statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
113                        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
114                        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
115                        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
116                        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
117                        statsMessage.setLong("producerCount", stats.getProducers().getCount());
118                        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
119                        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
120                    }
121                }
122            } else if (subStats) {
123                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
124                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
125            } else if (brokerStats) {
126                ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
127                SystemUsage systemUsage = brokerService.getSystemUsage();
128                DestinationStatistics stats = regionBroker.getDestinationStatistics();
129                statsMessage.setString("brokerName", regionBroker.getBrokerName());
130                statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
131                statsMessage.setLong("size", stats.getMessages().getCount());
132                statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
133                statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
134                statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
135                statsMessage.setLong("expiredCount", stats.getExpired().getCount());
136                statsMessage.setLong("inflightCount", stats.getInflight().getCount());
137                statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
138                statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
139                statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
140                statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
141                statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
142                statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
143                statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
144                statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
145                statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
146                statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
147                statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
148                statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
149                statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
150                statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
151                statsMessage.setLong("producerCount", stats.getProducers().getCount());
152                String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
153                answer = answer != null ? answer : "";
154                statsMessage.setString("openwire", answer);
155                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
156                answer = answer != null ? answer : "";
157                statsMessage.setString("stomp", answer);
158                answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
159                answer = answer != null ? answer : "";
160                statsMessage.setString("ssl", answer);
161                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
162                answer = answer != null ? answer : "";
163                statsMessage.setString("stomp+ssl", answer);
164                URI uri = brokerService.getVmConnectorURI();
165                answer = uri != null ? uri.toString() : "";
166                statsMessage.setString("vm", answer);
167                File file = brokerService.getDataDirectoryFile();
168                answer = file != null ? file.getCanonicalPath() : "";
169                statsMessage.setString("dataDirectory", answer);
170                statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
171                sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
172            } else {
173                super.send(producerExchange, messageSend);
174            }
175        } else {
176            super.send(producerExchange, messageSend);
177        }
178    }
179
180    BrokerViewMBean getBrokerView() throws Exception {
181        if (this.brokerView == null) {
182            ObjectName brokerName = getBrokerService().getBrokerObjectName();
183            this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
184                    BrokerViewMBean.class, true);
185        }
186        return this.brokerView;
187    }
188
189    public void start() throws Exception {
190        super.start();
191        LOG.info("Starting StatisticsBroker");
192    }
193
194    public void stop() throws Exception {
195        super.stop();
196    }
197
198    protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
199        for (int i = 0; i < subscribers.length; i++) {
200            ObjectName name = subscribers[i];
201            SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
202            ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
203            sendStats(context, statsMessage, replyTo);
204        }
205    }
206
207    protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
208        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
209        statsMessage.setString("destinationName", subscriber.getDestinationName());
210        statsMessage.setString("clientId", subscriber.getClientId());
211        statsMessage.setString("connectionId", subscriber.getConnectionId());
212        statsMessage.setLong("sessionId", subscriber.getSessionId());
213        statsMessage.setString("selector", subscriber.getSelector());
214        statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
215        statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
216        statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
217        statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
218        statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
219        statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
220        statsMessage.setBoolean("exclusive", subscriber.isExclusive());
221        statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
222        statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
223        return statsMessage;
224    }
225
226    protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
227            throws Exception {
228        msg.setPersistent(false);
229        msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
230        msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
231        msg.setDestination(replyTo);
232        msg.setResponseRequired(false);
233        msg.setProducerId(this.advisoryProducerId);
234        boolean originalFlowControl = context.isProducerFlowControl();
235        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
236        producerExchange.setConnectionContext(context);
237        producerExchange.setMutable(true);
238        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
239        try {
240            context.setProducerFlowControl(false);
241            this.next.send(producerExchange, msg);
242        } finally {
243            context.setProducerFlowControl(originalFlowControl);
244        }
245    }
246}