001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 import java.util.concurrent.atomic.AtomicLong; 024 import org.apache.activemq.broker.Broker; 025 import org.apache.activemq.broker.ConnectionContext; 026 import org.apache.activemq.broker.region.Destination; 027 import org.apache.activemq.broker.region.IndirectMessageReference; 028 import org.apache.activemq.broker.region.MessageReference; 029 import org.apache.activemq.broker.region.QueueMessageReference; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 032 import org.apache.activemq.openwire.OpenWireFormat; 033 import org.apache.activemq.store.kahadb.plist.PList; 034 import org.apache.activemq.store.kahadb.plist.PListEntry; 035 import org.apache.activemq.store.kahadb.plist.PListStore; 036 import org.apache.activemq.usage.SystemUsage; 037 import org.apache.activemq.usage.Usage; 038 import org.apache.activemq.usage.UsageListener; 039 import org.apache.activemq.wireformat.WireFormat; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 import org.apache.kahadb.util.ByteSequence; 043 044 /** 045 * persist pending messages pending message (messages awaiting dispatch to a 046 * consumer) cursor 047 * 048 * 049 */ 050 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 051 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 052 private static final AtomicLong NAME_COUNT = new AtomicLong(); 053 protected Broker broker; 054 private final PListStore store; 055 private final String name; 056 private PendingList memoryList; 057 private PList diskList; 058 private Iterator<MessageReference> iter; 059 private Destination regionDestination; 060 private boolean iterating; 061 private boolean flushRequired; 062 private final AtomicBoolean started = new AtomicBoolean(); 063 private final WireFormat wireFormat = new OpenWireFormat(); 064 /** 065 * @param broker 066 * @param name 067 * @param prioritizedMessages 068 */ 069 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 070 super(prioritizedMessages); 071 if (this.prioritizedMessages) { 072 this.memoryList = new PrioritizedPendingList(); 073 } else { 074 this.memoryList = new OrderedPendingList(); 075 } 076 this.broker = broker; 077 // the store can be null if the BrokerService has persistence 078 // turned off 079 this.store = broker.getTempDataStore(); 080 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 081 } 082 083 @Override 084 public void start() throws Exception { 085 if (started.compareAndSet(false, true)) { 086 super.start(); 087 if (systemUsage != null) { 088 systemUsage.getMemoryUsage().addUsageListener(this); 089 } 090 } 091 } 092 093 @Override 094 public void stop() throws Exception { 095 if (started.compareAndSet(true, false)) { 096 super.stop(); 097 if (systemUsage != null) { 098 systemUsage.getMemoryUsage().removeUsageListener(this); 099 } 100 } 101 } 102 103 /** 104 * @return true if there are no pending messages 105 */ 106 @Override 107 public synchronized boolean isEmpty() { 108 if (memoryList.isEmpty() && isDiskListEmpty()) { 109 return true; 110 } 111 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 112 MessageReference node = iterator.next(); 113 if (node == QueueMessageReference.NULL_MESSAGE) { 114 continue; 115 } 116 if (!node.isDropped()) { 117 return false; 118 } 119 // We can remove dropped references. 120 iterator.remove(); 121 } 122 return isDiskListEmpty(); 123 } 124 125 /** 126 * reset the cursor 127 */ 128 @Override 129 public synchronized void reset() { 130 iterating = true; 131 last = null; 132 if (isDiskListEmpty()) { 133 this.iter = this.memoryList.iterator(); 134 } else { 135 this.iter = new DiskIterator(); 136 } 137 } 138 139 @Override 140 public synchronized void release() { 141 iterating = false; 142 if (iter instanceof DiskIterator) { 143 ((DiskIterator)iter).release(); 144 }; 145 if (flushRequired) { 146 flushRequired = false; 147 if (!hasSpace()) { 148 flushToDisk(); 149 } 150 } 151 } 152 153 @Override 154 public synchronized void destroy() throws Exception { 155 stop(); 156 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 157 Message node = (Message) i.next(); 158 node.decrementReferenceCount(); 159 } 160 memoryList.clear(); 161 destroyDiskList(); 162 } 163 164 private void destroyDiskList() throws Exception { 165 if (diskList != null) { 166 store.removePList(name); 167 diskList = null; 168 } 169 } 170 171 @Override 172 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 173 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 174 int count = 0; 175 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 176 MessageReference ref = i.next(); 177 ref.incrementReferenceCount(); 178 result.add(ref); 179 count++; 180 } 181 if (count < maxItems && !isDiskListEmpty()) { 182 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 183 Message message = (Message) i.next(); 184 message.setRegionDestination(regionDestination); 185 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 186 message.incrementReferenceCount(); 187 result.add(message); 188 count++; 189 } 190 } 191 return result; 192 } 193 194 /** 195 * add message to await dispatch 196 * 197 * @param node 198 * @throws Exception 199 */ 200 @Override 201 public synchronized void addMessageLast(MessageReference node) throws Exception { 202 tryAddMessageLast(node, 0); 203 } 204 205 @Override 206 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 207 if (!node.isExpired()) { 208 try { 209 regionDestination = node.getMessage().getRegionDestination(); 210 if (isDiskListEmpty()) { 211 if (hasSpace() || this.store == null) { 212 memoryList.addMessageLast(node); 213 node.incrementReferenceCount(); 214 setCacheEnabled(true); 215 return true; 216 } 217 } 218 if (!hasSpace()) { 219 if (isDiskListEmpty()) { 220 expireOldMessages(); 221 if (hasSpace()) { 222 memoryList.addMessageLast(node); 223 node.incrementReferenceCount(); 224 return true; 225 } else { 226 flushToDisk(); 227 } 228 } 229 } 230 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 231 ByteSequence bs = getByteSequence(node.getMessage()); 232 getDiskList().addLast(node.getMessageId().toString(), bs); 233 return true; 234 } 235 return false; 236 237 } catch (Exception e) { 238 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e); 239 throw new RuntimeException(e); 240 } 241 } else { 242 discardExpiredMessage(node); 243 } 244 //message expired 245 return true; 246 } 247 248 /** 249 * add message to await dispatch 250 * 251 * @param node 252 */ 253 @Override 254 public synchronized void addMessageFirst(MessageReference node) { 255 if (!node.isExpired()) { 256 try { 257 regionDestination = node.getMessage().getRegionDestination(); 258 if (isDiskListEmpty()) { 259 if (hasSpace()) { 260 memoryList.addMessageFirst(node); 261 node.incrementReferenceCount(); 262 setCacheEnabled(true); 263 return; 264 } 265 } 266 if (!hasSpace()) { 267 if (isDiskListEmpty()) { 268 expireOldMessages(); 269 if (hasSpace()) { 270 memoryList.addMessageFirst(node); 271 node.incrementReferenceCount(); 272 return; 273 } else { 274 flushToDisk(); 275 } 276 } 277 } 278 systemUsage.getTempUsage().waitForSpace(); 279 node.decrementReferenceCount(); 280 ByteSequence bs = getByteSequence(node.getMessage()); 281 getDiskList().addFirst(node.getMessageId().toString(), bs); 282 283 } catch (Exception e) { 284 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e); 285 throw new RuntimeException(e); 286 } 287 } else { 288 discardExpiredMessage(node); 289 } 290 } 291 292 /** 293 * @return true if there pending messages to dispatch 294 */ 295 @Override 296 public synchronized boolean hasNext() { 297 return iter.hasNext(); 298 } 299 300 /** 301 * @return the next pending message 302 */ 303 @Override 304 public synchronized MessageReference next() { 305 MessageReference reference = iter.next(); 306 last = reference; 307 if (!isDiskListEmpty()) { 308 // got from disk 309 reference.getMessage().setRegionDestination(regionDestination); 310 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 311 } 312 reference.incrementReferenceCount(); 313 return reference; 314 } 315 316 /** 317 * remove the message at the cursor position 318 */ 319 @Override 320 public synchronized void remove() { 321 iter.remove(); 322 if (last != null) { 323 last.decrementReferenceCount(); 324 } 325 } 326 327 /** 328 * @param node 329 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 330 */ 331 @Override 332 public synchronized void remove(MessageReference node) { 333 if (memoryList.remove(node) != null) { 334 node.decrementReferenceCount(); 335 } 336 if (!isDiskListEmpty()) { 337 try { 338 getDiskList().remove(node.getMessageId().toString()); 339 } catch (IOException e) { 340 throw new RuntimeException(e); 341 } 342 } 343 } 344 345 /** 346 * @return the number of pending messages 347 */ 348 @Override 349 public synchronized int size() { 350 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size()); 351 } 352 353 /** 354 * clear all pending messages 355 */ 356 @Override 357 public synchronized void clear() { 358 memoryList.clear(); 359 if (!isDiskListEmpty()) { 360 try { 361 getDiskList().destroy(); 362 } catch (IOException e) { 363 throw new RuntimeException(e); 364 } 365 } 366 last = null; 367 } 368 369 @Override 370 public synchronized boolean isFull() { 371 372 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); 373 374 } 375 376 @Override 377 public boolean hasMessagesBufferedToDeliver() { 378 return !isEmpty(); 379 } 380 381 @Override 382 public void setSystemUsage(SystemUsage usageManager) { 383 super.setSystemUsage(usageManager); 384 } 385 386 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 387 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 388 synchronized (this) { 389 if (!flushRequired && size() != 0) { 390 flushRequired =true; 391 if (!iterating) { 392 expireOldMessages(); 393 if (!hasSpace()) { 394 flushToDisk(); 395 flushRequired = false; 396 } 397 } 398 } 399 } 400 } 401 } 402 403 @Override 404 public boolean isTransient() { 405 return true; 406 } 407 408 protected boolean isSpaceInMemoryList() { 409 return hasSpace() && isDiskListEmpty(); 410 } 411 412 protected synchronized void expireOldMessages() { 413 if (!memoryList.isEmpty()) { 414 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 415 MessageReference node = iterator.next(); 416 if (node.isExpired()) { 417 node.decrementReferenceCount(); 418 discardExpiredMessage(node); 419 iterator.remove(); 420 } 421 } 422 } 423 } 424 425 protected synchronized void flushToDisk() { 426 if (!memoryList.isEmpty() && store != null) { 427 long start = 0; 428 if (LOG.isTraceEnabled()) { 429 start = System.currentTimeMillis(); 430 LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") ); 431 } 432 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 433 MessageReference node = iterator.next(); 434 node.decrementReferenceCount(); 435 ByteSequence bs; 436 try { 437 bs = getByteSequence(node.getMessage()); 438 getDiskList().addLast(node.getMessageId().toString(), bs); 439 } catch (IOException e) { 440 LOG.error("Failed to write to disk list", e); 441 throw new RuntimeException(e); 442 } 443 444 } 445 memoryList.clear(); 446 setCacheEnabled(false); 447 if (LOG.isTraceEnabled()) { 448 LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : "")); 449 } 450 } 451 } 452 453 protected boolean isDiskListEmpty() { 454 return diskList == null || diskList.isEmpty(); 455 } 456 457 protected PList getDiskList() { 458 if (diskList == null) { 459 try { 460 diskList = store.getPList(name); 461 } catch (Exception e) { 462 LOG.error("Caught an IO Exception getting the DiskList " + name, e); 463 throw new RuntimeException(e); 464 } 465 } 466 return diskList; 467 } 468 469 private void discardExpiredMessage(MessageReference reference) { 470 if (LOG.isDebugEnabled()) { 471 LOG.debug("Discarding expired message " + reference); 472 } 473 if (broker.isExpired(reference)) { 474 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 475 context.setBroker(broker); 476 reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); 477 } 478 } 479 480 protected ByteSequence getByteSequence(Message message) throws IOException { 481 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 482 return new ByteSequence(packet.data, packet.offset, packet.length); 483 } 484 485 protected Message getMessage(ByteSequence bs) throws IOException { 486 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 487 .getOffset(), bs.getLength()); 488 return (Message) this.wireFormat.unmarshal(packet); 489 490 } 491 492 final class DiskIterator implements Iterator<MessageReference> { 493 private final PList.PListIterator iterator; 494 DiskIterator() { 495 try { 496 iterator = getDiskList().iterator(); 497 } catch (Exception e) { 498 throw new RuntimeException(e); 499 } 500 } 501 502 public boolean hasNext() { 503 return iterator.hasNext(); 504 } 505 506 public MessageReference next() { 507 try { 508 PListEntry entry = iterator.next(); 509 return getMessage(entry.getByteSequence()); 510 } catch (IOException e) { 511 LOG.error("I/O error", e); 512 throw new RuntimeException(e); 513 } 514 } 515 516 public void remove() { 517 iterator.remove(); 518 } 519 520 public void release() { 521 iterator.release(); 522 } 523 } 524 }