/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.MessageReference;
021import org.apache.activemq.broker.region.Queue;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.usage.SystemUsage;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Store based Cursor for Queues
029 */
030public class StoreQueueCursor extends AbstractPendingMessageCursor {
031
032    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
033    private final Broker broker;
034    private int pendingCount;
035    private final Queue queue;
036    private PendingMessageCursor nonPersistent;
037    private final QueueStorePrefetch persistent;
038    private boolean started;
039    private PendingMessageCursor currentCursor;
040
041    /**
042     * Construct
043     * @param broker
044     * @param queue
045     */
046    public StoreQueueCursor(Broker broker,Queue queue) {
047        super((queue != null ? queue.isPrioritizedMessages():false));
048        this.broker=broker;
049        this.queue = queue;
050        this.persistent = new QueueStorePrefetch(queue);
051        currentCursor = persistent;
052    }
053
054    public synchronized void start() throws Exception {
055        started = true;
056        super.start();
057        if (nonPersistent == null) {
058            if (broker.getBrokerService().isPersistent()) {
059                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
060            }else {
061                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
062            }
063            nonPersistent.setMaxBatchSize(getMaxBatchSize());
064            nonPersistent.setSystemUsage(systemUsage);
065            nonPersistent.setEnableAudit(isEnableAudit());
066            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
067            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
068        }
069        nonPersistent.setMessageAudit(getMessageAudit());
070        nonPersistent.start();
071        persistent.setMessageAudit(getMessageAudit());
072        persistent.start();
073        pendingCount = persistent.size() + nonPersistent.size();
074    }
075
076    public synchronized void stop() throws Exception {
077        started = false;
078        if (nonPersistent != null) {
079//            nonPersistent.clear();
080//            nonPersistent.stop();
081//            nonPersistent.gc();
082          nonPersistent.destroy();
083        }
084        persistent.stop();
085        persistent.gc();
086        super.stop();
087        pendingCount = 0;
088    }
089
090    public synchronized void addMessageLast(MessageReference node) throws Exception {
091        if (node != null) {
092            Message msg = node.getMessage();
093            if (started) {
094                pendingCount++;
095                if (!msg.isPersistent()) {
096                    nonPersistent.addMessageLast(node);
097                }
098            }
099            if (msg.isPersistent()) {
100                persistent.addMessageLast(node);
101            }
102        }
103    }
104
105    public synchronized void addMessageFirst(MessageReference node) throws Exception {
106        if (node != null) {
107            Message msg = node.getMessage();
108            if (started) {
109                pendingCount++;
110                if (!msg.isPersistent()) {
111                    nonPersistent.addMessageFirst(node);
112                }
113            }
114            if (msg.isPersistent()) {
115                persistent.addMessageFirst(node);
116            }
117        }
118    }
119
120    public synchronized void clear() {
121        pendingCount = 0;
122    }
123
124    public synchronized boolean hasNext() {
125        try {
126            getNextCursor();
127        } catch (Exception e) {
128            LOG.error("Failed to get current cursor ", e);
129            throw new RuntimeException(e);
130       }
131       return currentCursor != null ? currentCursor.hasNext() : false;
132    }
133
134    public synchronized MessageReference next() {
135        MessageReference result = currentCursor != null ? currentCursor.next() : null;
136        return result;
137    }
138
139    public synchronized void remove() {
140        if (currentCursor != null) {
141            currentCursor.remove();
142        }
143        pendingCount--;
144    }
145
146    public synchronized void remove(MessageReference node) {
147        if (!node.isPersistent()) {
148            nonPersistent.remove(node);
149        } else {
150            persistent.remove(node);
151        }
152        pendingCount--;
153    }
154
155    public synchronized void reset() {
156        nonPersistent.reset();
157        persistent.reset();
158        pendingCount = persistent.size() + nonPersistent.size();
159    }
160
161    public void release() {
162        nonPersistent.release();
163        persistent.release();
164    }
165
166
167    public synchronized int size() {
168        if (pendingCount < 0) {
169            pendingCount = persistent.size() + nonPersistent.size();
170        }
171        return pendingCount;
172    }
173
174    public synchronized boolean isEmpty() {
175        // if negative, more messages arrived in store since last reset so non empty
176        return pendingCount == 0;
177    }
178
179    /**
180     * Informs the Broker if the subscription needs to intervention to recover
181     * it's state e.g. DurableTopicSubscriber may do
182     *
183     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184     * @return true if recovery required
185     */
186    public boolean isRecoveryRequired() {
187        return false;
188    }
189
190    /**
191     * @return the nonPersistent Cursor
192     */
193    public PendingMessageCursor getNonPersistent() {
194        return this.nonPersistent;
195    }
196
197    /**
198     * @param nonPersistent cursor to set
199     */
200    public void setNonPersistent(PendingMessageCursor nonPersistent) {
201        this.nonPersistent = nonPersistent;
202    }
203
204    public void setMaxBatchSize(int maxBatchSize) {
205        persistent.setMaxBatchSize(maxBatchSize);
206        if (nonPersistent != null) {
207            nonPersistent.setMaxBatchSize(maxBatchSize);
208        }
209        super.setMaxBatchSize(maxBatchSize);
210    }
211
212
213    public void setMaxProducersToAudit(int maxProducersToAudit) {
214        super.setMaxProducersToAudit(maxProducersToAudit);
215        if (persistent != null) {
216            persistent.setMaxProducersToAudit(maxProducersToAudit);
217        }
218        if (nonPersistent != null) {
219            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
220        }
221    }
222
223    public void setMaxAuditDepth(int maxAuditDepth) {
224        super.setMaxAuditDepth(maxAuditDepth);
225        if (persistent != null) {
226            persistent.setMaxAuditDepth(maxAuditDepth);
227        }
228        if (nonPersistent != null) {
229            nonPersistent.setMaxAuditDepth(maxAuditDepth);
230        }
231    }
232
233    public void setEnableAudit(boolean enableAudit) {
234        super.setEnableAudit(enableAudit);
235        if (persistent != null) {
236            persistent.setEnableAudit(enableAudit);
237        }
238        if (nonPersistent != null) {
239            nonPersistent.setEnableAudit(enableAudit);
240        }
241    }
242
243    @Override
244    public void setUseCache(boolean useCache) {
245        super.setUseCache(useCache);
246        if (persistent != null) {
247            persistent.setUseCache(useCache);
248        }
249        if (nonPersistent != null) {
250            nonPersistent.setUseCache(useCache);
251        }
252    }
253
254    @Override
255    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
256        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
257        if (persistent != null) {
258            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259        }
260        if (nonPersistent != null) {
261            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
262        }
263    }
264
265
266
267    public synchronized void gc() {
268        if (persistent != null) {
269            persistent.gc();
270        }
271        if (nonPersistent != null) {
272            nonPersistent.gc();
273        }
274        pendingCount = persistent.size() + nonPersistent.size();
275    }
276
277    public void setSystemUsage(SystemUsage usageManager) {
278        super.setSystemUsage(usageManager);
279        if (persistent != null) {
280            persistent.setSystemUsage(usageManager);
281        }
282        if (nonPersistent != null) {
283            nonPersistent.setSystemUsage(usageManager);
284        }
285    }
286
287    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
288        if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
289            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
290            // sanity check
291            if (currentCursor.isEmpty()) {
292                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
293            }
294        }
295        return currentCursor;
296    }
297
298    @Override
299    public boolean isCacheEnabled() {
300        boolean cacheEnabled = isUseCache();
301        if (cacheEnabled) {
302            if (persistent != null) {
303                cacheEnabled &= persistent.isCacheEnabled();
304            }
305            if (nonPersistent != null) {
306                cacheEnabled &= nonPersistent.isCacheEnabled();
307            }
308            setCacheEnabled(cacheEnabled);
309        }
310        return cacheEnabled;
311    }
312}