/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.LinkedList;
021import java.util.List;
022import org.apache.activemq.command.MessageDispatch;
023
024public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
025    private static final Integer MAX_PRIORITY = 10;
026    private final Object mutex = new Object();
027    private final LinkedList<MessageDispatch>[] lists;
028    private boolean closed;
029    private boolean running;
030    private int size = 0;
031
032    public SimplePriorityMessageDispatchChannel() {
033        this.lists = new LinkedList[MAX_PRIORITY];
034        for (int i = 0; i < MAX_PRIORITY; i++) {
035            lists[i] = new LinkedList<MessageDispatch>();
036        }
037    }
038
039    /*
040     * (non-Javadoc)
041     * @see
042     * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
043     * .command.MessageDispatch)
044     */
045    public void enqueue(MessageDispatch message) {
046        synchronized (mutex) {
047            getList(message).addLast(message);
048
049            this.size++;
050            mutex.notify();
051        }
052    }
053
054    /*
055     * (non-Javadoc)
056     * @see
057     * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
058     * .command.MessageDispatch)
059     */
060    public void enqueueFirst(MessageDispatch message) {
061        synchronized (mutex) {
062            getList(message).addFirst(message);
063            this.size++;
064            mutex.notify();
065        }
066    }
067
068    /*
069     * (non-Javadoc)
070     * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
071     */
072    public boolean isEmpty() {
073        // synchronized (mutex) {
074        return this.size == 0;
075        // }
076    }
077
078    /*
079     * (non-Javadoc)
080     * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
081     */
082    public MessageDispatch dequeue(long timeout) throws InterruptedException {
083        synchronized (mutex) {
084            // Wait until the consumer is ready to deliver messages.
085            while (timeout != 0 && !closed && (isEmpty() || !running)) {
086                if (timeout == -1) {
087                    mutex.wait();
088                } else {
089                    mutex.wait(timeout);
090                    break;
091                }
092            }
093            if (closed || !running || isEmpty()) {
094                return null;
095            }
096            return removeFirst();
097        }
098    }
099
100    /*
101     * (non-Javadoc)
102     * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
103     */
104    public MessageDispatch dequeueNoWait() {
105        synchronized (mutex) {
106            if (closed || !running || isEmpty()) {
107                return null;
108            }
109            return removeFirst();
110        }
111    }
112
113    /*
114     * (non-Javadoc)
115     * @see org.apache.activemq.MessageDispatchChannelI#peek()
116     */
117    public MessageDispatch peek() {
118        synchronized (mutex) {
119            if (closed || !running || isEmpty()) {
120                return null;
121            }
122            return getFirst();
123        }
124    }
125
126    /*
127     * (non-Javadoc)
128     * @see org.apache.activemq.MessageDispatchChannelI#start()
129     */
130    public void start() {
131        synchronized (mutex) {
132            running = true;
133            mutex.notifyAll();
134        }
135    }
136
137    /*
138     * (non-Javadoc)
139     * @see org.apache.activemq.MessageDispatchChannelI#stop()
140     */
141    public void stop() {
142        synchronized (mutex) {
143            running = false;
144            mutex.notifyAll();
145        }
146    }
147
148    /*
149     * (non-Javadoc)
150     * @see org.apache.activemq.MessageDispatchChannelI#close()
151     */
152    public void close() {
153        synchronized (mutex) {
154            if (!closed) {
155                running = false;
156                closed = true;
157            }
158            mutex.notifyAll();
159        }
160    }
161
162    /*
163     * (non-Javadoc)
164     * @see org.apache.activemq.MessageDispatchChannelI#clear()
165     */
166    public void clear() {
167        synchronized (mutex) {
168            for (int i = 0; i < MAX_PRIORITY; i++) {
169                lists[i].clear();
170            }
171        }
172    }
173
174    /*
175     * (non-Javadoc)
176     * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
177     */
178    public boolean isClosed() {
179        return closed;
180    }
181
182    /*
183     * (non-Javadoc)
184     * @see org.apache.activemq.MessageDispatchChannelI#size()
185     */
186    public int size() {
187        synchronized (mutex) {
188            return this.size;
189        }
190    }
191
192    /*
193     * (non-Javadoc)
194     * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
195     */
196    public Object getMutex() {
197        return mutex;
198    }
199
200    /*
201     * (non-Javadoc)
202     * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
203     */
204    public boolean isRunning() {
205        return running;
206    }
207
208    /*
209     * (non-Javadoc)
210     * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
211     */
212    public List<MessageDispatch> removeAll() {
213
214        synchronized (mutex) {
215            ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
216            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
217                List<MessageDispatch> list = lists[i];
218                result.addAll(list);
219                size -= list.size();
220                list.clear();
221            }
222            return result;
223        }
224    }
225
226    @Override
227    public String toString() {
228
229        String result = "";
230        for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
231            result += i + ":{" + lists[i].toString() + "}";
232        }
233        return result;
234
235    }
236
237    protected int getPriority(MessageDispatch message) {
238        int priority = javax.jms.Message.DEFAULT_PRIORITY;
239        if (message.getMessage() != null) {
240                priority = Math.max(message.getMessage().getPriority(), 0);
241                priority = Math.min(priority, 9);
242        }
243        return priority;
244    }
245
246    protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
247        return lists[getPriority(md)];
248    }
249
250    private final MessageDispatch removeFirst() {
251        if (this.size > 0) {
252            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
253                LinkedList<MessageDispatch> list = lists[i];
254                if (!list.isEmpty()) {
255                    this.size--;
256                    return list.removeFirst();
257                }
258            }
259        }
260        return null;
261    }
262
263    private final MessageDispatch getFirst() {
264        if (this.size > 0) {
265            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
266                LinkedList<MessageDispatch> list = lists[i];
267                if (!list.isEmpty()) {
268                    return list.getFirst();
269                }
270            }
271        }
272        return null;
273    }
274}