001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import javax.jms.InvalidSelectorException; 028import javax.jms.JMSException; 029 030import org.apache.activemq.broker.Broker; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 034import org.apache.activemq.command.ActiveMQMessage; 035import org.apache.activemq.command.ConsumerControl; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageDispatch; 040import org.apache.activemq.command.MessageDispatchNotification; 041import org.apache.activemq.command.MessageId; 042import org.apache.activemq.command.MessagePull; 043import org.apache.activemq.command.Response; 044import org.apache.activemq.thread.Scheduler; 045import org.apache.activemq.transaction.Synchronization; 046import org.apache.activemq.usage.SystemUsage; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A subscription that honors the pre-fetch option of the ConsumerInfo. 052 */ 053public abstract class PrefetchSubscription extends AbstractSubscription { 054 055 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 056 protected final Scheduler scheduler; 057 058 protected PendingMessageCursor pending; 059 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 060 protected final AtomicInteger prefetchExtension = new AtomicInteger(); 061 protected boolean usePrefetchExtension = true; 062 protected long enqueueCounter; 063 protected long dispatchCounter; 064 protected long dequeueCounter; 065 private int maxProducersToAudit=32; 066 private int maxAuditDepth=2048; 067 protected final SystemUsage usageManager; 068 protected final Object pendingLock = new Object(); 069 protected final Object dispatchLock = new Object(); 070 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 071 072 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { 073 super(broker,context, info); 074 this.usageManager=usageManager; 075 pending = cursor; 076 this.scheduler = broker.getScheduler(); 077 } 078 079 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { 080 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 081 } 082 083 /** 084 * Allows a message to be pulled on demand by a client 085 */ 086 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { 087 // The slave should not deliver pull messages. 088 // TODO: when the slave becomes a master, He should send a NULL message to all the 089 // consumers to 'wake them up' in case they were waiting for a message. 090 if (getPrefetchSize() == 0 && !isSlave()) { 091 092 prefetchExtension.incrementAndGet(); 093 final long dispatchCounterBeforePull = dispatchCounter; 094 095 // Have the destination push us some messages. 096 for (Destination dest : destinations) { 097 dest.iterate(); 098 } 099 dispatchPending(); 100 101 synchronized(this) { 102 // If there was nothing dispatched.. we may need to setup a timeout. 103 if (dispatchCounterBeforePull == dispatchCounter) { 104 // immediate timeout used by receiveNoWait() 105 if (pull.getTimeout() == -1) { 106 // Send a NULL message. 107 add(QueueMessageReference.NULL_MESSAGE); 108 dispatchPending(); 109 } 110 if (pull.getTimeout() > 0) { 111 scheduler.executeAfterDelay(new Runnable() { 112 @Override 113 public void run() { 114 pullTimeout(dispatchCounterBeforePull); 115 } 116 }, pull.getTimeout()); 117 } 118 } 119 } 120 } 121 return null; 122 } 123 124 /** 125 * Occurs when a pull times out. If nothing has been dispatched since the 126 * timeout was setup, then send the NULL message. 127 */ 128 final void pullTimeout(long dispatchCounterBeforePull) { 129 synchronized (pendingLock) { 130 if (dispatchCounterBeforePull == dispatchCounter) { 131 try { 132 add(QueueMessageReference.NULL_MESSAGE); 133 dispatchPending(); 134 } catch (Exception e) { 135 context.getConnection().serviceException(e); 136 } 137 } 138 } 139 } 140 141 public void add(MessageReference node) throws Exception { 142 synchronized (pendingLock) { 143 // The destination may have just been removed... 144 if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) { 145 // perhaps we should inform the caller that we are no longer valid to dispatch to? 146 return; 147 } 148 enqueueCounter++; 149 pending.addMessageLast(node); 150 } 151 dispatchPending(); 152 } 153 154 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 155 synchronized(pendingLock) { 156 try { 157 pending.reset(); 158 while (pending.hasNext()) { 159 MessageReference node = pending.next(); 160 node.decrementReferenceCount(); 161 if (node.getMessageId().equals(mdn.getMessageId())) { 162 // Synchronize between dispatched list and removal of messages from pending list 163 // related to remove subscription action 164 synchronized(dispatchLock) { 165 pending.remove(); 166 createMessageDispatch(node, node.getMessage()); 167 dispatched.add(node); 168 onDispatch(node, node.getMessage()); 169 } 170 return; 171 } 172 } 173 } finally { 174 pending.release(); 175 } 176 } 177 throw new JMSException( 178 "Slave broker out of sync with master: Dispatched message (" 179 + mdn.getMessageId() + ") was not in the pending list for " 180 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 181 } 182 183 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 184 // Handle the standard acknowledgment case. 185 boolean callDispatchMatched = false; 186 Destination destination = null; 187 188 if (!isSlave()) { 189 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 190 // suppress unexpected ack exception in this expected case 191 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack); 192 return; 193 } 194 } 195 if (LOG.isTraceEnabled()) { 196 LOG.trace("ack:" + ack); 197 } 198 synchronized(dispatchLock) { 199 if (ack.isStandardAck()) { 200 // First check if the ack matches the dispatched. When using failover this might 201 // not be the case. We don't ever want to ack the wrong messages. 202 assertAckMatchesDispatched(ack); 203 204 // Acknowledge all dispatched messages up till the message id of 205 // the acknowledgment. 206 int index = 0; 207 boolean inAckRange = false; 208 List<MessageReference> removeList = new ArrayList<MessageReference>(); 209 for (final MessageReference node : dispatched) { 210 MessageId messageId = node.getMessageId(); 211 if (ack.getFirstMessageId() == null 212 || ack.getFirstMessageId().equals(messageId)) { 213 inAckRange = true; 214 } 215 if (inAckRange) { 216 // Don't remove the nodes until we are committed. 217 if (!context.isInTransaction()) { 218 dequeueCounter++; 219 node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); 220 removeList.add(node); 221 } else { 222 registerRemoveSync(context, node); 223 } 224 index++; 225 acknowledge(context, ack, node); 226 if (ack.getLastMessageId().equals(messageId)) { 227 // contract prefetch if dispatch required a pull 228 if (getPrefetchSize() == 0) { 229 // Protect extension update against parallel updates. 230 while (true) { 231 int currentExtension = prefetchExtension.get(); 232 int newExtension = Math.max(0, currentExtension - index); 233 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 234 break; 235 } 236 } 237 } else if (usePrefetchExtension && context.isInTransaction()) { 238 // extend prefetch window only if not a pulling consumer 239 while (true) { 240 int currentExtension = prefetchExtension.get(); 241 int newExtension = Math.max(currentExtension, index); 242 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 243 break; 244 } 245 } 246 } 247 destination = node.getRegionDestination(); 248 callDispatchMatched = true; 249 break; 250 } 251 } 252 } 253 for (final MessageReference node : removeList) { 254 dispatched.remove(node); 255 } 256 // this only happens after a reconnect - get an ack which is not 257 // valid 258 if (!callDispatchMatched) { 259 LOG.warn("Could not correlate acknowledgment with dispatched message: " 260 + ack); 261 } 262 } else if (ack.isIndividualAck()) { 263 // Message was delivered and acknowledge - but only delete the 264 // individual message 265 for (final MessageReference node : dispatched) { 266 MessageId messageId = node.getMessageId(); 267 if (ack.getLastMessageId().equals(messageId)) { 268 // Don't remove the nodes until we are committed - immediateAck option 269 if (!context.isInTransaction()) { 270 dequeueCounter++; 271 node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); 272 dispatched.remove(node); 273 } else { 274 registerRemoveSync(context, node); 275 } 276 277 // Protect extension update against parallel updates. 278 while (true) { 279 int currentExtension = prefetchExtension.get(); 280 int newExtension = Math.max(0, currentExtension - 1); 281 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 282 break; 283 } 284 } 285 acknowledge(context, ack, node); 286 destination = node.getRegionDestination(); 287 callDispatchMatched = true; 288 break; 289 } 290 } 291 }else if (ack.isDeliveredAck()) { 292 // Message was delivered but not acknowledged: update pre-fetch 293 // counters. 294 int index = 0; 295 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 296 final MessageReference node = iter.next(); 297 if (node.isExpired()) { 298 if (broker.isExpired(node)) { 299 node.getRegionDestination().messageExpired(context, this, node); 300 } 301 iter.remove(); 302 node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); 303 } 304 if (ack.getLastMessageId().equals(node.getMessageId())) { 305 if (usePrefetchExtension) { 306 while (true) { 307 int currentExtension = prefetchExtension.get(); 308 int newExtension = Math.max(currentExtension, index + 1); 309 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 310 break; 311 } 312 } 313 } 314 destination = node.getRegionDestination(); 315 callDispatchMatched = true; 316 break; 317 } 318 } 319 if (!callDispatchMatched) { 320 throw new JMSException( 321 "Could not correlate acknowledgment with dispatched message: " 322 + ack); 323 } 324 } else if (ack.isRedeliveredAck()) { 325 // Message was re-delivered but it was not yet considered to be 326 // a DLQ message. 327 boolean inAckRange = false; 328 for (final MessageReference node : dispatched) { 329 MessageId messageId = node.getMessageId(); 330 if (ack.getFirstMessageId() == null 331 || ack.getFirstMessageId().equals(messageId)) { 332 inAckRange = true; 333 } 334 if (inAckRange) { 335 if (ack.getLastMessageId().equals(messageId)) { 336 destination = node.getRegionDestination(); 337 callDispatchMatched = true; 338 break; 339 } 340 } 341 } 342 if (!callDispatchMatched) { 343 throw new JMSException( 344 "Could not correlate acknowledgment with dispatched message: " 345 + ack); 346 } 347 } else if (ack.isPoisonAck()) { 348 // TODO: what if the message is already in a DLQ??? 349 // Handle the poison ACK case: we need to send the message to a 350 // DLQ 351 if (ack.isInTransaction()) { 352 throw new JMSException("Poison ack cannot be transacted: " 353 + ack); 354 } 355 int index = 0; 356 boolean inAckRange = false; 357 List<MessageReference> removeList = new ArrayList<MessageReference>(); 358 for (final MessageReference node : dispatched) { 359 MessageId messageId = node.getMessageId(); 360 if (ack.getFirstMessageId() == null 361 || ack.getFirstMessageId().equals(messageId)) { 362 inAckRange = true; 363 } 364 if (inAckRange) { 365 if (ack.getPoisonCause() != null) { 366 node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, 367 ack.getPoisonCause().toString()); 368 } 369 sendToDLQ(context, node); 370 node.getRegionDestination().getDestinationStatistics() 371 .getInflight().decrement(); 372 removeList.add(node); 373 dequeueCounter++; 374 index++; 375 acknowledge(context, ack, node); 376 if (ack.getLastMessageId().equals(messageId)) { 377 while (true) { 378 int currentExtension = prefetchExtension.get(); 379 int newExtension = Math.max(0, currentExtension - (index + 1)); 380 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 381 break; 382 } 383 } 384 destination = node.getRegionDestination(); 385 callDispatchMatched = true; 386 break; 387 } 388 } 389 } 390 for (final MessageReference node : removeList) { 391 dispatched.remove(node); 392 } 393 if (!callDispatchMatched) { 394 throw new JMSException( 395 "Could not correlate acknowledgment with dispatched message: " 396 + ack); 397 } 398 } 399 } 400 if (callDispatchMatched && destination != null) { 401 destination.wakeup(); 402 dispatchPending(); 403 } else { 404 if (isSlave()) { 405 throw new JMSException( 406 "Slave broker out of sync with master: Acknowledgment (" 407 + ack + ") was not in the dispatch list: " 408 + dispatched); 409 } else { 410 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " 411 + ack); 412 } 413 } 414 } 415 416 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 417 // setup a Synchronization to remove nodes from the 418 // dispatched list. 419 context.getTransaction().addSynchronization( 420 new Synchronization() { 421 422 @Override 423 public void afterCommit() 424 throws Exception { 425 synchronized(dispatchLock) { 426 dequeueCounter++; 427 dispatched.remove(node); 428 node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); 429 } 430 } 431 432 @Override 433 public void afterRollback() throws Exception { 434 synchronized(dispatchLock) { 435 if (isSlave()) { 436 node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); 437 } else { 438 // poisionAck will decrement - otherwise still inflight on client 439 } 440 } 441 } 442 }); 443 } 444 445 /** 446 * Checks an ack versus the contents of the dispatched list. 447 * called with dispatchLock held 448 * @param ack 449 * @throws JMSException if it does not match 450 */ 451 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 452 MessageId firstAckedMsg = ack.getFirstMessageId(); 453 MessageId lastAckedMsg = ack.getLastMessageId(); 454 int checkCount = 0; 455 boolean checkFoundStart = false; 456 boolean checkFoundEnd = false; 457 for (MessageReference node : dispatched) { 458 459 if (firstAckedMsg == null) { 460 checkFoundStart = true; 461 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 462 checkFoundStart = true; 463 } 464 465 if (checkFoundStart) { 466 checkCount++; 467 } 468 469 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 470 checkFoundEnd = true; 471 break; 472 } 473 } 474 if (!checkFoundStart && firstAckedMsg != null) 475 throw new JMSException("Unmatched acknowledge: " + ack 476 + "; Could not find Message-ID " + firstAckedMsg 477 + " in dispatched-list (start of ack)"); 478 if (!checkFoundEnd && lastAckedMsg != null) 479 throw new JMSException("Unmatched acknowledge: " + ack 480 + "; Could not find Message-ID " + lastAckedMsg 481 + " in dispatched-list (end of ack)"); 482 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 483 throw new JMSException("Unmatched acknowledge: " + ack 484 + "; Expected message count (" + ack.getMessageCount() 485 + ") differs from count in dispatched-list (" + checkCount 486 + ")"); 487 } 488 } 489 490 /** 491 * @param context 492 * @param node 493 * @throws IOException 494 * @throws Exception 495 */ 496 protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception { 497 broker.getRoot().sendToDeadLetterQueue(context, node, this); 498 } 499 500 public int getInFlightSize() { 501 return dispatched.size(); 502 } 503 504 /** 505 * Used to determine if the broker can dispatch to the consumer. 506 * 507 * @return 508 */ 509 public boolean isFull() { 510 return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 511 } 512 513 /** 514 * @return true when 60% or more room is left for dispatching messages 515 */ 516 public boolean isLowWaterMark() { 517 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 518 } 519 520 /** 521 * @return true when 10% or less room is left for dispatching messages 522 */ 523 public boolean isHighWaterMark() { 524 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 525 } 526 527 @Override 528 public int countBeforeFull() { 529 return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 530 } 531 532 public int getPendingQueueSize() { 533 return pending.size(); 534 } 535 536 public int getDispatchedQueueSize() { 537 return dispatched.size(); 538 } 539 540 public long getDequeueCounter() { 541 return dequeueCounter; 542 } 543 544 public long getDispatchedCounter() { 545 return dispatchCounter; 546 } 547 548 public long getEnqueueCounter() { 549 return enqueueCounter; 550 } 551 552 @Override 553 public boolean isRecoveryRequired() { 554 return pending.isRecoveryRequired(); 555 } 556 557 public PendingMessageCursor getPending() { 558 return this.pending; 559 } 560 561 public void setPending(PendingMessageCursor pending) { 562 this.pending = pending; 563 if (this.pending!=null) { 564 this.pending.setSystemUsage(usageManager); 565 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 566 } 567 } 568 569 @Override 570 public void add(ConnectionContext context, Destination destination) throws Exception { 571 synchronized(pendingLock) { 572 super.add(context, destination); 573 pending.add(context, destination); 574 } 575 } 576 577 @Override 578 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 579 List<MessageReference> rc = new ArrayList<MessageReference>(); 580 synchronized(pendingLock) { 581 super.remove(context, destination); 582 // Here is a potential problem concerning Inflight stat: 583 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 584 // Except if each commit or rollback callback action comes before remove of subscriber. 585 rc.addAll(pending.remove(context, destination)); 586 587 // Synchronized to DispatchLock 588 synchronized(dispatchLock) { 589 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 590 for (MessageReference r : dispatched) { 591 if( r.getRegionDestination() == destination) { 592 references.add(r); 593 } 594 } 595 rc.addAll(references); 596 destination.getDestinationStatistics().getDispatched().subtract(references.size()); 597 destination.getDestinationStatistics().getInflight().subtract(references.size()); 598 dispatched.removeAll(references); 599 } 600 } 601 return rc; 602 } 603 604 protected void dispatchPending() throws IOException { 605 if (!isSlave()) { 606 synchronized(pendingLock) { 607 try { 608 int numberToDispatch = countBeforeFull(); 609 if (numberToDispatch > 0) { 610 setSlowConsumer(false); 611 setPendingBatchSize(pending, numberToDispatch); 612 int count = 0; 613 pending.reset(); 614 while (pending.hasNext() && !isFull() 615 && count < numberToDispatch) { 616 MessageReference node = pending.next(); 617 if (node == null) { 618 break; 619 } 620 621 // Synchronize between dispatched list and remove of message from pending list 622 // related to remove subscription action 623 synchronized(dispatchLock) { 624 pending.remove(); 625 node.decrementReferenceCount(); 626 if( !isDropped(node) && canDispatch(node)) { 627 628 // Message may have been sitting in the pending 629 // list a while waiting for the consumer to ak the message. 630 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 631 //increment number to dispatch 632 numberToDispatch++; 633 if (broker.isExpired(node)) { 634 node.getRegionDestination().messageExpired(context, this, node); 635 } 636 continue; 637 } 638 dispatch(node); 639 count++; 640 } 641 } 642 } 643 } else if (!isSlowConsumer()) { 644 setSlowConsumer(true); 645 for (Destination dest :destinations) { 646 dest.slowConsumer(context, this); 647 } 648 } 649 } finally { 650 pending.release(); 651 } 652 } 653 } 654 } 655 656 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 657 pending.setMaxBatchSize(numberToDispatch); 658 } 659 660 // called with dispatchLock held 661 protected boolean dispatch(final MessageReference node) throws IOException { 662 final Message message = node.getMessage(); 663 if (message == null) { 664 return false; 665 } 666 667 okForAckAsDispatchDone.countDown(); 668 669 // No reentrant lock - Patch needed to IndirectMessageReference on method lock 670 if (!isSlave()) { 671 672 MessageDispatch md = createMessageDispatch(node, message); 673 // NULL messages don't count... they don't get Acked. 674 if (node != QueueMessageReference.NULL_MESSAGE) { 675 dispatchCounter++; 676 dispatched.add(node); 677 } else { 678 while (true) { 679 int currentExtension = prefetchExtension.get(); 680 int newExtension = Math.max(0, currentExtension - 1); 681 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 682 break; 683 } 684 } 685 } 686 if (info.isDispatchAsync()) { 687 md.setTransmitCallback(new Runnable() { 688 689 public void run() { 690 // Since the message gets queued up in async dispatch, 691 // we don't want to 692 // decrease the reference count until it gets put on the 693 // wire. 694 onDispatch(node, message); 695 } 696 }); 697 context.getConnection().dispatchAsync(md); 698 } else { 699 context.getConnection().dispatchSync(md); 700 onDispatch(node, message); 701 } 702 return true; 703 } else { 704 return false; 705 } 706 } 707 708 protected void onDispatch(final MessageReference node, final Message message) { 709 if (node.getRegionDestination() != null) { 710 if (node != QueueMessageReference.NULL_MESSAGE) { 711 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 712 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 713 if (LOG.isTraceEnabled()) { 714 LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - " 715 + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size()); 716 } 717 } 718 } 719 720 if (info.isDispatchAsync()) { 721 try { 722 dispatchPending(); 723 } catch (IOException e) { 724 context.getConnection().serviceExceptionAsync(e); 725 } 726 } 727 } 728 729 /** 730 * inform the MessageConsumer on the client to change it's prefetch 731 * 732 * @param newPrefetch 733 */ 734 public void updateConsumerPrefetch(int newPrefetch) { 735 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 736 ConsumerControl cc = new ConsumerControl(); 737 cc.setConsumerId(info.getConsumerId()); 738 cc.setPrefetch(newPrefetch); 739 context.getConnection().dispatchAsync(cc); 740 } 741 } 742 743 /** 744 * @param node 745 * @param message 746 * @return MessageDispatch 747 */ 748 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 749 MessageDispatch md = new MessageDispatch(); 750 md.setConsumerId(info.getConsumerId()); 751 752 if (node == QueueMessageReference.NULL_MESSAGE) { 753 md.setMessage(null); 754 md.setDestination(null); 755 } else { 756 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 757 md.setMessage(message); 758 md.setRedeliveryCounter(node.getRedeliveryCounter()); 759 } 760 761 return md; 762 } 763 764 /** 765 * Use when a matched message is about to be dispatched to the client. 766 * 767 * @param node 768 * @return false if the message should not be dispatched to the client 769 * (another sub may have already dispatched it for example). 770 * @throws IOException 771 */ 772 protected abstract boolean canDispatch(MessageReference node) throws IOException; 773 774 protected abstract boolean isDropped(MessageReference node); 775 776 /** 777 * Used during acknowledgment to remove the message. 778 * 779 * @throws IOException 780 */ 781 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 782 783 784 public int getMaxProducersToAudit() { 785 return maxProducersToAudit; 786 } 787 788 public void setMaxProducersToAudit(int maxProducersToAudit) { 789 this.maxProducersToAudit = maxProducersToAudit; 790 } 791 792 public int getMaxAuditDepth() { 793 return maxAuditDepth; 794 } 795 796 public void setMaxAuditDepth(int maxAuditDepth) { 797 this.maxAuditDepth = maxAuditDepth; 798 } 799 800 public boolean isUsePrefetchExtension() { 801 return usePrefetchExtension; 802 } 803 804 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 805 this.usePrefetchExtension = usePrefetchExtension; 806 } 807 808 protected int getPrefetchExtension() { 809 return this.prefetchExtension.get(); 810 } 811}