/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.console.command.store.amq.reader;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.InvalidSelectorException;
import javax.jms.Message;

import org.apache.activemq.command.DataStructure;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;

/**
 * Reads and iterates through data log files for the AMQMessage Store.
 * <p>
 * A reader is bound either to a directory of data logs or to one individual
 * data log file, and hands out the messages it finds through
 * {@link #iterator()}.
 * <p>
 * NOTE(review): the {@link AsyncDataManager} started by the constructor is
 * never closed by this class - confirm whether callers are expected to
 * release it some other way.
 */
public class AMQReader implements Iterable<Message> {

    private final AsyncDataManager dataManager;
    private final WireFormat wireFormat = new OpenWireFormat();
    /** The single data log file being read, or null when reading a directory. */
    private File file;
    /** Parsed JMS selector, or null when no (non-blank) selector was given. */
    private BooleanExpression expression;

    /**
     * List all the data files in a directory.
     * <p>
     * A temporary {@link AsyncDataManager} is started on the directory, the
     * files it reports are copied into a fresh set, and the manager is closed
     * again before returning.
     *
     * @param directory the directory to inspect
     * @return a new set holding the files reported by the data manager; empty
     *         when the manager reports none
     * @throws IOException if {@code directory} is null, does not exist or is
     *         not a directory, or if the data manager fails
     */
    public static Set<File> listDataFiles(File directory) throws IOException {
        if (directory == null || !directory.exists() || !directory.isDirectory()) {
            throw new IOException("Invalid Directory " + directory);
        }
        Set<File> result = new HashSet<File>();
        AsyncDataManager dataManager = new AsyncDataManager();
        dataManager.setDirectory(directory);
        dataManager.start();
        try {
            Set<File> set = dataManager.getFiles();
            if (set != null) {
                result.addAll(set);
            }
        } finally {
            // Release the started manager even when listing the files fails.
            dataManager.close();
        }
        return result;
    }

    /**
     * Create the AMQReader to read a directory of amq data logs - or an
     * individual data log file - selecting every message.
     *
     * @param file the directory - or file
     * @throws IOException
     * @throws InvalidSelectorException
     */
    public AMQReader(File file) throws InvalidSelectorException, IOException {
        this(file, null);
    }

    /**
     * Create the AMQReader to read a directory of amq data logs - or an
     * individual data log file.
     * <p>
     * The selector is trimmed first; a null or blank selector means no
     * filtering expression is installed.
     *
     * @param file the directory - or file
     * @param selector the JMS selector or null to select all
     * @throws IOException
     * @throws InvalidSelectorException
     */
    public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
        String str = selector != null ? selector.trim() : null;
        if (str != null && str.length() > 0) {
            this.expression = SelectorParser.parse(str);
        }
        dataManager = new AsyncDataManager();
        dataManager.setArchiveDataLogs(false);
        if (file.isDirectory()) {
            dataManager.setDirectory(file);
        } else {
            // Single-file mode: the manager works on the parent directory and
            // the file itself is remembered so reads can be restricted to it.
            dataManager.setDirectory(file.getParentFile());
            dataManager.setDirectoryArchive(file);
            this.file = file;
        }
        dataManager.start();
    }

    /**
     * Create an iterator over the messages of this reader.
     *
     * @return a new {@code AMQIterator} built from this reader and its
     *         selector expression (which may be null)
     */
    public Iterator<Message> iterator() {
        return new AMQIterator(this, this.expression);
    }

    /**
     * Find the next message after the given location.
     * <p>
     * When this reader was created for an individual file the search is
     * restricted to that file; otherwise the data manager decides where to
     * continue.
     *
     * @param lastLocation the previously returned location, or null to start
     *        from the beginning; it is modified and reused as the result
     * @return the location holding the next message, or null when there are
     *         no more messages
     * @throws IllegalStateException
     * @throws IOException
     */
    protected MessageLocation getNextMessage(MessageLocation lastLocation)
            throws IllegalStateException, IOException {
        if (this.file != null) {
            return getInternalNextMessage(this.file, lastLocation);
        }
        return getInternalNextMessage(lastLocation);
    }

    /**
     * Directory-mode variant: searches without restricting to a file.
     */
    private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
            throws IllegalStateException, IOException {
        return getInternalNextMessage(null, lastLocation);
    }

    /**
     * Walk the log from {@code lastLocation} until a record that unmarshals
     * to a {@link Message} is found.
     * <p>
     * A non-null {@code lastLocation} is reused as the result: its message is
     * cleared up front, and on success it receives the new message and
     * location. When the log is exhausted its location is reset to null and
     * null is returned.
     *
     * @param file the file to restrict the search to, or null for no
     *        restriction
     * @param lastLocation the previous position, or null to start from the
     *        beginning
     * @return the location of the next message, or null when none is left
     * @throws IllegalStateException
     * @throws IOException
     */
    private MessageLocation getInternalNextMessage(File file,
            MessageLocation lastLocation) throws IllegalStateException,
            IOException {
        MessageLocation result = lastLocation;
        if (result != null) {
            result.setMessage(null);
        }
        Message message = null;
        Location pos = lastLocation != null ? lastLocation.getLocation() : null;
        // Records that are not messages are skipped.
        while ((pos = getNextLocation(file, pos)) != null) {
            message = getMessage(pos);
            if (message != null) {
                break;
            }
        }
        if (message == null) {
            // Exhausted: pos is null here. result is still null when the
            // caller passed no lastLocation, so it must not be dereferenced
            // unconditionally.
            if (result != null) {
                result.setLocation(null);
            }
            return null;
        }
        if (result == null) {
            result = new MessageLocation();
        }
        result.setMessage(message);
        result.setLocation(pos);
        return result;
    }

    /**
     * Ask the data manager for the location following {@code last}.
     *
     * @param file when non-null, the lookup is made against this file
     * @param last the previous location, or null
     * @return the next location, or null when the data manager has none
     * @throws IllegalStateException
     * @throws IOException
     */
    private Location getNextLocation(File file, Location last)
            throws IllegalStateException, IOException {
        if (file != null) {
            return dataManager.getNextLocation(file, last, true);
        }
        return dataManager.getNextLocation(last);
    }

    /**
     * Read and unmarshal the record stored at the given location.
     *
     * @param location where to read from
     * @return the record as a {@link Message}, or null when the record is not
     *         a message
     * @throws IOException
     */
    private Message getMessage(Location location) throws IOException {
        ByteSequence data = dataManager.read(location);
        DataStructure c = (DataStructure) wireFormat.unmarshal(data);
        if (c instanceof Message) {
            return (Message) c;
        }
        return null;
    }
}