001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.plugin;
018    
019    import org.apache.activemq.advisory.AdvisorySupport;
020    import org.apache.activemq.broker.Broker;
021    import org.apache.activemq.broker.BrokerFilter;
022    import org.apache.activemq.broker.BrokerService;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.ProducerBrokerExchange;
025    import org.apache.activemq.broker.jmx.BrokerViewMBean;
026    import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.DestinationStatistics;
029    import org.apache.activemq.broker.region.RegionBroker;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQMapMessage;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.ProducerId;
035    import org.apache.activemq.command.ProducerInfo;
036    import org.apache.activemq.state.ProducerState;
037    import org.apache.activemq.usage.SystemUsage;
038    import org.apache.activemq.util.IdGenerator;
039    import org.apache.activemq.util.LongSequenceGenerator;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    import javax.jms.JMSException;
044    import javax.management.ObjectName;
045    import java.io.File;
046    import java.net.URI;
047    import java.util.Set;
048    /**
049     * A StatisticsBroker You can retrieve a Map Message for a Destination - or
050     * Broker containing statistics as key-value pairs The message must contain a
051     * replyTo Destination - else its ignored
052     * 
053     */
054    public class StatisticsBroker extends BrokerFilter {
055        private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
056        static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
057        static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
058        static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
059        private static final IdGenerator ID_GENERATOR = new IdGenerator();
060        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
061        protected final ProducerId advisoryProducerId = new ProducerId();
062        protected BrokerViewMBean brokerView;
063    
064        /**
065         * 
066         * Constructor
067         * 
068         * @param next
069         */
070        public StatisticsBroker(Broker next) {
071            super(next);
072            this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
073        }
074    
075        /**
076         * Sets the persistence mode
077         * 
078         * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
079         *      org.apache.activemq.command.Message)
080         */
081        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
082            ActiveMQDestination msgDest = messageSend.getDestination();
083            ActiveMQDestination replyTo = messageSend.getReplyTo();
084            if (replyTo != null) {
085                String physicalName = msgDest.getPhysicalName();
086                boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
087                        STATS_DESTINATION_PREFIX.length());
088                boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
089                        .length());
090                boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
091                        .length());
092                BrokerService brokerService = getBrokerService();
093                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
094                if (destStats) {
095                    String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
096                    ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
097                    Set<Destination> set = getDestinations(queryDest);
098                    for (Destination dest : set) {
099                        DestinationStatistics stats = dest.getDestinationStatistics();
100                        if (stats != null) {
101                            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
102                            statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
103                            statsMessage.setLong("size", stats.getMessages().getCount());
104                            statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
105                            statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
106                            statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
107                            statsMessage.setLong("expiredCount", stats.getExpired().getCount());
108                            statsMessage.setLong("inflightCount", stats.getInflight().getCount());
109                            statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
110                            statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
111                            statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
112                            statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
113                            statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
114                            statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
115                            statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
116                            statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
117                            statsMessage.setLong("producerCount", stats.getProducers().getCount());
118                            statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
119                            sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
120                        }
121                    }
122                } else if (subStats) {
123                    sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
124                    sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
125                } else if (brokerStats) {
126                    ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
127                    SystemUsage systemUsage = brokerService.getSystemUsage();
128                    DestinationStatistics stats = regionBroker.getDestinationStatistics();
129                    statsMessage.setString("brokerName", regionBroker.getBrokerName());
130                    statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
131                    statsMessage.setLong("size", stats.getMessages().getCount());
132                    statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
133                    statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
134                    statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
135                    statsMessage.setLong("expiredCount", stats.getExpired().getCount());
136                    statsMessage.setLong("inflightCount", stats.getInflight().getCount());
137                    statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
138                    statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
139                    statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
140                    statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
141                    statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
142                    statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
143                    statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
144                    statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
145                    statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
146                    statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
147                    statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
148                    statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
149                    statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
150                    statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
151                    statsMessage.setLong("producerCount", stats.getProducers().getCount());
152                    String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
153                    answer = answer != null ? answer : "";
154                    statsMessage.setString("openwire", answer);
155                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
156                    answer = answer != null ? answer : "";
157                    statsMessage.setString("stomp", answer);
158                    answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
159                    answer = answer != null ? answer : "";
160                    statsMessage.setString("ssl", answer);
161                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
162                    answer = answer != null ? answer : "";
163                    statsMessage.setString("stomp+ssl", answer);
164                    URI uri = brokerService.getVmConnectorURI();
165                    answer = uri != null ? uri.toString() : "";
166                    statsMessage.setString("vm", answer);
167                    File file = brokerService.getDataDirectoryFile();
168                    answer = file != null ? file.getCanonicalPath() : "";
169                    statsMessage.setString("dataDirectory", answer);
170                    statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
171                    sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
172                } else {
173                    super.send(producerExchange, messageSend);
174                }
175            } else {
176                super.send(producerExchange, messageSend);
177            }
178        }
179    
180        BrokerViewMBean getBrokerView() throws Exception {
181            if (this.brokerView == null) {
182                ObjectName brokerName = getBrokerService().getBrokerObjectName();
183                this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
184                        BrokerViewMBean.class, true);
185            }
186            return this.brokerView;
187        }
188    
189        public void start() throws Exception {
190            super.start();
191            LOG.info("Starting StatisticsBroker");
192        }
193    
194        public void stop() throws Exception {
195            super.stop();
196        }
197    
198        protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
199            for (int i = 0; i < subscribers.length; i++) {
200                ObjectName name = subscribers[i];
201                SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
202                ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
203                sendStats(context, statsMessage, replyTo);
204            }
205        }
206    
207        protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
208            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
209            statsMessage.setString("destinationName", subscriber.getDestinationName());
210            statsMessage.setString("clientId", subscriber.getClientId());
211            statsMessage.setString("connectionId", subscriber.getConnectionId());
212            statsMessage.setLong("sessionId", subscriber.getSessionId());
213            statsMessage.setString("selector", subscriber.getSelector());
214            statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
215            statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
216            statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
217            statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
218            statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
219            statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
220            statsMessage.setBoolean("exclusive", subscriber.isExclusive());
221            statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
222            statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
223            return statsMessage;
224        }
225    
226        protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
227                throws Exception {
228            msg.setPersistent(false);
229            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
230            msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
231            msg.setDestination(replyTo);
232            msg.setResponseRequired(false);
233            msg.setProducerId(this.advisoryProducerId);
234            boolean originalFlowControl = context.isProducerFlowControl();
235            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
236            producerExchange.setConnectionContext(context);
237            producerExchange.setMutable(true);
238            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
239            try {
240                context.setProducerFlowControl(false);
241                this.next.send(producerExchange, msg);
242            } finally {
243                context.setProducerFlowControl(originalFlowControl);
244            }
245        }
246    }