001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.util.Iterator; 020 import org.apache.activemq.broker.region.Destination; 021 import org.apache.activemq.broker.region.MessageReference; 022 import org.apache.activemq.command.Message; 023 import org.apache.activemq.command.MessageId; 024 import org.apache.activemq.store.MessageRecoveryListener; 025 import org.slf4j.Logger; 026 import org.slf4j.LoggerFactory; 027 028 /** 029 * Store based cursor 030 * 031 */ 032 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { 033 private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class); 034 protected final Destination regionDestination; 035 protected final PendingList batchList; 036 private Iterator<MessageReference> iterator = null; 037 protected boolean batchResetNeeded = true; 038 private boolean storeHasMessages = false; 039 protected int size; 040 private MessageId lastCachedId; 041 private boolean hadSpace = false; 042 043 protected AbstractStoreCursor(Destination destination) { 044 super((destination != null ? destination.isPrioritizedMessages():false)); 045 this.regionDestination=destination; 046 if (this.prioritizedMessages) { 047 this.batchList= new PrioritizedPendingList(); 048 } else { 049 this.batchList = new OrderedPendingList(); 050 } 051 } 052 053 054 public final synchronized void start() throws Exception{ 055 if (!isStarted()) { 056 clear(); 057 super.start(); 058 resetBatch(); 059 resetSize(); 060 setCacheEnabled(!this.storeHasMessages&&useCache); 061 } 062 } 063 064 protected void resetSize() { 065 if (isStarted()) { 066 this.size = getStoreSize(); 067 } 068 this.storeHasMessages=this.size > 0; 069 } 070 071 public final synchronized void stop() throws Exception { 072 resetBatch(); 073 super.stop(); 074 gc(); 075 } 076 077 078 public final boolean recoverMessage(Message message) throws Exception { 079 return recoverMessage(message,false); 080 } 081 082 public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { 083 boolean recovered = false; 084 if (recordUniqueId(message.getMessageId())) { 085 if (!cached) { 086 message.setRegionDestination(regionDestination); 087 if( message.getMemoryUsage()==null ) { 088 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 089 } 090 } 091 message.incrementReferenceCount(); 092 batchList.addMessageLast(message); 093 clearIterator(true); 094 recovered = true; 095 storeHasMessages = true; 096 } else { 097 /* 098 * we should expect to get these - as the message is recorded as it before it goes into 099 * the cache. If subsequently, we pull out that message from the store (before its deleted) 100 * it will be a duplicate - but should be ignored 101 */ 102 if (LOG.isTraceEnabled()) { 103 LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority()); 104 } 105 } 106 return recovered; 107 } 108 109 110 public final synchronized void reset() { 111 if (batchList.isEmpty()) { 112 try { 113 fillBatch(); 114 } catch (Exception e) { 115 LOG.error(this + " - Failed to fill batch", e); 116 throw new RuntimeException(e); 117 } 118 } 119 clearIterator(true); 120 size(); 121 } 122 123 124 public synchronized void release() { 125 clearIterator(false); 126 } 127 128 private synchronized void clearIterator(boolean ensureIterator) { 129 boolean haveIterator = this.iterator != null; 130 this.iterator=null; 131 if(haveIterator&&ensureIterator) { 132 ensureIterator(); 133 } 134 } 135 136 private synchronized void ensureIterator() { 137 if(this.iterator==null) { 138 this.iterator=this.batchList.iterator(); 139 } 140 } 141 142 143 public final void finished() { 144 } 145 146 147 public final synchronized boolean hasNext() { 148 if (batchList.isEmpty()) { 149 try { 150 fillBatch(); 151 } catch (Exception e) { 152 LOG.error(this + " - Failed to fill batch", e); 153 throw new RuntimeException(e); 154 } 155 } 156 ensureIterator(); 157 return this.iterator.hasNext(); 158 } 159 160 161 public final synchronized MessageReference next() { 162 MessageReference result = null; 163 if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { 164 result = this.iterator.next(); 165 } 166 last = result; 167 if (result != null) { 168 result.incrementReferenceCount(); 169 } 170 return result; 171 } 172 173 174 public final synchronized void addMessageLast(MessageReference node) throws Exception { 175 boolean disableCache = false; 176 if (hasSpace()) { 177 if (!isCacheEnabled() && size==0 && isStarted() && useCache) { 178 if (LOG.isTraceEnabled()) { 179 LOG.trace(this + " - enabling cache for empty store " + node.getMessageId()); 180 } 181 setCacheEnabled(true); 182 } 183 if (isCacheEnabled()) { 184 if (recoverMessage(node.getMessage(),true)) { 185 lastCachedId = node.getMessageId(); 186 } else { 187 // failed to recover, possible duplicate from concurrent dispatchPending, 188 // lets not recover further in case of out of order 189 disableCache = true; 190 } 191 } 192 } else { 193 disableCache = true; 194 } 195 196 if (disableCache && isCacheEnabled()) { 197 setCacheEnabled(false); 198 // sync with store on disabling the cache 199 if (lastCachedId != null) { 200 if (LOG.isTraceEnabled()) { 201 LOG.trace(this + " - disabling cache" 202 + ", lastCachedId: " + lastCachedId 203 + " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size()); 204 } 205 setBatch(lastCachedId); 206 lastCachedId = null; 207 } 208 } 209 this.storeHasMessages = true; 210 size++; 211 } 212 213 protected void setBatch(MessageId messageId) throws Exception { 214 } 215 216 217 public final synchronized void addMessageFirst(MessageReference node) throws Exception { 218 setCacheEnabled(false); 219 size++; 220 } 221 222 223 public final synchronized void remove() { 224 size--; 225 if (iterator!=null) { 226 iterator.remove(); 227 } 228 if (last != null) { 229 last.decrementReferenceCount(); 230 } 231 } 232 233 234 public final synchronized void remove(MessageReference node) { 235 if (batchList.remove(node) != null) { 236 size--; 237 setCacheEnabled(false); 238 } 239 } 240 241 242 public final synchronized void clear() { 243 gc(); 244 } 245 246 247 public synchronized void gc() { 248 for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) { 249 MessageReference msg = i.next(); 250 rollback(msg.getMessageId()); 251 msg.decrementReferenceCount(); 252 } 253 batchList.clear(); 254 clearIterator(false); 255 batchResetNeeded = true; 256 // wonder do we need to determine size here, it may change before restart 257 resetSize(); 258 setCacheEnabled(false); 259 } 260 261 @Override 262 public boolean hasSpace() { 263 hadSpace = super.hasSpace(); 264 return hadSpace; 265 } 266 267 protected final synchronized void fillBatch() { 268 if (LOG.isTraceEnabled()) { 269 LOG.trace(this + " - fillBatch"); 270 } 271 if (batchResetNeeded) { 272 resetBatch(); 273 this.batchResetNeeded = false; 274 } 275 if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { 276 try { 277 doFillBatch(); 278 } catch (Exception e) { 279 LOG.error(this + " - Failed to fill batch", e); 280 throw new RuntimeException(e); 281 } 282 this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace; 283 } 284 } 285 286 287 public final synchronized boolean isEmpty() { 288 // negative means more messages added to store through queue.send since last reset 289 return size == 0; 290 } 291 292 293 public final synchronized boolean hasMessagesBufferedToDeliver() { 294 return !batchList.isEmpty(); 295 } 296 297 298 public final synchronized int size() { 299 if (size < 0) { 300 this.size = getStoreSize(); 301 } 302 return size; 303 } 304 305 @Override 306 public String toString() { 307 return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded 308 + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled(); 309 } 310 311 protected abstract void doFillBatch() throws Exception; 312 313 protected abstract void resetBatch(); 314 315 protected abstract int getStoreSize(); 316 317 protected abstract boolean isStoreEmpty(); 318 }