001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store.amq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.PrintWriter;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Scanner;
030
031import org.apache.activemq.command.ActiveMQBlobMessage;
032import org.apache.activemq.command.ActiveMQBytesMessage;
033import org.apache.activemq.command.ActiveMQMapMessage;
034import org.apache.activemq.command.ActiveMQMessage;
035import org.apache.activemq.command.ActiveMQObjectMessage;
036import org.apache.activemq.command.ActiveMQStreamMessage;
037import org.apache.activemq.command.ActiveMQTextMessage;
038import org.apache.activemq.command.DataStructure;
039import org.apache.activemq.command.JournalQueueAck;
040import org.apache.activemq.command.JournalTopicAck;
041import org.apache.activemq.command.JournalTrace;
042import org.apache.activemq.command.JournalTransaction;
043import org.apache.activemq.kaha.impl.async.Location;
044import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045import org.apache.activemq.openwire.OpenWireFormat;
046import org.apache.activemq.util.ByteSequence;
047import org.apache.activemq.wireformat.WireFormat;
048import org.apache.velocity.Template;
049import org.apache.velocity.VelocityContext;
050import org.apache.velocity.app.Velocity;
051import org.apache.velocity.app.VelocityEngine;
052import org.josql.Query;
053
054/**
055 * Allows you to view the contents of a Journal.
056 * 
057 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
058 */
059public class AMQJournalTool {
060
061        private final ArrayList<File> dirs = new ArrayList<File>();
062        private final WireFormat wireFormat = new OpenWireFormat();
063        private final HashMap<String, String> resources = new HashMap<String, String>();
064
065        private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
066        private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
067        private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
068        private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
069        private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
070        private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
071        private String where;
072        private VelocityContext context;
073        private VelocityEngine velocity;
074        private boolean help;
075
076        public static void main(String[] args) throws Exception {
077                AMQJournalTool consumerTool = new AMQJournalTool();
078                String[] directories = CommandLineSupport
079                                .setOptions(consumerTool, args);
080                if (directories.length < 1) {
081                        System.out
082                                        .println("Please specify the directories with journal data to scan");
083                        return;
084                }
085                for (int i = 0; i < directories.length; i++) {
086                        consumerTool.getDirs().add(new File(directories[i]));
087                }
088                consumerTool.execute();
089        }
090
091        /**
092         * Creates a new VelocityContext that is pre-populated with the JVMs
093         * system properties. 
094         * 
095         * @return - the VelocityContext that got created.
096         */
097        protected VelocityContext createVelocityContext() {
098                VelocityContext ctx = new VelocityContext();
099                List keys = Arrays.asList(ctx.getKeys());
100
101                for (Iterator iterator = System.getProperties().entrySet()
102                                .iterator(); iterator.hasNext();) {
103                        Map.Entry kv = (Map.Entry) iterator.next();
104                        String name = (String) kv.getKey();
105                        String value = (String) kv.getValue();
106
107                        if (!keys.contains(name)) {
108                                ctx.put(name, value);
109                        }
110                }
111                return ctx;
112        }
113        
114        
115        public void execute() throws Exception {
116
117                if( help ) {
118                        showHelp();
119                        return;
120                }
121                
122                if (getDirs().size() < 1) {
123                        System.out.println("");
124                        System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
125                        System.out.println("");
126                        showHelp();
127                        return;
128                }
129
130                for (File dir : getDirs()) {
131                        if( !dir.exists() ) {
132                                System.out.println("");
133                                System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
134                                System.out.println("");
135                                showHelp();
136                                return;
137                        }
138                        if( !dir.isDirectory() ) {
139                                System.out.println("");
140                                System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
141                                System.out.println("");
142                                showHelp();
143                                return;
144                        }
145                }
146                
147                context = createVelocityContext();
148                
149                velocity = new VelocityEngine();
150                velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
151                velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
152                velocity.init();
153
154                resources.put("message", messageFormat);
155                resources.put("topicAck", topicAckFormat);
156                resources.put("queueAck", queueAckFormat);
157                resources.put("transaction", transactionFormat);
158                resources.put("trace", traceFormat);
159                resources.put("unknown", unknownFormat);
160                
161                Query query = null;
162                if (where != null) {
163                        query = new Query();
164                        query.parse("select * from "+Entry.class.getName()+" where "+where);
165
166                }
167
168                ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
169                manager.start();
170                try {
171                        Location curr = manager.getFirstLocation();
172                        while (curr != null) {
173
174                                ByteSequence data = manager.read(curr);
175                                DataStructure c = (DataStructure) wireFormat.unmarshal(data);
176
177                                Entry entry = new Entry();
178                                entry.setLocation(curr);
179                                entry.setRecord(c);
180                                entry.setData(data);
181                                entry.setQuery(query);
182                                process(entry);
183
184                                curr = manager.getNextLocation(curr);           
185                        }
186                } finally {
187                        manager.close();
188                }
189        }
190
191        private void showHelp() {
192                InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
193                Scanner scanner = new Scanner(is);
194                while (scanner.hasNextLine()) {
195                        String line = scanner.nextLine();
196                        System.out.println(line);
197                }
198                scanner.close();        }
199
200        private void process(Entry entry) throws Exception {
201
202                DataStructure record = entry.getRecord();
203
204                switch (record.getDataStructureType()) {
205                case ActiveMQMessage.DATA_STRUCTURE_TYPE:
206                        entry.setType("ActiveMQMessage");
207                        entry.setFormater("message");
208                        display(entry);
209                        break;
210                case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
211                        entry.setType("ActiveMQBytesMessage");
212                        entry.setFormater("message");
213                        display(entry);
214                        break;
215                case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
216                        entry.setType("ActiveMQBlobMessage");
217                        entry.setFormater("message");
218                        display(entry);
219                        break;
220                case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
221                        entry.setType("ActiveMQMapMessage");
222                        entry.setFormater("message");
223                        display(entry);
224                        break;
225                case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
226                        entry.setType("ActiveMQObjectMessage");
227                        entry.setFormater("message");
228                        display(entry);
229                        break;
230                case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
231                        entry.setType("ActiveMQStreamMessage");
232                        entry.setFormater("message");
233                        display(entry);
234                        break;
235                case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
236                        entry.setType("ActiveMQTextMessage");
237                        entry.setFormater("message");
238                        display(entry);
239                        break;
240                case JournalQueueAck.DATA_STRUCTURE_TYPE:
241                        entry.setType("Queue Ack");
242                        entry.setFormater("queueAck");
243                        display(entry);
244                        break;
245                case JournalTopicAck.DATA_STRUCTURE_TYPE:
246                        entry.setType("Topic Ack");
247                        entry.setFormater("topicAck");
248                        display(entry);
249                        break;
250                case JournalTransaction.DATA_STRUCTURE_TYPE:
251                        entry.setType(getType((JournalTransaction) record));
252                        entry.setFormater("transaction");
253                        display(entry);
254                        break;
255                case JournalTrace.DATA_STRUCTURE_TYPE:
256                        entry.setType("Trace");
257                        entry.setFormater("trace");
258                        display(entry);
259                        break;
260                default:
261                        entry.setType("Unknown");
262                        entry.setFormater("unknown");
263                        display(entry);
264                        break;
265                }
266        }
267
268        private String getType(JournalTransaction record) {
269                switch (record.getType()) {
270                case JournalTransaction.XA_PREPARE:
271                        return "XA Prepare";
272                case JournalTransaction.XA_COMMIT:
273                        return "XA Commit";
274                case JournalTransaction.XA_ROLLBACK:
275                        return "XA Rollback";
276                case JournalTransaction.LOCAL_COMMIT:
277                        return "Commit";
278                case JournalTransaction.LOCAL_ROLLBACK:
279                        return "Rollback";
280                }
281                return "Unknown Transaction";
282        }
283
284        private void display(Entry entry) throws Exception {
285
286                if (entry.getQuery() != null) {
287                        List list = Collections.singletonList(entry);
288                        List results = entry.getQuery().execute(list).getResults();
289                        if (results.isEmpty()) {
290                                return;
291                        }
292                }
293
294                CustomResourceLoader.setResources(resources);
295                try {
296
297                        context.put("location", entry.getLocation());
298                        context.put("record", entry.getRecord());
299                        context.put("type", entry.getType());
300                        if (entry.getRecord() instanceof ActiveMQMessage) {
301                                context.put("body", new MessageBodyFormatter(
302                                                (ActiveMQMessage) entry.getRecord()));
303                        }
304
305                        Template template = velocity.getTemplate(entry.getFormater());
306                        PrintWriter writer = new PrintWriter(System.out);
307                        template.merge(context, writer);
308                        writer.println();
309                        writer.flush();
310                } finally {
311                        CustomResourceLoader.setResources(null);
312                }
313        }
314
315        public void setMessageFormat(String messageFormat) {
316                this.messageFormat = messageFormat;
317        }
318
319        public void setTopicAckFormat(String ackFormat) {
320                this.topicAckFormat = ackFormat;
321        }
322
323        public void setTransactionFormat(String transactionFormat) {
324                this.transactionFormat = transactionFormat;
325        }
326
327        public void setTraceFormat(String traceFormat) {
328                this.traceFormat = traceFormat;
329        }
330
331        public void setUnknownFormat(String unknownFormat) {
332                this.unknownFormat = unknownFormat;
333        }
334
335        public void setQueueAckFormat(String queueAckFormat) {
336                this.queueAckFormat = queueAckFormat;
337        }
338
339        public String getQuery() {
340                return where;
341        }
342
343        public void setWhere(String query) {
344                this.where = query;
345        }
346
347        public boolean isHelp() {
348                return help;
349        }
350
351        public void setHelp(boolean help) {
352                this.help = help;
353        }
354
355        /**
356         * @return the dirs
357         */
358        public ArrayList<File> getDirs() {
359                return dirs;
360        }
361
362}