001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.util.ArrayList; 020 import java.util.LinkedList; 021 import java.util.List; 022 import org.apache.activemq.command.MessageDispatch; 023 024 public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel { 025 private static final Integer MAX_PRIORITY = 10; 026 private final Object mutex = new Object(); 027 private final LinkedList<MessageDispatch>[] lists; 028 private boolean closed; 029 private boolean running; 030 private int size = 0; 031 032 public SimplePriorityMessageDispatchChannel() { 033 this.lists = new LinkedList[MAX_PRIORITY]; 034 for (int i = 0; i < MAX_PRIORITY; i++) { 035 lists[i] = new LinkedList<MessageDispatch>(); 036 } 037 } 038 039 /* 040 * (non-Javadoc) 041 * @see 042 * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq 043 * .command.MessageDispatch) 044 */ 045 public void enqueue(MessageDispatch message) { 046 synchronized (mutex) { 047 getList(message).addLast(message); 048 049 this.size++; 050 mutex.notify(); 051 } 052 } 053 054 /* 055 * (non-Javadoc) 056 * @see 057 * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq 058 * .command.MessageDispatch) 059 */ 060 public void enqueueFirst(MessageDispatch message) { 061 synchronized (mutex) { 062 getList(message).addFirst(message); 063 this.size++; 064 mutex.notify(); 065 } 066 } 067 068 /* 069 * (non-Javadoc) 070 * @see org.apache.activemq.MessageDispatchChannelI#isEmpty() 071 */ 072 public boolean isEmpty() { 073 // synchronized (mutex) { 074 return this.size == 0; 075 // } 076 } 077 078 /* 079 * (non-Javadoc) 080 * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long) 081 */ 082 public MessageDispatch dequeue(long timeout) throws InterruptedException { 083 synchronized (mutex) { 084 // Wait until the consumer is ready to deliver messages. 085 while (timeout != 0 && !closed && (isEmpty() || !running)) { 086 if (timeout == -1) { 087 mutex.wait(); 088 } else { 089 mutex.wait(timeout); 090 break; 091 } 092 } 093 if (closed || !running || isEmpty()) { 094 return null; 095 } 096 return removeFirst(); 097 } 098 } 099 100 /* 101 * (non-Javadoc) 102 * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait() 103 */ 104 public MessageDispatch dequeueNoWait() { 105 synchronized (mutex) { 106 if (closed || !running || isEmpty()) { 107 return null; 108 } 109 return removeFirst(); 110 } 111 } 112 113 /* 114 * (non-Javadoc) 115 * @see org.apache.activemq.MessageDispatchChannelI#peek() 116 */ 117 public MessageDispatch peek() { 118 synchronized (mutex) { 119 if (closed || !running || isEmpty()) { 120 return null; 121 } 122 return getFirst(); 123 } 124 } 125 126 /* 127 * (non-Javadoc) 128 * @see org.apache.activemq.MessageDispatchChannelI#start() 129 */ 130 public void start() { 131 synchronized (mutex) { 132 running = true; 133 mutex.notifyAll(); 134 } 135 } 136 137 /* 138 * (non-Javadoc) 139 * @see org.apache.activemq.MessageDispatchChannelI#stop() 140 */ 141 public void stop() { 142 synchronized (mutex) { 143 running = false; 144 mutex.notifyAll(); 145 } 146 } 147 148 /* 149 * (non-Javadoc) 150 * @see org.apache.activemq.MessageDispatchChannelI#close() 151 */ 152 public void close() { 153 synchronized (mutex) { 154 if (!closed) { 155 running = false; 156 closed = true; 157 } 158 mutex.notifyAll(); 159 } 160 } 161 162 /* 163 * (non-Javadoc) 164 * @see org.apache.activemq.MessageDispatchChannelI#clear() 165 */ 166 public void clear() { 167 synchronized (mutex) { 168 for (int i = 0; i < MAX_PRIORITY; i++) { 169 lists[i].clear(); 170 } 171 } 172 } 173 174 /* 175 * (non-Javadoc) 176 * @see org.apache.activemq.MessageDispatchChannelI#isClosed() 177 */ 178 public boolean isClosed() { 179 return closed; 180 } 181 182 /* 183 * (non-Javadoc) 184 * @see org.apache.activemq.MessageDispatchChannelI#size() 185 */ 186 public int size() { 187 synchronized (mutex) { 188 return this.size; 189 } 190 } 191 192 /* 193 * (non-Javadoc) 194 * @see org.apache.activemq.MessageDispatchChannelI#getMutex() 195 */ 196 public Object getMutex() { 197 return mutex; 198 } 199 200 /* 201 * (non-Javadoc) 202 * @see org.apache.activemq.MessageDispatchChannelI#isRunning() 203 */ 204 public boolean isRunning() { 205 return running; 206 } 207 208 /* 209 * (non-Javadoc) 210 * @see org.apache.activemq.MessageDispatchChannelI#removeAll() 211 */ 212 public List<MessageDispatch> removeAll() { 213 214 synchronized (mutex) { 215 ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size()); 216 for (int i = MAX_PRIORITY - 1; i >= 0; i--) { 217 List<MessageDispatch> list = lists[i]; 218 result.addAll(list); 219 size -= list.size(); 220 list.clear(); 221 } 222 return result; 223 } 224 } 225 226 @Override 227 public String toString() { 228 229 String result = ""; 230 for (int i = MAX_PRIORITY - 1; i >= 0; i--) { 231 result += i + ":{" + lists[i].toString() + "}"; 232 } 233 return result; 234 235 } 236 237 protected int getPriority(MessageDispatch message) { 238 int priority = javax.jms.Message.DEFAULT_PRIORITY; 239 if (message.getMessage() != null) { 240 priority = Math.max(message.getMessage().getPriority(), 0); 241 priority = Math.min(priority, 9); 242 } 243 return priority; 244 } 245 246 protected LinkedList<MessageDispatch> getList(MessageDispatch md) { 247 return lists[getPriority(md)]; 248 } 249 250 private final MessageDispatch removeFirst() { 251 if (this.size > 0) { 252 for (int i = MAX_PRIORITY - 1; i >= 0; i--) { 253 LinkedList<MessageDispatch> list = lists[i]; 254 if (!list.isEmpty()) { 255 this.size--; 256 return list.removeFirst(); 257 } 258 } 259 } 260 return null; 261 } 262 263 private final MessageDispatch getFirst() { 264 if (this.size > 0) { 265 for (int i = MAX_PRIORITY - 1; i >= 0; i--) { 266 LinkedList<MessageDispatch> list = lists[i]; 267 if (!list.isEmpty()) { 268 return list.getFirst(); 269 } 270 } 271 } 272 return null; 273 } 274 }