001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.MessageReference;
026import org.apache.activemq.broker.region.QueueMessageReference;
027
028/**
029 * Holds pending messages (messages awaiting dispatch to a consumer) in an
030 * in-memory list and exposes them through the pending message cursor API.
031 * 
032 * 
033 */
034public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
035    private final PendingList list;
036    private Iterator<MessageReference> iter;
037    
038    public VMPendingMessageCursor(boolean prioritizedMessages) {
039        super(prioritizedMessages);
040        if (this.prioritizedMessages) {
041            this.list= new PrioritizedPendingList();
042        }else {
043            this.list = new OrderedPendingList();
044        }
045    }
046
047    
048    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
049            throws Exception {
050        List<MessageReference> rc = new ArrayList<MessageReference>();
051        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
052            MessageReference r = iterator.next();
053            if (r.getRegionDestination() == destination) {
054                r.decrementReferenceCount();
055                rc.add(r);
056                iterator.remove();
057            }
058        }
059        return rc;
060    }
061
062    /**
063     * @return true if there are no pending messages
064     */
065    
066    public synchronized boolean isEmpty() {
067        if (list.isEmpty()) {
068            return true;
069        } else {
070            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
071                MessageReference node = iterator.next();
072                if (node == QueueMessageReference.NULL_MESSAGE) {
073                    continue;
074                }
075                if (!node.isDropped()) {
076                    return false;
077                }
078                // We can remove dropped references.
079                iterator.remove();
080            }
081            return true;
082        }
083    }
084
085    /**
086     * reset the cursor
087     */
088    
089    public synchronized void reset() {
090        iter = list.iterator();
091        last = null;
092    }
093
094    /**
095     * add message to await dispatch
096     * 
097     * @param node
098     */
099    
100    public synchronized void addMessageLast(MessageReference node) {
101        node.incrementReferenceCount();
102        list.addMessageLast(node);
103    }
104
105    /**
106     * add message to await dispatch
107     * 
108     * @param position
109     * @param node
110     */
111    
112    public synchronized void addMessageFirst(MessageReference node) {
113        node.incrementReferenceCount();
114        list.addMessageFirst(node);
115    }
116
117    /**
118     * @return true if there pending messages to dispatch
119     */
120    
121    public synchronized boolean hasNext() {
122        return iter.hasNext();
123    }
124
125    /**
126     * @return the next pending message
127     */
128    
129    public synchronized MessageReference next() {
130        last = iter.next();
131        if (last != null) {
132            last.incrementReferenceCount();
133        }
134        return last;
135    }
136
137    /**
138     * remove the message at the cursor position
139     */
140    
141    public synchronized void remove() {
142        if (last != null) {
143            last.decrementReferenceCount();
144        }
145        iter.remove();
146    }
147
148    /**
149     * @return the number of pending messages
150     */
151    
152    public synchronized int size() {
153        return list.size();
154    }
155
156    /**
157     * clear all pending messages
158     */
159    
160    public synchronized void clear() {
161        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
162            MessageReference ref = i.next();
163            ref.decrementReferenceCount();
164        }
165        list.clear();
166    }
167
168    
169    public synchronized void remove(MessageReference node) {
170        list.remove(node);
171        node.decrementReferenceCount();
172    }
173
174    /**
175     * Page in a restricted number of messages
176     * 
177     * @param maxItems
178     * @return a list of paged in messages
179     */
180    
181    public LinkedList<MessageReference> pageInList(int maxItems) {
182        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
183        for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
184            MessageReference ref = i.next();
185            ref.incrementReferenceCount();
186            result.add(ref);
187            if (result.size() >= maxItems) {
188                break;
189            }
190        }
191        return result;
192    }
193
194    
195    public boolean isTransient() {
196        return true;
197    }
198
199    
200    public void destroy() throws Exception {
201        super.destroy();
202        clear();
203    }
204}