001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Iterator;
020    import org.apache.activemq.broker.region.Destination;
021    import org.apache.activemq.broker.region.MessageReference;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.command.MessageId;
024    import org.apache.activemq.store.MessageRecoveryListener;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     *  Store based cursor
030     *
031     */
032    public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
033        private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
034        protected final Destination regionDestination;
035        protected final PendingList batchList;
036        private Iterator<MessageReference> iterator = null;
037        protected boolean batchResetNeeded = true;
038        private boolean storeHasMessages = false;
039        protected int size;
040        private MessageId lastCachedId;
041        private boolean hadSpace = false;
042    
043        protected AbstractStoreCursor(Destination destination) {
044            super((destination != null ? destination.isPrioritizedMessages():false));
045            this.regionDestination=destination;
046            if (this.prioritizedMessages) {
047                this.batchList= new PrioritizedPendingList();
048            } else {
049                this.batchList = new OrderedPendingList();
050            }
051        }
052        
053        
054        public final synchronized void start() throws Exception{
055            if (!isStarted()) {
056                clear();
057                super.start();      
058                resetBatch();
059                resetSize();
060                setCacheEnabled(!this.storeHasMessages&&useCache);
061            } 
062        }
063    
064        protected void resetSize() {
065            if (isStarted()) {
066                this.size = getStoreSize();
067            }
068            this.storeHasMessages=this.size > 0;
069        }
070    
071        public final synchronized void stop() throws Exception {
072            resetBatch();
073            super.stop();
074            gc();
075        }
076    
077        
078        public final boolean recoverMessage(Message message) throws Exception {
079            return recoverMessage(message,false);
080        }
081        
082        public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
083            boolean recovered = false;
084            if (recordUniqueId(message.getMessageId())) {
085                if (!cached) {
086                    message.setRegionDestination(regionDestination);
087                    if( message.getMemoryUsage()==null ) {
088                        message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
089                    }
090                }
091                message.incrementReferenceCount();
092                batchList.addMessageLast(message);
093                clearIterator(true);
094                recovered = true;
095                storeHasMessages = true;
096            } else {
097                /*
098                 * we should expect to get these - as the message is recorded as it before it goes into
099                 * the cache. If subsequently, we pull out that message from the store (before its deleted)
100                 * it will be a duplicate - but should be ignored
101                 */
102                if (LOG.isTraceEnabled()) {
103                    LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
104                }
105            }
106            return recovered;
107        }
108        
109        
110        public final synchronized void reset() {
111            if (batchList.isEmpty()) {
112                try {
113                    fillBatch();
114                } catch (Exception e) {
115                    LOG.error(this + " - Failed to fill batch", e);
116                    throw new RuntimeException(e);
117                }
118            }
119            clearIterator(true);
120            size();
121        }
122        
123        
124        public synchronized void release() {
125            clearIterator(false);
126        }
127        
128        private synchronized void clearIterator(boolean ensureIterator) {
129            boolean haveIterator = this.iterator != null;
130            this.iterator=null;
131            if(haveIterator&&ensureIterator) {
132                ensureIterator();
133            }
134        }
135        
136        private synchronized void ensureIterator() {
137            if(this.iterator==null) {
138                this.iterator=this.batchList.iterator();
139            }
140        }
141    
142    
143        public final void finished() {
144        }
145            
146        
147        public final synchronized boolean hasNext() {
148            if (batchList.isEmpty()) {
149                try {
150                    fillBatch();
151                } catch (Exception e) {
152                    LOG.error(this + " - Failed to fill batch", e);
153                    throw new RuntimeException(e);
154                }
155            }
156            ensureIterator();
157            return this.iterator.hasNext();
158        }
159        
160        
161        public final synchronized MessageReference next() {
162            MessageReference result = null;
163            if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
164                result = this.iterator.next();
165            }
166            last = result;
167            if (result != null) {
168                result.incrementReferenceCount();
169            }
170            return result;
171        }
172        
173        
174        public final synchronized void addMessageLast(MessageReference node) throws Exception {
175            boolean disableCache = false;
176            if (hasSpace()) {
177                if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
178                    if (LOG.isTraceEnabled()) {
179                        LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
180                    }
181                    setCacheEnabled(true);
182                }
183                if (isCacheEnabled()) {
184                    if (recoverMessage(node.getMessage(),true)) {
185                        lastCachedId = node.getMessageId();
186                    } else {
187                        // failed to recover, possible duplicate from concurrent dispatchPending,
188                        // lets not recover further in case of out of order
189                        disableCache = true;
190                    }
191                }
192            } else {
193                disableCache = true;
194            }
195    
196            if (disableCache && isCacheEnabled()) {
197                setCacheEnabled(false);
198                // sync with store on disabling the cache
199                if (lastCachedId != null) {
200                    if (LOG.isTraceEnabled()) {
201                        LOG.trace(this + " - disabling cache"
202                                + ", lastCachedId: " + lastCachedId
203                                + " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size());
204                    }
205                    setBatch(lastCachedId);
206                    lastCachedId = null;
207                }
208            }
209            this.storeHasMessages = true;
210            size++;
211        }
212    
213        protected void setBatch(MessageId messageId) throws Exception {
214        }
215    
216        
217        public final synchronized void addMessageFirst(MessageReference node) throws Exception {
218            setCacheEnabled(false);
219            size++;
220        }
221    
222        
223        public final synchronized void remove() {
224            size--;
225            if (iterator!=null) {
226                iterator.remove();
227            }
228            if (last != null) {
229                last.decrementReferenceCount();
230            }
231        }
232    
233        
234        public final synchronized void remove(MessageReference node) {
235            if (batchList.remove(node) != null) {
236                size--;
237                setCacheEnabled(false);
238            }
239        }
240        
241        
242        public final synchronized void clear() {
243            gc();
244        }
245        
246        
247        public synchronized void gc() {
248            for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
249                MessageReference msg = i.next();
250                rollback(msg.getMessageId());
251                msg.decrementReferenceCount();
252            }
253            batchList.clear();
254            clearIterator(false);
255            batchResetNeeded = true;
256            // wonder do we need to determine size here, it may change before restart
257            resetSize();
258            setCacheEnabled(false);
259        }
260    
261        @Override
262        public boolean hasSpace() {
263            hadSpace = super.hasSpace();
264            return hadSpace;
265        }
266    
267        protected final synchronized void fillBatch() {
268            if (LOG.isTraceEnabled()) {
269                LOG.trace(this + " - fillBatch");
270            }
271            if (batchResetNeeded) {
272                resetBatch();
273                this.batchResetNeeded = false;
274            }
275            if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
276                try {
277                    doFillBatch();
278                } catch (Exception e) {
279                    LOG.error(this + " - Failed to fill batch", e);
280                    throw new RuntimeException(e);
281                }
282                this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
283            }
284        }
285        
286        
287        public final synchronized boolean isEmpty() {
288            // negative means more messages added to store through queue.send since last reset
289            return size == 0;
290        }
291    
292        
293        public final synchronized boolean hasMessagesBufferedToDeliver() {
294            return !batchList.isEmpty();
295        }
296    
297        
298        public final synchronized int size() {
299            if (size < 0) {
300                this.size = getStoreSize();
301            }
302            return size;
303        }
304    
305        @Override
306        public String toString() {
307            return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
308                        + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
309        }
310        
311        protected abstract void doFillBatch() throws Exception;
312        
313        protected abstract void resetBatch();
314        
315        protected abstract int getStoreSize();
316        
317        protected abstract boolean isStoreEmpty();
318    }