001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.console.command.store.amq; 018 019 import java.io.File; 020 import java.io.InputStream; 021 import java.io.PrintWriter; 022 import java.util.ArrayList; 023 import java.util.Arrays; 024 import java.util.Collections; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Scanner; 030 031 import org.apache.activemq.command.ActiveMQBlobMessage; 032 import org.apache.activemq.command.ActiveMQBytesMessage; 033 import org.apache.activemq.command.ActiveMQMapMessage; 034 import org.apache.activemq.command.ActiveMQMessage; 035 import org.apache.activemq.command.ActiveMQObjectMessage; 036 import org.apache.activemq.command.ActiveMQStreamMessage; 037 import org.apache.activemq.command.ActiveMQTextMessage; 038 import org.apache.activemq.command.DataStructure; 039 import org.apache.activemq.command.JournalQueueAck; 040 import org.apache.activemq.command.JournalTopicAck; 041 import org.apache.activemq.command.JournalTrace; 042 import org.apache.activemq.command.JournalTransaction; 043 import org.apache.activemq.kaha.impl.async.Location; 044 import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager; 045 import org.apache.activemq.openwire.OpenWireFormat; 046 import org.apache.activemq.util.ByteSequence; 047 import org.apache.activemq.wireformat.WireFormat; 048 import org.apache.velocity.Template; 049 import org.apache.velocity.VelocityContext; 050 import org.apache.velocity.app.Velocity; 051 import org.apache.velocity.app.VelocityEngine; 052 import org.josql.Query; 053 054 /** 055 * Allows you to view the contents of a Journal. 056 * 057 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 058 */ 059 public class AMQJournalTool { 060 061 private final ArrayList<File> dirs = new ArrayList<File>(); 062 private final WireFormat wireFormat = new OpenWireFormat(); 063 private final HashMap<String, String> resources = new HashMap<String, String>(); 064 065 private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}"; 066 private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}"; 067 private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}"; 068 private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}"; 069 private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}"; 070 private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}"; 071 private String where; 072 private VelocityContext context; 073 private VelocityEngine velocity; 074 private boolean help; 075 076 public static void main(String[] args) throws Exception { 077 AMQJournalTool consumerTool = new AMQJournalTool(); 078 String[] directories = CommandLineSupport 079 .setOptions(consumerTool, args); 080 if (directories.length < 1) { 081 System.out 082 .println("Please specify the directories with journal data to scan"); 083 return; 084 } 085 for (int i = 0; i < directories.length; i++) { 086 consumerTool.getDirs().add(new File(directories[i])); 087 } 088 consumerTool.execute(); 089 } 090 091 /** 092 * Creates a new VelocityContext that is pre-populated with the JVMs 093 * system properties. 094 * 095 * @return - the VelocityContext that got created. 096 */ 097 protected VelocityContext createVelocityContext() { 098 VelocityContext ctx = new VelocityContext(); 099 List keys = Arrays.asList(ctx.getKeys()); 100 101 for (Iterator iterator = System.getProperties().entrySet() 102 .iterator(); iterator.hasNext();) { 103 Map.Entry kv = (Map.Entry) iterator.next(); 104 String name = (String) kv.getKey(); 105 String value = (String) kv.getValue(); 106 107 if (!keys.contains(name)) { 108 ctx.put(name, value); 109 } 110 } 111 return ctx; 112 } 113 114 115 public void execute() throws Exception { 116 117 if( help ) { 118 showHelp(); 119 return; 120 } 121 122 if (getDirs().size() < 1) { 123 System.out.println(""); 124 System.out.println("Invalid Usage: Please specify the directories with journal data to scan"); 125 System.out.println(""); 126 showHelp(); 127 return; 128 } 129 130 for (File dir : getDirs()) { 131 if( !dir.exists() ) { 132 System.out.println(""); 133 System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist"); 134 System.out.println(""); 135 showHelp(); 136 return; 137 } 138 if( !dir.isDirectory() ) { 139 System.out.println(""); 140 System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory"); 141 System.out.println(""); 142 showHelp(); 143 return; 144 } 145 } 146 147 context = createVelocityContext(); 148 149 velocity = new VelocityEngine(); 150 velocity.setProperty(Velocity.RESOURCE_LOADER, "all"); 151 velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName()); 152 velocity.init(); 153 154 resources.put("message", messageFormat); 155 resources.put("topicAck", topicAckFormat); 156 resources.put("queueAck", queueAckFormat); 157 resources.put("transaction", transactionFormat); 158 resources.put("trace", traceFormat); 159 resources.put("unknown", unknownFormat); 160 161 Query query = null; 162 if (where != null) { 163 query = new Query(); 164 query.parse("select * from "+Entry.class.getName()+" where "+where); 165 166 } 167 168 ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs()); 169 manager.start(); 170 try { 171 Location curr = manager.getFirstLocation(); 172 while (curr != null) { 173 174 ByteSequence data = manager.read(curr); 175 DataStructure c = (DataStructure) wireFormat.unmarshal(data); 176 177 Entry entry = new Entry(); 178 entry.setLocation(curr); 179 entry.setRecord(c); 180 entry.setData(data); 181 entry.setQuery(query); 182 process(entry); 183 184 curr = manager.getNextLocation(curr); 185 } 186 } finally { 187 manager.close(); 188 } 189 } 190 191 private void showHelp() { 192 InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt"); 193 Scanner scanner = new Scanner(is); 194 while (scanner.hasNextLine()) { 195 String line = scanner.nextLine(); 196 System.out.println(line); 197 } 198 scanner.close(); } 199 200 private void process(Entry entry) throws Exception { 201 202 DataStructure record = entry.getRecord(); 203 204 switch (record.getDataStructureType()) { 205 case ActiveMQMessage.DATA_STRUCTURE_TYPE: 206 entry.setType("ActiveMQMessage"); 207 entry.setFormater("message"); 208 display(entry); 209 break; 210 case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE: 211 entry.setType("ActiveMQBytesMessage"); 212 entry.setFormater("message"); 213 display(entry); 214 break; 215 case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE: 216 entry.setType("ActiveMQBlobMessage"); 217 entry.setFormater("message"); 218 display(entry); 219 break; 220 case ActiveMQMapMessage.DATA_STRUCTURE_TYPE: 221 entry.setType("ActiveMQMapMessage"); 222 entry.setFormater("message"); 223 display(entry); 224 break; 225 case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE: 226 entry.setType("ActiveMQObjectMessage"); 227 entry.setFormater("message"); 228 display(entry); 229 break; 230 case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE: 231 entry.setType("ActiveMQStreamMessage"); 232 entry.setFormater("message"); 233 display(entry); 234 break; 235 case ActiveMQTextMessage.DATA_STRUCTURE_TYPE: 236 entry.setType("ActiveMQTextMessage"); 237 entry.setFormater("message"); 238 display(entry); 239 break; 240 case JournalQueueAck.DATA_STRUCTURE_TYPE: 241 entry.setType("Queue Ack"); 242 entry.setFormater("queueAck"); 243 display(entry); 244 break; 245 case JournalTopicAck.DATA_STRUCTURE_TYPE: 246 entry.setType("Topic Ack"); 247 entry.setFormater("topicAck"); 248 display(entry); 249 break; 250 case JournalTransaction.DATA_STRUCTURE_TYPE: 251 entry.setType(getType((JournalTransaction) record)); 252 entry.setFormater("transaction"); 253 display(entry); 254 break; 255 case JournalTrace.DATA_STRUCTURE_TYPE: 256 entry.setType("Trace"); 257 entry.setFormater("trace"); 258 display(entry); 259 break; 260 default: 261 entry.setType("Unknown"); 262 entry.setFormater("unknown"); 263 display(entry); 264 break; 265 } 266 } 267 268 private String getType(JournalTransaction record) { 269 switch (record.getType()) { 270 case JournalTransaction.XA_PREPARE: 271 return "XA Prepare"; 272 case JournalTransaction.XA_COMMIT: 273 return "XA Commit"; 274 case JournalTransaction.XA_ROLLBACK: 275 return "XA Rollback"; 276 case JournalTransaction.LOCAL_COMMIT: 277 return "Commit"; 278 case JournalTransaction.LOCAL_ROLLBACK: 279 return "Rollback"; 280 } 281 return "Unknown Transaction"; 282 } 283 284 private void display(Entry entry) throws Exception { 285 286 if (entry.getQuery() != null) { 287 List list = Collections.singletonList(entry); 288 List results = entry.getQuery().execute(list).getResults(); 289 if (results.isEmpty()) { 290 return; 291 } 292 } 293 294 CustomResourceLoader.setResources(resources); 295 try { 296 297 context.put("location", entry.getLocation()); 298 context.put("record", entry.getRecord()); 299 context.put("type", entry.getType()); 300 if (entry.getRecord() instanceof ActiveMQMessage) { 301 context.put("body", new MessageBodyFormatter( 302 (ActiveMQMessage) entry.getRecord())); 303 } 304 305 Template template = velocity.getTemplate(entry.getFormater()); 306 PrintWriter writer = new PrintWriter(System.out); 307 template.merge(context, writer); 308 writer.println(); 309 writer.flush(); 310 } finally { 311 CustomResourceLoader.setResources(null); 312 } 313 } 314 315 public void setMessageFormat(String messageFormat) { 316 this.messageFormat = messageFormat; 317 } 318 319 public void setTopicAckFormat(String ackFormat) { 320 this.topicAckFormat = ackFormat; 321 } 322 323 public void setTransactionFormat(String transactionFormat) { 324 this.transactionFormat = transactionFormat; 325 } 326 327 public void setTraceFormat(String traceFormat) { 328 this.traceFormat = traceFormat; 329 } 330 331 public void setUnknownFormat(String unknownFormat) { 332 this.unknownFormat = unknownFormat; 333 } 334 335 public void setQueueAckFormat(String queueAckFormat) { 336 this.queueAckFormat = queueAckFormat; 337 } 338 339 public String getQuery() { 340 return where; 341 } 342 343 public void setWhere(String query) { 344 this.where = query; 345 } 346 347 public boolean isHelp() { 348 return help; 349 } 350 351 public void setHelp(boolean help) { 352 this.help = help; 353 } 354 355 /** 356 * @return the dirs 357 */ 358 public ArrayList<File> getDirs() { 359 return dirs; 360 } 361 362 }