001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.broker.region.QueueMessageReference;
028
029/**
030 * hold pending messages in a linked list (messages awaiting disptach to a
031 * consumer) cursor
032 *
033 *
034 */
035public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
036    private final PendingList list;
037    private Iterator<MessageReference> iter;
038
039    public VMPendingMessageCursor(boolean prioritizedMessages) {
040        super(prioritizedMessages);
041        if (this.prioritizedMessages) {
042            this.list= new PrioritizedPendingList();
043        }else {
044            this.list = new OrderedPendingList();
045        }
046    }
047
048
049    @Override
050    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
051            throws Exception {
052        List<MessageReference> rc = new ArrayList<MessageReference>();
053        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
054            MessageReference r = iterator.next();
055            if (r.getRegionDestination() == destination) {
056                r.decrementReferenceCount();
057                rc.add(r);
058                iterator.remove();
059            }
060        }
061        return rc;
062    }
063
064    /**
065     * @return true if there are no pending messages
066     */
067
068    @Override
069    public synchronized boolean isEmpty() {
070        if (list.isEmpty()) {
071            return true;
072        } else {
073            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
074                MessageReference node = iterator.next();
075                if (node == QueueMessageReference.NULL_MESSAGE) {
076                    continue;
077                }
078                if (!node.isDropped()) {
079                    return false;
080                }
081                // We can remove dropped references.
082                iterator.remove();
083            }
084            return true;
085        }
086    }
087
088    /**
089     * reset the cursor
090     */
091
092    @Override
093    public synchronized void reset() {
094        iter = list.iterator();
095        last = null;
096    }
097
098    /**
099     * add message to await dispatch
100     *
101     * @param node
102     */
103
104    @Override
105    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) {
106        node.incrementReferenceCount();
107        list.addMessageLast(node);
108        return true;
109    }
110
111    /**
112     * add message to await dispatch
113     *
114     * @param node
115     */
116
117    @Override
118    public synchronized void addMessageFirst(MessageReference node) {
119        node.incrementReferenceCount();
120        list.addMessageFirst(node);
121    }
122
123    /**
124     * @return true if there pending messages to dispatch
125     */
126
127    @Override
128    public synchronized boolean hasNext() {
129        return iter.hasNext();
130    }
131
132    /**
133     * @return the next pending message
134     */
135
136    @Override
137    public synchronized MessageReference next() {
138        last = iter.next();
139        if (last != null) {
140            last.incrementReferenceCount();
141        }
142        return last;
143    }
144
145    /**
146     * remove the message at the cursor position
147     */
148
149    @Override
150    public synchronized void remove() {
151        if (last != null) {
152            last.decrementReferenceCount();
153        }
154        iter.remove();
155    }
156
157    /**
158     * @return the number of pending messages
159     */
160
161    @Override
162    public synchronized int size() {
163        return list.size();
164    }
165
166    @Override
167    public synchronized long messageSize() {
168        return list.messageSize();
169    }
170
171    /**
172     * clear all pending messages
173     */
174
175    @Override
176    public synchronized void clear() {
177        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
178            MessageReference ref = i.next();
179            ref.decrementReferenceCount();
180        }
181        list.clear();
182    }
183
184
185    @Override
186    public synchronized void remove(MessageReference node) {
187        list.remove(node);
188        node.decrementReferenceCount();
189    }
190
191    /**
192     * Page in a restricted number of messages
193     *
194     * @param maxItems
195     * @return a list of paged in messages
196     */
197
198    @Override
199    public LinkedList<MessageReference> pageInList(int maxItems) {
200        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
201        for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
202            MessageReference ref = i.next();
203            ref.incrementReferenceCount();
204            result.add(ref);
205            if (result.size() >= maxItems) {
206                break;
207            }
208        }
209        return result;
210    }
211
212
213    @Override
214    public boolean isTransient() {
215        return true;
216    }
217
218
219    @Override
220    public void destroy() throws Exception {
221        super.destroy();
222        clear();
223    }
224}