001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.util; 018 019 import java.io.DataOutputStream; 020 import java.io.IOException; 021 import java.net.DatagramPacket; 022 import java.net.DatagramSocket; 023 import java.net.InetAddress; 024 import java.net.InetSocketAddress; 025 import java.net.SocketAddress; 026 import java.net.URI; 027 import java.net.URISyntaxException; 028 import java.net.UnknownHostException; 029 030 import org.apache.activemq.broker.BrokerPluginSupport; 031 import org.apache.activemq.broker.ConnectionContext; 032 import org.apache.activemq.broker.ConsumerBrokerExchange; 033 import org.apache.activemq.broker.ProducerBrokerExchange; 034 import org.apache.activemq.broker.region.Subscription; 035 import org.apache.activemq.command.ActiveMQDestination; 036 import org.apache.activemq.command.BrokerId; 037 import org.apache.activemq.command.ConnectionInfo; 038 import org.apache.activemq.command.ConsumerInfo; 039 import org.apache.activemq.command.DataStructure; 040 import org.apache.activemq.command.DestinationInfo; 041 import org.apache.activemq.command.JournalTrace; 042 import org.apache.activemq.command.Message; 043 import org.apache.activemq.command.MessageAck; 044 import org.apache.activemq.command.MessageDispatch; 045 import org.apache.activemq.command.MessageDispatchNotification; 046 import org.apache.activemq.command.MessagePull; 047 import org.apache.activemq.command.ProducerInfo; 048 import org.apache.activemq.command.RemoveSubscriptionInfo; 049 import org.apache.activemq.command.Response; 050 import org.apache.activemq.command.SessionInfo; 051 import org.apache.activemq.command.TransactionId; 052 import org.apache.activemq.command.TransactionInfo; 053 import org.apache.activemq.openwire.OpenWireFormatFactory; 054 import org.apache.activemq.util.ByteArrayOutputStream; 055 import org.apache.activemq.util.ByteSequence; 056 import org.apache.activemq.wireformat.WireFormat; 057 import org.apache.activemq.wireformat.WireFormatFactory; 058 import org.slf4j.Logger; 059 import org.slf4j.LoggerFactory; 060 061 /** 062 * A Broker interceptor which allows you to trace all operations to a UDP 063 * socket. 064 * 065 * @org.apache.xbean.XBean element="udpTraceBrokerPlugin" 066 * 067 */ 068 public class UDPTraceBrokerPlugin extends BrokerPluginSupport { 069 070 private static final Logger LOG = LoggerFactory.getLogger(UDPTraceBrokerPlugin.class); 071 protected WireFormat wireFormat; 072 protected WireFormatFactory wireFormatFactory; 073 protected int maxTraceDatagramSize = 1024 * 4; 074 protected URI destination; 075 protected DatagramSocket socket; 076 077 protected BrokerId brokerId; 078 protected SocketAddress address; 079 protected boolean broadcast; 080 081 public UDPTraceBrokerPlugin() { 082 try { 083 destination = new URI("udp://127.0.0.1:61616"); 084 } catch (URISyntaxException wontHappen) { 085 } 086 } 087 088 public void start() throws Exception { 089 super.start(); 090 if (getWireFormat() == null) { 091 throw new IllegalArgumentException("Wireformat must be specifed."); 092 } 093 if (address == null) { 094 address = createSocketAddress(destination); 095 } 096 socket = createSocket(); 097 098 brokerId = super.getBrokerId(); 099 trace(new JournalTrace("START")); 100 } 101 102 protected DatagramSocket createSocket() throws IOException { 103 DatagramSocket s = new DatagramSocket(); 104 s.setSendBufferSize(maxTraceDatagramSize); 105 s.setBroadcast(broadcast); 106 return s; 107 } 108 109 public void stop() throws Exception { 110 trace(new JournalTrace("STOP")); 111 socket.close(); 112 super.stop(); 113 } 114 115 private void trace(DataStructure command) { 116 try { 117 118 ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize); 119 DataOutputStream out = new DataOutputStream(baos); 120 wireFormat.marshal(brokerId, out); 121 wireFormat.marshal(command, out); 122 out.close(); 123 ByteSequence sequence = baos.toByteSequence(); 124 DatagramPacket datagram = new DatagramPacket(sequence.getData(), sequence.getOffset(), sequence.getLength(), address); 125 socket.send(datagram); 126 127 } catch (Throwable e) { 128 LOG.debug("Failed to trace: " + command, e); 129 } 130 } 131 132 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 133 trace(messageSend); 134 super.send(producerExchange, messageSend); 135 } 136 137 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 138 trace(ack); 139 super.acknowledge(consumerExchange, ack); 140 } 141 142 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 143 trace(info); 144 super.addConnection(context, info); 145 } 146 147 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 148 trace(info); 149 return super.addConsumer(context, info); 150 } 151 152 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 153 trace(info); 154 super.addDestinationInfo(context, info); 155 } 156 157 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 158 trace(info); 159 super.addProducer(context, info); 160 } 161 162 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 163 trace(info); 164 super.addSession(context, info); 165 } 166 167 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 168 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN)); 169 super.beginTransaction(context, xid); 170 } 171 172 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 173 trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE)); 174 super.commitTransaction(context, xid, onePhase); 175 } 176 177 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 178 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET)); 179 super.forgetTransaction(context, xid); 180 } 181 182 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 183 trace(pull); 184 return super.messagePull(context, pull); 185 } 186 187 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 188 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE)); 189 return super.prepareTransaction(context, xid); 190 } 191 192 public void postProcessDispatch(MessageDispatch messageDispatch) { 193 trace(messageDispatch); 194 super.postProcessDispatch(messageDispatch); 195 } 196 197 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 198 trace(messageDispatchNotification); 199 super.processDispatchNotification(messageDispatchNotification); 200 } 201 202 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 203 trace(info.createRemoveCommand()); 204 super.removeConnection(context, info, error); 205 } 206 207 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 208 trace(info.createRemoveCommand()); 209 super.removeConsumer(context, info); 210 } 211 212 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 213 super.removeDestination(context, destination, timeout); 214 } 215 216 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 217 trace(info); 218 super.removeDestinationInfo(context, info); 219 } 220 221 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 222 trace(info.createRemoveCommand()); 223 super.removeProducer(context, info); 224 } 225 226 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 227 trace(info.createRemoveCommand()); 228 super.removeSession(context, info); 229 } 230 231 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 232 trace(info); 233 super.removeSubscription(context, info); 234 } 235 236 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 237 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK)); 238 super.rollbackTransaction(context, xid); 239 } 240 241 public WireFormat getWireFormat() { 242 if (wireFormat == null) { 243 wireFormat = createWireFormat(); 244 } 245 return wireFormat; 246 } 247 248 protected WireFormat createWireFormat() { 249 return getWireFormatFactory().createWireFormat(); 250 } 251 252 public void setWireFormat(WireFormat wireFormat) { 253 this.wireFormat = wireFormat; 254 } 255 256 public WireFormatFactory getWireFormatFactory() { 257 if (wireFormatFactory == null) { 258 wireFormatFactory = createWireFormatFactory(); 259 } 260 return wireFormatFactory; 261 } 262 263 protected OpenWireFormatFactory createWireFormatFactory() { 264 OpenWireFormatFactory wf = new OpenWireFormatFactory(); 265 wf.setCacheEnabled(false); 266 wf.setVersion(1); 267 wf.setTightEncodingEnabled(true); 268 wf.setSizePrefixDisabled(true); 269 return wf; 270 } 271 272 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 273 this.wireFormatFactory = wireFormatFactory; 274 } 275 276 protected SocketAddress createSocketAddress(URI location) throws UnknownHostException { 277 InetAddress a = InetAddress.getByName(location.getHost()); 278 int port = location.getPort(); 279 return new InetSocketAddress(a, port); 280 } 281 282 public URI getDestination() { 283 return destination; 284 } 285 286 public void setDestination(URI destination) { 287 this.destination = destination; 288 } 289 290 public int getMaxTraceDatagramSize() { 291 return maxTraceDatagramSize; 292 } 293 294 public void setMaxTraceDatagramSize(int maxTraceDatagramSize) { 295 this.maxTraceDatagramSize = maxTraceDatagramSize; 296 } 297 298 public boolean isBroadcast() { 299 return broadcast; 300 } 301 302 public void setBroadcast(boolean broadcast) { 303 this.broadcast = broadcast; 304 } 305 306 public SocketAddress getAddress() { 307 return address; 308 } 309 310 public void setAddress(SocketAddress address) { 311 this.address = address; 312 } 313 314 }