001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.util; 018 019 import java.util.Set; 020 import javax.annotation.PostConstruct; 021 import org.apache.activemq.broker.BrokerPluginSupport; 022 import org.apache.activemq.broker.Connection; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.ConsumerBrokerExchange; 025 import org.apache.activemq.broker.ProducerBrokerExchange; 026 import org.apache.activemq.broker.region.Destination; 027 import org.apache.activemq.broker.region.MessageReference; 028 import org.apache.activemq.broker.region.Subscription; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.BrokerInfo; 031 import org.apache.activemq.command.ConnectionInfo; 032 import org.apache.activemq.command.ConsumerInfo; 033 import org.apache.activemq.command.DestinationInfo; 034 import org.apache.activemq.command.Message; 035 import org.apache.activemq.command.MessageAck; 036 import org.apache.activemq.command.MessageDispatch; 037 import org.apache.activemq.command.MessageDispatchNotification; 038 import org.apache.activemq.command.MessagePull; 039 import org.apache.activemq.command.ProducerInfo; 040 import org.apache.activemq.command.RemoveSubscriptionInfo; 041 import org.apache.activemq.command.Response; 042 import org.apache.activemq.command.SessionInfo; 043 import org.apache.activemq.command.TransactionId; 044 import org.apache.activemq.usage.Usage; 045 import org.slf4j.Logger; 046 import org.slf4j.LoggerFactory; 047 048 /** 049 * A simple Broker intercepter which allows you to enable/disable logging. 050 * 051 * @org.apache.xbean.XBean 052 */ 053 054 public class LoggingBrokerPlugin extends BrokerPluginSupport { 055 056 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class); 057 058 private boolean logAll = false; 059 private boolean logMessageEvents = false; 060 private boolean logConnectionEvents = true; 061 private boolean logSessionEvents = true; 062 private boolean logTransactionEvents = false; 063 private boolean logConsumerEvents = false; 064 private boolean logProducerEvents = false; 065 private boolean logInternalEvents = false; 066 067 /** 068 * @throws Exception 069 * @org.apache.xbean.InitMethod 070 */ 071 @PostConstruct 072 public void afterPropertiesSet() throws Exception { 073 LOG.info("Created LoggingBrokerPlugin: " + this.toString()); 074 } 075 076 public boolean isLogAll() { 077 return logAll; 078 } 079 080 /** 081 * Logger all Events that go through the Plugin 082 */ 083 public void setLogAll(boolean logAll) { 084 this.logAll = logAll; 085 } 086 087 public boolean isLogMessageEvents() { 088 return logMessageEvents; 089 } 090 091 /** 092 * Logger Events that are related to message processing 093 */ 094 public void setLogMessageEvents(boolean logMessageEvents) { 095 this.logMessageEvents = logMessageEvents; 096 } 097 098 public boolean isLogConnectionEvents() { 099 return logConnectionEvents; 100 } 101 102 /** 103 * Logger Events that are related to connections 104 */ 105 public void setLogConnectionEvents(boolean logConnectionEvents) { 106 this.logConnectionEvents = logConnectionEvents; 107 } 108 109 public boolean isLogSessionEvents() { 110 return logSessionEvents; 111 } 112 113 /** 114 * Logger Events that are related to sessions 115 */ 116 public void setLogSessionEvents(boolean logSessionEvents) { 117 this.logSessionEvents = logSessionEvents; 118 } 119 120 public boolean isLogTransactionEvents() { 121 return logTransactionEvents; 122 } 123 124 /** 125 * Logger Events that are related to transaction processing 126 */ 127 public void setLogTransactionEvents(boolean logTransactionEvents) { 128 this.logTransactionEvents = logTransactionEvents; 129 } 130 131 public boolean isLogConsumerEvents() { 132 return logConsumerEvents; 133 } 134 135 /** 136 * Logger Events that are related to Consumers 137 */ 138 public void setLogConsumerEvents(boolean logConsumerEvents) { 139 this.logConsumerEvents = logConsumerEvents; 140 } 141 142 public boolean isLogProducerEvents() { 143 return logProducerEvents; 144 } 145 146 /** 147 * Logger Events that are related to Producers 148 */ 149 public void setLogProducerEvents(boolean logProducerEvents) { 150 this.logProducerEvents = logProducerEvents; 151 } 152 153 public boolean isLogInternalEvents() { 154 return logInternalEvents; 155 } 156 157 /** 158 * Logger Events that are normally internal to the broker 159 */ 160 public void setLogInternalEvents(boolean logInternalEvents) { 161 this.logInternalEvents = logInternalEvents; 162 } 163 164 @Override 165 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 166 if (isLogAll() || isLogConsumerEvents()) { 167 LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId() 168 + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); 169 if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) { 170 LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId() 171 + ", Last Message Id: " + ack.getLastMessageId()); 172 } 173 } 174 super.acknowledge(consumerExchange, ack); 175 } 176 177 @Override 178 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 179 if (isLogAll() || isLogConsumerEvents()) { 180 LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName()); 181 } 182 return super.messagePull(context, pull); 183 } 184 185 @Override 186 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 187 if (isLogAll() || isLogConnectionEvents()) { 188 LOG.info("Adding Connection : " + info); 189 } 190 super.addConnection(context, info); 191 } 192 193 @Override 194 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 195 if (isLogAll() || isLogConsumerEvents()) { 196 LOG.info("Adding Consumer : " + info); 197 } 198 return super.addConsumer(context, info); 199 } 200 201 @Override 202 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 203 if (isLogAll() || isLogProducerEvents()) { 204 LOG.info("Adding Producer :" + info); 205 } 206 super.addProducer(context, info); 207 } 208 209 @Override 210 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 211 if (isLogAll() || isLogTransactionEvents()) { 212 LOG.info("Commiting transaction : " + xid.getTransactionKey()); 213 } 214 super.commitTransaction(context, xid, onePhase); 215 } 216 217 @Override 218 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 219 if (isLogAll() || isLogConsumerEvents()) { 220 LOG.info("Removing subscription : " + info); 221 } 222 super.removeSubscription(context, info); 223 } 224 225 @Override 226 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 227 228 TransactionId[] result = super.getPreparedTransactions(context); 229 if ((isLogAll() || isLogTransactionEvents()) && result != null) { 230 StringBuffer tids = new StringBuffer(); 231 for (TransactionId tid : result) { 232 if (tids.length() > 0) { 233 tids.append(", "); 234 } 235 tids.append(tid.getTransactionKey()); 236 } 237 LOG.info("Prepared transactions : " + tids); 238 } 239 return result; 240 } 241 242 @Override 243 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 244 if (isLogAll() || isLogTransactionEvents()) { 245 LOG.info("Preparing transaction : " + xid.getTransactionKey()); 246 } 247 return super.prepareTransaction(context, xid); 248 } 249 250 @Override 251 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 252 if (isLogAll() || isLogConnectionEvents()) { 253 LOG.info("Removing Connection : " + info); 254 } 255 super.removeConnection(context, info, error); 256 } 257 258 @Override 259 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 260 if (isLogAll() || isLogConsumerEvents()) { 261 LOG.info("Removing Consumer : " + info); 262 } 263 super.removeConsumer(context, info); 264 } 265 266 @Override 267 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 268 if (isLogAll() || isLogProducerEvents()) { 269 LOG.info("Removing Producer : " + info); 270 } 271 super.removeProducer(context, info); 272 } 273 274 @Override 275 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 276 if (isLogAll() || isLogTransactionEvents()) { 277 LOG.info("Rolling back Transaction : " + xid.getTransactionKey()); 278 } 279 super.rollbackTransaction(context, xid); 280 } 281 282 @Override 283 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 284 if (isLogAll() || isLogProducerEvents()) { 285 LOG.info("Sending message : " + messageSend.copy()); 286 } 287 super.send(producerExchange, messageSend); 288 } 289 290 @Override 291 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 292 if (isLogAll() || isLogTransactionEvents()) { 293 LOG.info("Beginning transaction : " + xid.getTransactionKey()); 294 } 295 super.beginTransaction(context, xid); 296 } 297 298 @Override 299 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 300 if (isLogAll() || isLogTransactionEvents()) { 301 LOG.info("Forgetting transaction : " + transactionId.getTransactionKey()); 302 } 303 super.forgetTransaction(context, transactionId); 304 } 305 306 @Override 307 public Connection[] getClients() throws Exception { 308 Connection[] result = super.getClients(); 309 310 if (isLogAll() || isLogInternalEvents()) { 311 if (result == null) { 312 LOG.info("Get Clients returned empty list."); 313 } else { 314 StringBuffer cids = new StringBuffer(); 315 for (Connection c : result) { 316 cids.append(cids.length() > 0 ? ", " : ""); 317 cids.append(c.getConnectionId()); 318 } 319 LOG.info("Connected clients : " + cids); 320 } 321 } 322 return super.getClients(); 323 } 324 325 @Override 326 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, 327 ActiveMQDestination destination, boolean create) throws Exception { 328 if (isLogAll() || isLogInternalEvents()) { 329 LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":" 330 + destination.getPhysicalName()); 331 } 332 return super.addDestination(context, destination, create); 333 } 334 335 @Override 336 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 337 throws Exception { 338 if (isLogAll() || isLogInternalEvents()) { 339 LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":" 340 + destination.getPhysicalName()); 341 } 342 super.removeDestination(context, destination, timeout); 343 } 344 345 @Override 346 public ActiveMQDestination[] getDestinations() throws Exception { 347 ActiveMQDestination[] result = super.getDestinations(); 348 if (isLogAll() || isLogInternalEvents()) { 349 if (result == null) { 350 LOG.info("Get Destinations returned empty list."); 351 } else { 352 StringBuffer destinations = new StringBuffer(); 353 for (ActiveMQDestination dest : result) { 354 destinations.append(destinations.length() > 0 ? ", " : ""); 355 destinations.append(dest.getPhysicalName()); 356 } 357 LOG.info("Get Destinations : " + destinations); 358 } 359 } 360 return result; 361 } 362 363 @Override 364 public void start() throws Exception { 365 if (isLogAll() || isLogInternalEvents()) { 366 LOG.info("Starting " + getBrokerName()); 367 } 368 super.start(); 369 } 370 371 @Override 372 public void stop() throws Exception { 373 if (isLogAll() || isLogInternalEvents()) { 374 LOG.info("Stopping " + getBrokerName()); 375 } 376 super.stop(); 377 } 378 379 @Override 380 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 381 if (isLogAll() || isLogSessionEvents()) { 382 LOG.info("Adding Session : " + info); 383 } 384 super.addSession(context, info); 385 } 386 387 @Override 388 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 389 if (isLogAll() || isLogSessionEvents()) { 390 LOG.info("Removing Session : " + info); 391 } 392 super.removeSession(context, info); 393 } 394 395 @Override 396 public void addBroker(Connection connection, BrokerInfo info) { 397 if (isLogAll() || isLogInternalEvents()) { 398 LOG.info("Adding Broker " + info.getBrokerName()); 399 } 400 super.addBroker(connection, info); 401 } 402 403 @Override 404 public void removeBroker(Connection connection, BrokerInfo info) { 405 if (isLogAll() || isLogInternalEvents()) { 406 LOG.info("Removing Broker " + info.getBrokerName()); 407 } 408 super.removeBroker(connection, info); 409 } 410 411 @Override 412 public BrokerInfo[] getPeerBrokerInfos() { 413 BrokerInfo[] result = super.getPeerBrokerInfos(); 414 if (isLogAll() || isLogInternalEvents()) { 415 if (result == null) { 416 LOG.info("Get Peer Broker Infos returned empty list."); 417 } else { 418 StringBuffer peers = new StringBuffer(); 419 for (BrokerInfo bi : result) { 420 peers.append(peers.length() > 0 ? ", " : ""); 421 peers.append(bi.getBrokerName()); 422 } 423 LOG.info("Get Peer Broker Infos : " + peers); 424 } 425 } 426 return result; 427 } 428 429 @Override 430 public void preProcessDispatch(MessageDispatch messageDispatch) { 431 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 432 LOG.info("preProcessDispatch :" + messageDispatch); 433 } 434 super.preProcessDispatch(messageDispatch); 435 } 436 437 @Override 438 public void postProcessDispatch(MessageDispatch messageDispatch) { 439 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 440 LOG.info("postProcessDispatch :" + messageDispatch); 441 } 442 super.postProcessDispatch(messageDispatch); 443 } 444 445 @Override 446 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 447 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 448 LOG.info("ProcessDispatchNotification :" + messageDispatchNotification); 449 } 450 super.processDispatchNotification(messageDispatchNotification); 451 } 452 453 @Override 454 public Set<ActiveMQDestination> getDurableDestinations() { 455 Set<ActiveMQDestination> result = super.getDurableDestinations(); 456 if (isLogAll() || isLogInternalEvents()) { 457 if (result == null) { 458 LOG.info("Get Durable Destinations returned empty list."); 459 } else { 460 StringBuffer destinations = new StringBuffer(); 461 for (ActiveMQDestination dest : result) { 462 destinations.append(destinations.length() > 0 ? ", " : ""); 463 destinations.append(dest.getPhysicalName()); 464 } 465 LOG.info("Get Durable Destinations : " + destinations); 466 } 467 } 468 return result; 469 } 470 471 @Override 472 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 473 if (isLogAll() || isLogInternalEvents()) { 474 LOG.info("Adding destination info : " + info); 475 } 476 super.addDestinationInfo(context, info); 477 } 478 479 @Override 480 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 481 if (isLogAll() || isLogInternalEvents()) { 482 LOG.info("Removing destination info : " + info); 483 } 484 super.removeDestinationInfo(context, info); 485 } 486 487 @Override 488 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 489 if (isLogAll() || isLogInternalEvents()) { 490 String msg = "Unable to display message."; 491 492 msg = message.getMessage().toString(); 493 494 LOG.info("Message has expired : " + msg); 495 } 496 super.messageExpired(context, message, subscription); 497 } 498 499 @Override 500 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 501 Subscription subscription) { 502 if (isLogAll() || isLogInternalEvents()) { 503 String msg = "Unable to display message."; 504 505 msg = messageReference.getMessage().toString(); 506 507 LOG.info("Sending to DLQ : " + msg); 508 } 509 super.sendToDeadLetterQueue(context, messageReference, subscription); 510 } 511 512 @Override 513 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 514 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 515 LOG.info("Fast Producer : " + producerInfo); 516 } 517 super.fastProducer(context, producerInfo); 518 } 519 520 @Override 521 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 522 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 523 LOG.info("Destination is full : " + destination.getName()); 524 } 525 super.isFull(context, destination, usage); 526 } 527 528 @Override 529 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 530 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 531 String msg = "Unable to display message."; 532 533 msg = messageReference.getMessage().toString(); 534 535 LOG.info("Message consumed : " + msg); 536 } 537 super.messageConsumed(context, messageReference); 538 } 539 540 @Override 541 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 542 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 543 String msg = "Unable to display message."; 544 545 msg = messageReference.getMessage().toString(); 546 547 LOG.info("Message delivered : " + msg); 548 } 549 super.messageDelivered(context, messageReference); 550 } 551 552 @Override 553 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 554 if (isLogAll() || isLogInternalEvents()) { 555 String msg = "Unable to display message."; 556 557 msg = messageReference.getMessage().toString(); 558 559 LOG.info("Message discarded : " + msg); 560 } 561 super.messageDiscarded(context, sub, messageReference); 562 } 563 564 @Override 565 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 566 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 567 LOG.info("Detected slow consumer on " + destination.getName()); 568 StringBuffer buf = new StringBuffer("Connection("); 569 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); 570 buf.append(") Session("); 571 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); 572 buf.append(")"); 573 LOG.info(buf.toString()); 574 } 575 super.slowConsumer(context, destination, subs); 576 } 577 578 @Override 579 public void nowMasterBroker() { 580 if (isLogAll() || isLogInternalEvents()) { 581 LOG.info("Is now the master broker : " + getBrokerName()); 582 } 583 super.nowMasterBroker(); 584 } 585 586 @Override 587 public String toString() { 588 StringBuffer buf = new StringBuffer(); 589 buf.append("LoggingBrokerPlugin("); 590 buf.append("logAll="); 591 buf.append(isLogAll()); 592 buf.append(", logConnectionEvents="); 593 buf.append(isLogConnectionEvents()); 594 buf.append(", logSessionEvents="); 595 buf.append(isLogSessionEvents()); 596 buf.append(", logConsumerEvents="); 597 buf.append(isLogConsumerEvents()); 598 buf.append(", logProducerEvents="); 599 buf.append(isLogProducerEvents()); 600 buf.append(", logMessageEvents="); 601 buf.append(isLogMessageEvents()); 602 buf.append(", logTransactionEvents="); 603 buf.append(isLogTransactionEvents()); 604 buf.append(", logInternalEvents="); 605 buf.append(isLogInternalEvents()); 606 buf.append(")"); 607 return buf.toString(); 608 } 609 }