001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.Serializable; 020 021import javax.jms.JMSException; 022import javax.jms.Message; 023 024import org.apache.activemq.broker.region.MessageReference; 025import org.apache.activemq.command.MessageId; 026import org.apache.activemq.command.ProducerId; 027import org.apache.activemq.util.BitArrayBin; 028import org.apache.activemq.util.IdGenerator; 029import org.apache.activemq.util.LRUCache; 030 031/** 032 * Provides basic audit functions for Messages without sync 033 * 034 * 035 */ 036public class ActiveMQMessageAuditNoSync implements Serializable { 037 038 private static final long serialVersionUID = 1L; 039 040 public static final int DEFAULT_WINDOW_SIZE = 2048; 041 public static final int MAXIMUM_PRODUCER_COUNT = 64; 042 private int auditDepth; 043 private int maximumNumberOfProducersToTrack; 044 private LRUCache<Object, BitArrayBin> map; 045 046 /** 047 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 048 * 64 049 */ 050 public ActiveMQMessageAuditNoSync() { 051 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 052 } 053 054 /** 055 * Construct a MessageAudit 056 * 057 * @param auditDepth range of ids to track 058 * @param maximumNumberOfProducersToTrack number of producers expected in 059 * the system 060 */ 061 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { 062 this.auditDepth = auditDepth; 063 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 064 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 065 } 066 067 /** 068 * @return the auditDepth 069 */ 070 public int getAuditDepth() { 071 return auditDepth; 072 } 073 074 /** 075 * @param auditDepth the auditDepth to set 076 */ 077 public void setAuditDepth(int auditDepth) { 078 this.auditDepth = auditDepth; 079 } 080 081 /** 082 * @return the maximumNumberOfProducersToTrack 083 */ 084 public int getMaximumNumberOfProducersToTrack() { 085 return maximumNumberOfProducersToTrack; 086 } 087 088 /** 089 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 090 */ 091 public void setMaximumNumberOfProducersToTrack( 092 int maximumNumberOfProducersToTrack) { 093 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 094 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 095 } 096 097 /** 098 * Checks if this message has been seen before 099 * 100 * @param message 101 * @return true if the message is a duplicate 102 * @throws JMSException 103 */ 104 public boolean isDuplicate(Message message) throws JMSException { 105 return isDuplicate(message.getJMSMessageID()); 106 } 107 108 /** 109 * checks whether this messageId has been seen before and adds this 110 * messageId to the list 111 * 112 * @param id 113 * @return true if the message is a duplicate 114 */ 115 public boolean isDuplicate(String id) { 116 boolean answer = false; 117 String seed = IdGenerator.getSeedFromId(id); 118 if (seed != null) { 119 BitArrayBin bab = map.get(seed); 120 if (bab == null) { 121 bab = new BitArrayBin(auditDepth); 122 map.put(seed, bab); 123 } 124 long index = IdGenerator.getSequenceFromId(id); 125 if (index >= 0) { 126 answer = bab.setBit(index, true); 127 } 128 } 129 return answer; 130 } 131 132 /** 133 * Checks if this message has been seen before 134 * 135 * @param message 136 * @return true if the message is a duplicate 137 */ 138 public boolean isDuplicate(final MessageReference message) { 139 MessageId id = message.getMessageId(); 140 return isDuplicate(id); 141 } 142 143 /** 144 * Checks if this messageId has been seen before 145 * 146 * @param id 147 * @return true if the message is a duplicate 148 */ 149 public boolean isDuplicate(final MessageId id) { 150 boolean answer = false; 151 152 if (id != null) { 153 ProducerId pid = id.getProducerId(); 154 if (pid != null) { 155 BitArrayBin bab = map.get(pid); 156 if (bab == null) { 157 bab = new BitArrayBin(auditDepth); 158 map.put(pid, bab); 159 } 160 answer = bab.setBit(id.getProducerSequenceId(), true); 161 } 162 } 163 return answer; 164 } 165 166 /** 167 * mark this message as being received 168 * 169 * @param message 170 */ 171 public void rollback(final MessageReference message) { 172 MessageId id = message.getMessageId(); 173 rollback(id); 174 } 175 176 /** 177 * mark this message as being received 178 * 179 * @param id 180 */ 181 public void rollback(final MessageId id) { 182 if (id != null) { 183 ProducerId pid = id.getProducerId(); 184 if (pid != null) { 185 BitArrayBin bab = map.get(pid); 186 if (bab != null) { 187 bab.setBit(id.getProducerSequenceId(), false); 188 } 189 } 190 } 191 } 192 193 public void rollback(final String id) { 194 String seed = IdGenerator.getSeedFromId(id); 195 if (seed != null) { 196 BitArrayBin bab = map.get(seed); 197 if (bab != null) { 198 long index = IdGenerator.getSequenceFromId(id); 199 bab.setBit(index, false); 200 } 201 } 202 } 203 204 /** 205 * Check the message is in order 206 * @param msg 207 * @return 208 * @throws JMSException 209 */ 210 public boolean isInOrder(Message msg) throws JMSException { 211 return isInOrder(msg.getJMSMessageID()); 212 } 213 214 /** 215 * Check the message id is in order 216 * @param id 217 * @return 218 */ 219 public boolean isInOrder(final String id) { 220 boolean answer = true; 221 222 if (id != null) { 223 String seed = IdGenerator.getSeedFromId(id); 224 if (seed != null) { 225 BitArrayBin bab = map.get(seed); 226 if (bab != null) { 227 long index = IdGenerator.getSequenceFromId(id); 228 answer = bab.isInOrder(index); 229 } 230 231 } 232 } 233 return answer; 234 } 235 236 /** 237 * Check the MessageId is in order 238 * @param message 239 * @return 240 */ 241 public boolean isInOrder(final MessageReference message) { 242 return isInOrder(message.getMessageId()); 243 } 244 245 /** 246 * Check the MessageId is in order 247 * @param id 248 * @return 249 */ 250 public boolean isInOrder(final MessageId id) { 251 boolean answer = false; 252 253 if (id != null) { 254 ProducerId pid = id.getProducerId(); 255 if (pid != null) { 256 BitArrayBin bab = map.get(pid); 257 if (bab == null) { 258 bab = new BitArrayBin(auditDepth); 259 map.put(pid, bab); 260 } 261 answer = bab.isInOrder(id.getProducerSequenceId()); 262 263 } 264 } 265 return answer; 266 } 267 268 public long getLastSeqId(ProducerId id) { 269 long result = -1; 270 BitArrayBin bab = map.get(id.toString()); 271 if (bab != null) { 272 result = bab.getLastSetIndex(); 273 } 274 return result; 275 } 276 277 public void clear() { 278 map.clear(); 279 } 280}