001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 * 
037 * 
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    private boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052    
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056  
057
058    public synchronized void start() throws Exception  {
059        if (!started && enableAudit && audit==null) {
060            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061        }
062        started=true;
063    }
064
065    public synchronized void stop() throws Exception  {
066        started=false;
067        gc();
068    }
069
070    public void add(ConnectionContext context, Destination destination) throws Exception {
071    }
072
073    @SuppressWarnings("unchecked")
074    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
075        return Collections.EMPTY_LIST;
076    }
077
078    public boolean isRecoveryRequired() {
079        return true;
080    }
081
082    public void addMessageFirst(MessageReference node) throws Exception {
083    }
084
085    public void addMessageLast(MessageReference node) throws Exception {
086    }
087    
088    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
089        addMessageLast(node);
090        return true;
091    }
092
093    public void addRecoveredMessage(MessageReference node) throws Exception {
094        addMessageLast(node);
095    }
096
097    public void clear() {
098    }
099
100    public boolean hasNext() {
101        return false;
102    }
103
104    public boolean isEmpty() {
105        return false;
106    }
107
108    public boolean isEmpty(Destination destination) {
109        return isEmpty();
110    }
111
112    public MessageReference next() {
113        return null;
114    }
115
116    public void remove() {
117    }
118
119    public void reset() {
120    }
121
122    public int size() {
123        return 0;
124    }
125
126    public int getMaxBatchSize() {
127        return maxBatchSize;
128    }
129
130    public void setMaxBatchSize(int maxBatchSize) {
131        this.maxBatchSize = maxBatchSize;
132    }
133
134    protected void fillBatch() throws Exception {
135    }
136
137    public void resetForGC() {
138        reset();
139    }
140
141    public void remove(MessageReference node) {
142    }
143
144    public void gc() {
145    }
146
147    public void setSystemUsage(SystemUsage usageManager) {
148        this.systemUsage = usageManager;
149    }
150
151    public boolean hasSpace() {
152        return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
153    }
154
155    public boolean isFull() {
156        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
157    }
158
159    public void release() {
160    }
161
162    public boolean hasMessagesBufferedToDeliver() {
163        return false;
164    }
165
166    /**
167     * @return the memoryUsageHighWaterMark
168     */
169    public int getMemoryUsageHighWaterMark() {
170        return memoryUsageHighWaterMark;
171    }
172
173    /**
174     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
175     */
176    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
177        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
178    }
179
180    /**
181     * @return the usageManager
182     */
183    public SystemUsage getSystemUsage() {
184        return this.systemUsage;
185    }
186
187    /**
188     * destroy the cursor
189     * 
190     * @throws Exception
191     */
192    public void destroy() throws Exception {
193        stop();
194    }
195
196    /**
197     * Page in a restricted number of messages
198     * 
199     * @param maxItems maximum number of messages to return
200     * @return a list of paged in messages
201     */
202    public LinkedList<MessageReference> pageInList(int maxItems) {
203        throw new RuntimeException("Not supported");
204    }
205
206    /**
207     * @return the maxProducersToAudit
208     */
209    public int getMaxProducersToAudit() {
210        return maxProducersToAudit;
211    }
212
213    /**
214     * @param maxProducersToAudit the maxProducersToAudit to set
215     */
216    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
217        this.maxProducersToAudit = maxProducersToAudit;
218        if (audit != null) {
219            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
220        }
221    }
222
223    /**
224     * @return the maxAuditDepth
225     */
226    public int getMaxAuditDepth() {
227        return maxAuditDepth;
228    }
229    
230
231    /**
232     * @param maxAuditDepth the maxAuditDepth to set
233     */
234    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
235        this.maxAuditDepth = maxAuditDepth;
236        if (audit != null) {
237            audit.setAuditDepth(maxAuditDepth);
238        }
239    }
240    
241    
242    /**
243     * @return the enableAudit
244     */
245    public boolean isEnableAudit() {
246        return enableAudit;
247    }
248
249    /**
250     * @param enableAudit the enableAudit to set
251     */
252    public synchronized void setEnableAudit(boolean enableAudit) {
253        this.enableAudit = enableAudit;
254        if (enableAudit && started && audit==null) {
255            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
256        }
257    }
258    
259    public boolean isTransient() {
260        return false;
261    }
262    
263       
264    /**
265     * set the audit
266     * @param audit new audit component
267     */
268    public void setMessageAudit(ActiveMQMessageAudit audit) {
269        this.audit=audit;
270    }
271    
272    
273    /**
274     * @return the audit
275     */
276    public ActiveMQMessageAudit getMessageAudit() {
277        return audit;
278    }
279    
280    public boolean isUseCache() {
281        return useCache;
282    }
283
284    public void setUseCache(boolean useCache) {
285        this.useCache = useCache;
286    }
287
288    public synchronized boolean isDuplicate(MessageId messageId) {
289        boolean unique = recordUniqueId(messageId);
290        rollback(messageId);
291        return !unique;
292    }
293    
294    /**
295     * records a message id and checks if it is a duplicate
296     * @param messageId
297     * @return true if id is unique, false otherwise.
298     */
299    public synchronized boolean recordUniqueId(MessageId messageId) {
300        if (!enableAudit || audit==null) {
301            return true;
302        }
303        return !audit.isDuplicate(messageId);
304    }
305    
306    public synchronized void rollback(MessageId id) {
307        if (audit != null) {
308            audit.rollback(id);
309        }
310    }
311    
312    protected synchronized boolean isStarted() {
313        return started;
314    }
315    
316    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
317        boolean result = false;
318        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
319        if (destinations != null) {
320            for (Destination dest:destinations) {
321                if (dest.isPrioritizedMessages()) {
322                    result = true;
323                    break;
324                }
325            }
326        }
327        return result;
328
329    }
330
331    public synchronized boolean isCacheEnabled() {
332        return cacheEnabled;
333    }
334
335    public synchronized void setCacheEnabled(boolean val) {
336        cacheEnabled = val;
337    }
338}