001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.lang.reflect.InvocationTargetException; 022 import java.lang.reflect.Method; 023 import java.net.MalformedURLException; 024 import java.net.URI; 025 import java.net.URL; 026 import java.util.NoSuchElementException; 027 import java.util.concurrent.atomic.AtomicInteger; 028 import javax.management.ObjectName; 029 030 import org.apache.activemq.ActiveMQConnectionMetaData; 031 import org.apache.activemq.broker.BrokerService; 032 import org.apache.activemq.broker.ConnectionContext; 033 import org.apache.activemq.broker.TransportConnector; 034 import org.apache.activemq.broker.region.Subscription; 035 import org.apache.activemq.command.ActiveMQQueue; 036 import org.apache.activemq.command.ActiveMQTopic; 037 import org.apache.activemq.command.ConsumerId; 038 import org.apache.activemq.command.ConsumerInfo; 039 import org.apache.activemq.command.RemoveSubscriptionInfo; 040 import org.apache.activemq.network.NetworkConnector; 041 import org.apache.activemq.util.BrokerSupport; 042 import org.slf4j.Logger; 043 import org.slf4j.LoggerFactory; 044 045 /** 046 * 047 */ 048 public class BrokerView implements BrokerViewMBean { 049 private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class); 050 ManagedRegionBroker broker; 051 private final BrokerService brokerService; 052 private final AtomicInteger sessionIdCounter = new AtomicInteger(0); 053 private ObjectName jmsJobScheduler; 054 055 public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { 056 this.brokerService = brokerService; 057 this.broker = managedBroker; 058 } 059 060 public ManagedRegionBroker getBroker() { 061 return broker; 062 } 063 064 public void setBroker(ManagedRegionBroker broker) { 065 this.broker = broker; 066 } 067 068 public String getBrokerId() { 069 return safeGetBroker().getBrokerId().toString(); 070 } 071 072 public String getBrokerName() { 073 return safeGetBroker().getBrokerName(); 074 } 075 076 public String getBrokerVersion() { 077 return ActiveMQConnectionMetaData.PROVIDER_VERSION; 078 } 079 080 public void gc() throws Exception { 081 brokerService.getBroker().gc(); 082 try { 083 brokerService.getPersistenceAdapter().checkpoint(true); 084 } catch (IOException e) { 085 LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e); 086 } 087 } 088 089 public void start() throws Exception { 090 brokerService.start(); 091 } 092 093 public void stop() throws Exception { 094 brokerService.stop(); 095 } 096 097 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) 098 throws Exception { 099 brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval); 100 } 101 102 public long getTotalEnqueueCount() { 103 return safeGetBroker().getDestinationStatistics().getEnqueues().getCount(); 104 } 105 106 public long getTotalDequeueCount() { 107 return safeGetBroker().getDestinationStatistics().getDequeues().getCount(); 108 } 109 110 public long getTotalConsumerCount() { 111 return safeGetBroker().getDestinationStatistics().getConsumers().getCount(); 112 } 113 114 public long getTotalProducerCount() { 115 return safeGetBroker().getDestinationStatistics().getProducers().getCount(); 116 } 117 118 public long getTotalMessageCount() { 119 return safeGetBroker().getDestinationStatistics().getMessages().getCount(); 120 } 121 122 public long getTotalMessagesCached() { 123 return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); 124 } 125 126 public int getMemoryPercentUsage() { 127 return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); 128 } 129 130 public long getMemoryLimit() { 131 return brokerService.getSystemUsage().getMemoryUsage().getLimit(); 132 } 133 134 public void setMemoryLimit(long limit) { 135 brokerService.getSystemUsage().getMemoryUsage().setLimit(limit); 136 } 137 138 public long getStoreLimit() { 139 return brokerService.getSystemUsage().getStoreUsage().getLimit(); 140 } 141 142 public int getStorePercentUsage() { 143 return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); 144 } 145 146 public long getTempLimit() { 147 return brokerService.getSystemUsage().getTempUsage().getLimit(); 148 } 149 150 public int getTempPercentUsage() { 151 return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); 152 } 153 154 public void setStoreLimit(long limit) { 155 brokerService.getSystemUsage().getStoreUsage().setLimit(limit); 156 } 157 158 public void setTempLimit(long limit) { 159 brokerService.getSystemUsage().getTempUsage().setLimit(limit); 160 } 161 162 public void resetStatistics() { 163 safeGetBroker().getDestinationStatistics().reset(); 164 } 165 166 public void enableStatistics() { 167 safeGetBroker().getDestinationStatistics().setEnabled(true); 168 } 169 170 public void disableStatistics() { 171 safeGetBroker().getDestinationStatistics().setEnabled(false); 172 } 173 174 public boolean isStatisticsEnabled() { 175 return safeGetBroker().getDestinationStatistics().isEnabled(); 176 } 177 178 public boolean isPersistent() { 179 return brokerService.isPersistent(); 180 } 181 182 public boolean isSlave() { 183 return brokerService.isSlave(); 184 } 185 186 public void terminateJVM(int exitCode) { 187 System.exit(exitCode); 188 } 189 190 public ObjectName[] getTopics() { 191 return safeGetBroker().getTopics(); 192 } 193 194 public ObjectName[] getQueues() { 195 return safeGetBroker().getQueues(); 196 } 197 198 public ObjectName[] getTemporaryTopics() { 199 return safeGetBroker().getTemporaryTopics(); 200 } 201 202 public ObjectName[] getTemporaryQueues() { 203 return safeGetBroker().getTemporaryQueues(); 204 } 205 206 public ObjectName[] getTopicSubscribers() { 207 return safeGetBroker().getTopicSubscribers(); 208 } 209 210 public ObjectName[] getDurableTopicSubscribers() { 211 return safeGetBroker().getDurableTopicSubscribers(); 212 } 213 214 public ObjectName[] getQueueSubscribers() { 215 return safeGetBroker().getQueueSubscribers(); 216 } 217 218 public ObjectName[] getTemporaryTopicSubscribers() { 219 return safeGetBroker().getTemporaryTopicSubscribers(); 220 } 221 222 public ObjectName[] getTemporaryQueueSubscribers() { 223 return safeGetBroker().getTemporaryQueueSubscribers(); 224 } 225 226 public ObjectName[] getInactiveDurableTopicSubscribers() { 227 return safeGetBroker().getInactiveDurableTopicSubscribers(); 228 } 229 230 public ObjectName[] getTopicProducers() { 231 return safeGetBroker().getTopicProducers(); 232 } 233 234 public ObjectName[] getQueueProducers() { 235 return safeGetBroker().getQueueProducers(); 236 } 237 238 public ObjectName[] getTemporaryTopicProducers() { 239 return safeGetBroker().getTemporaryTopicProducers(); 240 } 241 242 public ObjectName[] getTemporaryQueueProducers() { 243 return safeGetBroker().getTemporaryQueueProducers(); 244 } 245 246 public ObjectName[] getDynamicDestinationProducers() { 247 return safeGetBroker().getDynamicDestinationProducers(); 248 } 249 250 public String addConnector(String discoveryAddress) throws Exception { 251 TransportConnector connector = brokerService.addConnector(discoveryAddress); 252 if (connector == null) { 253 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 254 } 255 connector.start(); 256 return connector.getName(); 257 } 258 259 public String addNetworkConnector(String discoveryAddress) throws Exception { 260 NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress); 261 if (connector == null) { 262 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 263 } 264 connector.start(); 265 return connector.getName(); 266 } 267 268 public boolean removeConnector(String connectorName) throws Exception { 269 TransportConnector connector = brokerService.getConnectorByName(connectorName); 270 if (connector == null) { 271 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 272 } 273 connector.stop(); 274 return brokerService.removeConnector(connector); 275 } 276 277 public boolean removeNetworkConnector(String connectorName) throws Exception { 278 NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName); 279 if (connector == null) { 280 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 281 } 282 connector.stop(); 283 return brokerService.removeNetworkConnector(connector); 284 } 285 286 public void addTopic(String name) throws Exception { 287 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true); 288 } 289 290 public void addQueue(String name) throws Exception { 291 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true); 292 } 293 294 public void removeTopic(String name) throws Exception { 295 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000); 296 } 297 298 public void removeQueue(String name) throws Exception { 299 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000); 300 } 301 302 public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, 303 String selector) throws Exception { 304 ConnectionContext context = new ConnectionContext(); 305 context.setBroker(safeGetBroker()); 306 context.setClientId(clientId); 307 ConsumerInfo info = new ConsumerInfo(); 308 ConsumerId consumerId = new ConsumerId(); 309 consumerId.setConnectionId(clientId); 310 consumerId.setSessionId(sessionIdCounter.incrementAndGet()); 311 consumerId.setValue(0); 312 info.setConsumerId(consumerId); 313 info.setDestination(new ActiveMQTopic(topicName)); 314 info.setSubscriptionName(subscriberName); 315 info.setSelector(selector); 316 Subscription subscription = safeGetBroker().addConsumer(context, info); 317 safeGetBroker().removeConsumer(context, info); 318 if (subscription != null) { 319 return subscription.getObjectName(); 320 } 321 return null; 322 } 323 324 public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception { 325 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 326 info.setClientId(clientId); 327 info.setSubscriptionName(subscriberName); 328 ConnectionContext context = new ConnectionContext(); 329 context.setBroker(safeGetBroker()); 330 context.setClientId(clientId); 331 safeGetBroker().removeSubscription(context, info); 332 } 333 334 // doc comment inherited from BrokerViewMBean 335 public void reloadLog4jProperties() throws Throwable { 336 337 // Avoid a direct dependency on log4j.. use reflection. 338 try { 339 ClassLoader cl = getClass().getClassLoader(); 340 Class<?> logManagerClass = cl.loadClass("org.apache.log4j.LogManager"); 341 342 Method resetConfiguration = logManagerClass.getMethod("resetConfiguration", new Class[]{}); 343 resetConfiguration.invoke(null, new Object[]{}); 344 345 String configurationOptionStr = System.getProperty("log4j.configuration"); 346 URL log4jprops = null; 347 if (configurationOptionStr != null) { 348 try { 349 log4jprops = new URL(configurationOptionStr); 350 } catch (MalformedURLException ex) { 351 log4jprops = cl.getResource("log4j.properties"); 352 } 353 } else { 354 log4jprops = cl.getResource("log4j.properties"); 355 } 356 357 if (log4jprops != null) { 358 Class<?> propertyConfiguratorClass = cl.loadClass("org.apache.log4j.PropertyConfigurator"); 359 Method configure = propertyConfiguratorClass.getMethod("configure", new Class[]{URL.class}); 360 configure.invoke(null, new Object[]{log4jprops}); 361 } 362 } catch (InvocationTargetException e) { 363 throw e.getTargetException(); 364 } 365 } 366 367 public String getOpenWireURL() { 368 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); 369 return answer != null ? answer : ""; 370 } 371 372 public String getStompURL() { 373 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp"); 374 return answer != null ? answer : ""; 375 } 376 377 public String getSslURL() { 378 String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl"); 379 return answer != null ? answer : ""; 380 } 381 382 public String getStompSslURL() { 383 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl"); 384 return answer != null ? answer : ""; 385 } 386 387 public String getVMURL() { 388 URI answer = brokerService.getVmConnectorURI(); 389 return answer != null ? answer.toString() : ""; 390 } 391 392 public String getDataDirectory() { 393 File file = brokerService.getDataDirectoryFile(); 394 try { 395 return file != null ? file.getCanonicalPath():""; 396 } catch (IOException e) { 397 return ""; 398 } 399 } 400 401 public ObjectName getJMSJobScheduler() { 402 return this.jmsJobScheduler; 403 } 404 405 public void setJMSJobScheduler(ObjectName name) { 406 this.jmsJobScheduler=name; 407 } 408 409 private ManagedRegionBroker safeGetBroker() { 410 if (broker == null) { 411 throw new IllegalStateException("Broker is not yet started."); 412 } 413 414 return broker; 415 } 416 }