001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.lang.reflect.InvocationTargetException;
022    import java.lang.reflect.Method;
023    import java.net.MalformedURLException;
024    import java.net.URI;
025    import java.net.URL;
026    import java.util.NoSuchElementException;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import javax.management.ObjectName;
029    
030    import org.apache.activemq.ActiveMQConnectionMetaData;
031    import org.apache.activemq.broker.BrokerService;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.broker.TransportConnector;
034    import org.apache.activemq.broker.region.Subscription;
035    import org.apache.activemq.command.ActiveMQQueue;
036    import org.apache.activemq.command.ActiveMQTopic;
037    import org.apache.activemq.command.ConsumerId;
038    import org.apache.activemq.command.ConsumerInfo;
039    import org.apache.activemq.command.RemoveSubscriptionInfo;
040    import org.apache.activemq.network.NetworkConnector;
041    import org.apache.activemq.util.BrokerSupport;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     *
047     */
048    public class BrokerView implements BrokerViewMBean {
049        private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
050        ManagedRegionBroker broker;
051        private final BrokerService brokerService;
052        private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
053        private ObjectName jmsJobScheduler;
054    
055        public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
056            this.brokerService = brokerService;
057            this.broker = managedBroker;
058        }
059    
060        public ManagedRegionBroker getBroker() {
061            return broker;
062        }
063    
064        public void setBroker(ManagedRegionBroker broker) {
065            this.broker = broker;
066        }
067    
068        public String getBrokerId() {
069            return safeGetBroker().getBrokerId().toString();
070        }
071    
072        public String getBrokerName() {
073            return safeGetBroker().getBrokerName();
074        }
075    
076        public String getBrokerVersion() {
077            return ActiveMQConnectionMetaData.PROVIDER_VERSION;
078        }
079    
080        public void gc() throws Exception {
081            brokerService.getBroker().gc();
082            try {
083                brokerService.getPersistenceAdapter().checkpoint(true);
084            } catch (IOException e) {
085                LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
086            }
087        }
088    
089        public void start() throws Exception {
090            brokerService.start();
091        }
092    
093        public void stop() throws Exception {
094            brokerService.stop();
095        }
096    
097        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
098                throws Exception {
099            brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
100        }
101    
102        public long getTotalEnqueueCount() {
103            return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
104        }
105    
106        public long getTotalDequeueCount() {
107            return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
108        }
109    
110        public long getTotalConsumerCount() {
111            return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
112        }
113    
114        public long getTotalProducerCount() {
115            return safeGetBroker().getDestinationStatistics().getProducers().getCount();
116        }
117    
118        public long getTotalMessageCount() {
119            return safeGetBroker().getDestinationStatistics().getMessages().getCount();
120        }
121    
122        public long getTotalMessagesCached() {
123            return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
124        }
125    
126        public int getMemoryPercentUsage() {
127            return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
128        }
129    
130        public long getMemoryLimit() {
131            return brokerService.getSystemUsage().getMemoryUsage().getLimit();
132        }
133    
134        public void setMemoryLimit(long limit) {
135            brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
136        }
137    
138        public long getStoreLimit() {
139            return brokerService.getSystemUsage().getStoreUsage().getLimit();
140        }
141    
142        public int getStorePercentUsage() {
143            return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
144        }
145    
146        public long getTempLimit() {
147           return brokerService.getSystemUsage().getTempUsage().getLimit();
148        }
149    
150        public int getTempPercentUsage() {
151           return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
152        }
153    
154        public void setStoreLimit(long limit) {
155            brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
156        }
157    
158        public void setTempLimit(long limit) {
159            brokerService.getSystemUsage().getTempUsage().setLimit(limit);
160        }
161    
162        public void resetStatistics() {
163            safeGetBroker().getDestinationStatistics().reset();
164        }
165    
166        public void enableStatistics() {
167            safeGetBroker().getDestinationStatistics().setEnabled(true);
168        }
169    
170        public void disableStatistics() {
171            safeGetBroker().getDestinationStatistics().setEnabled(false);
172        }
173    
174        public boolean isStatisticsEnabled() {
175            return safeGetBroker().getDestinationStatistics().isEnabled();
176        }
177    
178        public boolean isPersistent() {
179            return brokerService.isPersistent();
180        }
181    
182        public boolean isSlave() {
183            return brokerService.isSlave();
184        }
185    
186        public void terminateJVM(int exitCode) {
187            System.exit(exitCode);
188        }
189    
190        public ObjectName[] getTopics() {
191            return safeGetBroker().getTopics();
192        }
193    
194        public ObjectName[] getQueues() {
195            return safeGetBroker().getQueues();
196        }
197    
198        public ObjectName[] getTemporaryTopics() {
199            return safeGetBroker().getTemporaryTopics();
200        }
201    
202        public ObjectName[] getTemporaryQueues() {
203            return safeGetBroker().getTemporaryQueues();
204        }
205    
206        public ObjectName[] getTopicSubscribers() {
207            return safeGetBroker().getTopicSubscribers();
208        }
209    
210        public ObjectName[] getDurableTopicSubscribers() {
211            return safeGetBroker().getDurableTopicSubscribers();
212        }
213    
214        public ObjectName[] getQueueSubscribers() {
215            return safeGetBroker().getQueueSubscribers();
216        }
217    
218        public ObjectName[] getTemporaryTopicSubscribers() {
219            return safeGetBroker().getTemporaryTopicSubscribers();
220        }
221    
222        public ObjectName[] getTemporaryQueueSubscribers() {
223            return safeGetBroker().getTemporaryQueueSubscribers();
224        }
225    
226        public ObjectName[] getInactiveDurableTopicSubscribers() {
227            return safeGetBroker().getInactiveDurableTopicSubscribers();
228        }
229    
230        public ObjectName[] getTopicProducers() {
231            return safeGetBroker().getTopicProducers();
232        }
233    
234        public ObjectName[] getQueueProducers() {
235            return safeGetBroker().getQueueProducers();
236        }
237    
238        public ObjectName[] getTemporaryTopicProducers() {
239            return safeGetBroker().getTemporaryTopicProducers();
240        }
241    
242        public ObjectName[] getTemporaryQueueProducers() {
243            return safeGetBroker().getTemporaryQueueProducers();
244        }
245    
246        public ObjectName[] getDynamicDestinationProducers() {
247            return safeGetBroker().getDynamicDestinationProducers();
248        }
249    
250        public String addConnector(String discoveryAddress) throws Exception {
251            TransportConnector connector = brokerService.addConnector(discoveryAddress);
252            if (connector == null) {
253                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
254            }
255            connector.start();
256            return connector.getName();
257        }
258    
259        public String addNetworkConnector(String discoveryAddress) throws Exception {
260            NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
261            if (connector == null) {
262                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
263            }
264            connector.start();
265            return connector.getName();
266        }
267    
268        public boolean removeConnector(String connectorName) throws Exception {
269            TransportConnector connector = brokerService.getConnectorByName(connectorName);
270            if (connector == null) {
271                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
272            }
273            connector.stop();
274            return brokerService.removeConnector(connector);
275        }
276    
277        public boolean removeNetworkConnector(String connectorName) throws Exception {
278            NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
279            if (connector == null) {
280                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
281            }
282            connector.stop();
283            return brokerService.removeNetworkConnector(connector);
284        }
285    
286        public void addTopic(String name) throws Exception {
287            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
288        }
289    
290        public void addQueue(String name) throws Exception {
291            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
292        }
293    
294        public void removeTopic(String name) throws Exception {
295            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
296        }
297    
298        public void removeQueue(String name) throws Exception {
299            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
300        }
301    
302        public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
303                                                  String selector) throws Exception {
304            ConnectionContext context = new ConnectionContext();
305            context.setBroker(safeGetBroker());
306            context.setClientId(clientId);
307            ConsumerInfo info = new ConsumerInfo();
308            ConsumerId consumerId = new ConsumerId();
309            consumerId.setConnectionId(clientId);
310            consumerId.setSessionId(sessionIdCounter.incrementAndGet());
311            consumerId.setValue(0);
312            info.setConsumerId(consumerId);
313            info.setDestination(new ActiveMQTopic(topicName));
314            info.setSubscriptionName(subscriberName);
315            info.setSelector(selector);
316            Subscription subscription = safeGetBroker().addConsumer(context, info);
317            safeGetBroker().removeConsumer(context, info);
318            if (subscription != null) {
319                return subscription.getObjectName();
320            }
321            return null;
322        }
323    
324        public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
325            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
326            info.setClientId(clientId);
327            info.setSubscriptionName(subscriberName);
328            ConnectionContext context = new ConnectionContext();
329            context.setBroker(safeGetBroker());
330            context.setClientId(clientId);
331            safeGetBroker().removeSubscription(context, info);
332        }
333    
334        //  doc comment inherited from BrokerViewMBean
335        public void reloadLog4jProperties() throws Throwable {
336    
337            // Avoid a direct dependency on log4j.. use reflection.
338            try {
339                ClassLoader cl = getClass().getClassLoader();
340                Class<?> logManagerClass = cl.loadClass("org.apache.log4j.LogManager");
341    
342                Method resetConfiguration = logManagerClass.getMethod("resetConfiguration", new Class[]{});
343                resetConfiguration.invoke(null, new Object[]{});
344    
345                String configurationOptionStr = System.getProperty("log4j.configuration");
346                URL log4jprops = null;
347                if (configurationOptionStr != null) {
348                    try {
349                        log4jprops = new URL(configurationOptionStr);
350                    } catch (MalformedURLException ex) {
351                        log4jprops = cl.getResource("log4j.properties");
352                    }
353                } else {
354                   log4jprops = cl.getResource("log4j.properties");
355                }
356    
357                if (log4jprops != null) {
358                    Class<?> propertyConfiguratorClass = cl.loadClass("org.apache.log4j.PropertyConfigurator");
359                    Method configure = propertyConfiguratorClass.getMethod("configure", new Class[]{URL.class});
360                    configure.invoke(null, new Object[]{log4jprops});
361                }
362            } catch (InvocationTargetException e) {
363                throw e.getTargetException();
364            }
365        }
366    
367        public String getOpenWireURL() {
368            String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
369            return answer != null ? answer : "";
370        }
371    
372        public String getStompURL() {
373            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
374            return answer != null ? answer : "";
375        }
376    
377        public String getSslURL() {
378            String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
379            return answer != null ? answer : "";
380        }
381    
382        public String getStompSslURL() {
383            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
384            return answer != null ? answer : "";
385        }
386    
387        public String getVMURL() {
388            URI answer = brokerService.getVmConnectorURI();
389            return answer != null ? answer.toString() : "";
390        }
391    
392        public String getDataDirectory() {
393            File file = brokerService.getDataDirectoryFile();
394            try {
395                return file != null ? file.getCanonicalPath():"";
396            } catch (IOException e) {
397                return "";
398            }
399        }
400    
401        public ObjectName getJMSJobScheduler() {
402            return this.jmsJobScheduler;
403        }
404    
405        public void setJMSJobScheduler(ObjectName name) {
406            this.jmsJobScheduler=name;
407        }
408    
409        private ManagedRegionBroker safeGetBroker() {
410            if (broker == null) {
411                throw new IllegalStateException("Broker is not yet started.");
412            }
413    
414            return broker;
415        }
416    }