001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import javax.jms.InvalidSelectorException;
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.broker.Broker;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ConsumerControl;
036    import org.apache.activemq.command.ConsumerInfo;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageDispatch;
040    import org.apache.activemq.command.MessageDispatchNotification;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.MessagePull;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.thread.Scheduler;
045    import org.apache.activemq.transaction.Synchronization;
046    import org.apache.activemq.usage.SystemUsage;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * A subscription that honors the pre-fetch option of the ConsumerInfo.
052     */
053    public abstract class PrefetchSubscription extends AbstractSubscription {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056        protected final Scheduler scheduler;
057    
058        protected PendingMessageCursor pending;
059        protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
060        protected final AtomicInteger prefetchExtension = new AtomicInteger();
061        protected boolean usePrefetchExtension = true;
062        protected long enqueueCounter;
063        protected long dispatchCounter;
064        protected long dequeueCounter;
065        private int maxProducersToAudit=32;
066        private int maxAuditDepth=2048;
067        protected final SystemUsage usageManager;
068        protected final Object pendingLock = new Object();
069        protected final Object dispatchLock = new Object();
070        private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071    
/**
 * Creates a subscription whose undispatched messages are kept in the supplied cursor.
 *
 * @param broker the broker; also supplies the scheduler used for pull timeouts
 * @param usageManager system usage kept for later use by {@code setPending}
 * @param context connection context, forwarded to the superclass
 * @param info consumer description, forwarded to the superclass
 * @param cursor cursor that will hold the pending messages
 * @throws InvalidSelectorException declared for the superclass constructor call
 */
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
    super(broker, context, info);
    this.scheduler = broker.getScheduler();
    this.usageManager = usageManager;
    this.pending = cursor;
}

/**
 * Creates a subscription backed by a new {@link VMPendingMessageCursor}.
 *
 * @param broker the broker; also supplies the scheduler used for pull timeouts
 * @param usageManager system usage kept for later use by {@code setPending}
 * @param context connection context, forwarded to the superclass
 * @param info consumer description, forwarded to the superclass
 * @throws InvalidSelectorException declared for the superclass constructor call
 */
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
    this(
        broker,
        usageManager,
        context,
        info,
        new VMPendingMessageCursor(false));
}
082    
083        /**
084         * Allows a message to be pulled on demand by a client
085         */
086        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087            // The slave should not deliver pull messages.
088            // TODO: when the slave becomes a master, He should send a NULL message to all the
089            // consumers to 'wake them up' in case they were waiting for a message.
090            if (getPrefetchSize() == 0 && !isSlave()) {
091    
092                prefetchExtension.incrementAndGet();
093                final long dispatchCounterBeforePull = dispatchCounter;
094    
095                // Have the destination push us some messages.
096                for (Destination dest : destinations) {
097                    dest.iterate();
098                }
099                dispatchPending();
100    
101                synchronized(this) {
102                    // If there was nothing dispatched.. we may need to setup a timeout.
103                    if (dispatchCounterBeforePull == dispatchCounter) {
104                        // immediate timeout used by receiveNoWait()
105                        if (pull.getTimeout() == -1) {
106                            // Send a NULL message.
107                            add(QueueMessageReference.NULL_MESSAGE);
108                            dispatchPending();
109                        }
110                        if (pull.getTimeout() > 0) {
111                            scheduler.executeAfterDelay(new Runnable() {
112                                @Override
113                                public void run() {
114                                    pullTimeout(dispatchCounterBeforePull);
115                                }
116                            }, pull.getTimeout());
117                        }
118                    }
119                }
120            }
121            return null;
122        }
123    
124        /**
125         * Occurs when a pull times out. If nothing has been dispatched since the
126         * timeout was setup, then send the NULL message.
127         */
128        final void pullTimeout(long dispatchCounterBeforePull) {
129            synchronized (pendingLock) {
130                if (dispatchCounterBeforePull == dispatchCounter) {
131                    try {
132                        add(QueueMessageReference.NULL_MESSAGE);
133                        dispatchPending();
134                    } catch (Exception e) {
135                        context.getConnection().serviceException(e);
136                    }
137                }
138            }
139        }
140    
141        public void add(MessageReference node) throws Exception {
142            synchronized (pendingLock) {
143                // The destination may have just been removed...
144                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
145                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
146                    return;
147                }
148                enqueueCounter++;
149                pending.addMessageLast(node);
150            }
151            dispatchPending();
152        }
153    
154        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
155            synchronized(pendingLock) {
156                try {
157                    pending.reset();
158                    while (pending.hasNext()) {
159                        MessageReference node = pending.next();
160                        node.decrementReferenceCount();
161                        if (node.getMessageId().equals(mdn.getMessageId())) {
162                            // Synchronize between dispatched list and removal of messages from pending list
163                            // related to remove subscription action
164                            synchronized(dispatchLock) {
165                                pending.remove();
166                                createMessageDispatch(node, node.getMessage());
167                                dispatched.add(node);
168                                onDispatch(node, node.getMessage());
169                            }
170                            return;
171                        }
172                    }
173                } finally {
174                    pending.release();
175                }
176            }
177            throw new JMSException(
178                    "Slave broker out of sync with master: Dispatched message ("
179                            + mdn.getMessageId() + ") was not in the pending list for "
180                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
181        }
182    
183        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
184            // Handle the standard acknowledgment case.
185            boolean callDispatchMatched = false;
186            Destination destination = null;
187    
188            if (!isSlave()) {
189                if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
190                    // suppress unexpected ack exception in this expected case
191                    LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
192                    return;
193                }
194            }
195            if (LOG.isTraceEnabled()) {
196                LOG.trace("ack:" + ack);
197            }
198            synchronized(dispatchLock) {
199                if (ack.isStandardAck()) {
200                    // First check if the ack matches the dispatched. When using failover this might
201                    // not be the case. We don't ever want to ack the wrong messages.
202                    assertAckMatchesDispatched(ack);
203    
204                    // Acknowledge all dispatched messages up till the message id of
205                    // the acknowledgment.
206                    int index = 0;
207                    boolean inAckRange = false;
208                    List<MessageReference> removeList = new ArrayList<MessageReference>();
209                    for (final MessageReference node : dispatched) {
210                        MessageId messageId = node.getMessageId();
211                        if (ack.getFirstMessageId() == null
212                                || ack.getFirstMessageId().equals(messageId)) {
213                            inAckRange = true;
214                        }
215                        if (inAckRange) {
216                            // Don't remove the nodes until we are committed.
217                            if (!context.isInTransaction()) {
218                                dequeueCounter++;
219                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
220                                removeList.add(node);
221                            } else {
222                                registerRemoveSync(context, node);
223                            }
224                            index++;
225                            acknowledge(context, ack, node);
226                            if (ack.getLastMessageId().equals(messageId)) {
227                                // contract prefetch if dispatch required a pull
228                                if (getPrefetchSize() == 0) {
229                                    // Protect extension update against parallel updates.
230                                    while (true) {
231                                        int currentExtension = prefetchExtension.get();
232                                        int newExtension = Math.max(0, currentExtension - index);
233                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
234                                            break;
235                                        }
236                                    }
237                                } else if (usePrefetchExtension && context.isInTransaction()) {
238                                    // extend prefetch window only if not a pulling consumer
239                                    while (true) {
240                                        int currentExtension = prefetchExtension.get();
241                                        int newExtension = Math.max(currentExtension, index);
242                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
243                                            break;
244                                        }
245                                    }
246                                }
247                                destination = node.getRegionDestination();
248                                callDispatchMatched = true;
249                                break;
250                            }
251                        }
252                    }
253                    for (final MessageReference node : removeList) {
254                        dispatched.remove(node);
255                    }
256                    // this only happens after a reconnect - get an ack which is not
257                    // valid
258                    if (!callDispatchMatched) {
259                        LOG.warn("Could not correlate acknowledgment with dispatched message: "
260                                      + ack);
261                    }
262                } else if (ack.isIndividualAck()) {
263                    // Message was delivered and acknowledge - but only delete the
264                    // individual message
265                    for (final MessageReference node : dispatched) {
266                        MessageId messageId = node.getMessageId();
267                        if (ack.getLastMessageId().equals(messageId)) {
268                            // Don't remove the nodes until we are committed - immediateAck option
269                            if (!context.isInTransaction()) {
270                                dequeueCounter++;
271                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
272                                dispatched.remove(node);
273                            } else {
274                                registerRemoveSync(context, node);
275                            }
276    
277                            // Protect extension update against parallel updates.
278                            while (true) {
279                                int currentExtension = prefetchExtension.get();
280                                int newExtension = Math.max(0, currentExtension - 1);
281                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
282                                    break;
283                                }
284                            }
285                            acknowledge(context, ack, node);
286                            destination = node.getRegionDestination();
287                            callDispatchMatched = true;
288                            break;
289                        }
290                    }
291                }else if (ack.isDeliveredAck()) {
292                    // Message was delivered but not acknowledged: update pre-fetch
293                    // counters.
294                    int index = 0;
295                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
296                        final MessageReference node = iter.next();
297                        if (node.isExpired()) {
298                            if (broker.isExpired(node)) {
299                                node.getRegionDestination().messageExpired(context, this, node);
300                            }
301                            iter.remove();
302                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
303                        }
304                        if (ack.getLastMessageId().equals(node.getMessageId())) {
305                            if (usePrefetchExtension) {
306                                while (true) {
307                                    int currentExtension = prefetchExtension.get();
308                                    int newExtension = Math.max(currentExtension, index + 1);
309                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
310                                        break;
311                                    }
312                                }
313                            }
314                            destination = node.getRegionDestination();
315                            callDispatchMatched = true;
316                            break;
317                        }
318                    }
319                    if (!callDispatchMatched) {
320                        throw new JMSException(
321                                "Could not correlate acknowledgment with dispatched message: "
322                                        + ack);
323                    }
324                } else if (ack.isRedeliveredAck()) {
325                    // Message was re-delivered but it was not yet considered to be
326                    // a DLQ message.
327                    boolean inAckRange = false;
328                    for (final MessageReference node : dispatched) {
329                        MessageId messageId = node.getMessageId();
330                        if (ack.getFirstMessageId() == null
331                                || ack.getFirstMessageId().equals(messageId)) {
332                            inAckRange = true;
333                        }
334                        if (inAckRange) {
335                            if (ack.getLastMessageId().equals(messageId)) {
336                                destination = node.getRegionDestination();
337                                callDispatchMatched = true;
338                                break;
339                            }
340                        }
341                    }
342                    if (!callDispatchMatched) {
343                        throw new JMSException(
344                                "Could not correlate acknowledgment with dispatched message: "
345                                        + ack);
346                    }
347                } else if (ack.isPoisonAck()) {
348                    // TODO: what if the message is already in a DLQ???
349                    // Handle the poison ACK case: we need to send the message to a
350                    // DLQ
351                    if (ack.isInTransaction()) {
352                        throw new JMSException("Poison ack cannot be transacted: "
353                                + ack);
354                    }
355                    int index = 0;
356                    boolean inAckRange = false;
357                    List<MessageReference> removeList = new ArrayList<MessageReference>();
358                    for (final MessageReference node : dispatched) {
359                        MessageId messageId = node.getMessageId();
360                        if (ack.getFirstMessageId() == null
361                                || ack.getFirstMessageId().equals(messageId)) {
362                            inAckRange = true;
363                        }
364                        if (inAckRange) {
365                            if (ack.getPoisonCause() != null) {
366                                node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
367                                        ack.getPoisonCause().toString());
368                            }
369                            sendToDLQ(context, node);
370                            node.getRegionDestination().getDestinationStatistics()
371                                    .getInflight().decrement();
372                            removeList.add(node);
373                            dequeueCounter++;
374                            index++;
375                            acknowledge(context, ack, node);
376                            if (ack.getLastMessageId().equals(messageId)) {
377                                while (true) {
378                                    int currentExtension = prefetchExtension.get();
379                                    int newExtension = Math.max(0, currentExtension - (index + 1));
380                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
381                                        break;
382                                    }
383                                }
384                                destination = node.getRegionDestination();
385                                callDispatchMatched = true;
386                                break;
387                            }
388                        }
389                    }
390                    for (final MessageReference node : removeList) {
391                        dispatched.remove(node);
392                    }
393                    if (!callDispatchMatched) {
394                        throw new JMSException(
395                                "Could not correlate acknowledgment with dispatched message: "
396                                        + ack);
397                    }
398                }
399            }
400            if (callDispatchMatched && destination != null) {
401                destination.wakeup();
402                dispatchPending();
403            } else {
404                if (isSlave()) {
405                    throw new JMSException(
406                            "Slave broker out of sync with master: Acknowledgment ("
407                                    + ack + ") was not in the dispatch list: "
408                                    + dispatched);
409                } else {
410                    LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
411                            + ack);
412                }
413            }
414        }
415    
416        private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
417            // setup a Synchronization to remove nodes from the
418            // dispatched list.
419            context.getTransaction().addSynchronization(
420                    new Synchronization() {
421    
422                        @Override
423                        public void afterCommit()
424                                throws Exception {
425                            synchronized(dispatchLock) {
426                                dequeueCounter++;
427                                dispatched.remove(node);
428                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
429                            }
430                        }
431    
432                        @Override
433                        public void afterRollback() throws Exception {
434                            synchronized(dispatchLock) {
435                                if (isSlave()) {
436                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
437                                } else {
438                                    // poisionAck will decrement - otherwise still inflight on client
439                                }
440                            }
441                        }
442                    });
443        }
444    
445        /**
446         * Checks an ack versus the contents of the dispatched list.
447         *  called with dispatchLock held
448         * @param ack
449         * @throws JMSException if it does not match
450         */
451        protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
452            MessageId firstAckedMsg = ack.getFirstMessageId();
453            MessageId lastAckedMsg = ack.getLastMessageId();
454            int checkCount = 0;
455            boolean checkFoundStart = false;
456            boolean checkFoundEnd = false;
457            for (MessageReference node : dispatched) {
458    
459                if (firstAckedMsg == null) {
460                    checkFoundStart = true;
461                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
462                    checkFoundStart = true;
463                }
464    
465                if (checkFoundStart) {
466                    checkCount++;
467                }
468    
469                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
470                    checkFoundEnd = true;
471                    break;
472                }
473            }
474            if (!checkFoundStart && firstAckedMsg != null)
475                throw new JMSException("Unmatched acknowledge: " + ack
476                        + "; Could not find Message-ID " + firstAckedMsg
477                        + " in dispatched-list (start of ack)");
478            if (!checkFoundEnd && lastAckedMsg != null)
479                throw new JMSException("Unmatched acknowledge: " + ack
480                        + "; Could not find Message-ID " + lastAckedMsg
481                        + " in dispatched-list (end of ack)");
482            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
483                throw new JMSException("Unmatched acknowledge: " + ack
484                        + "; Expected message count (" + ack.getMessageCount()
485                        + ") differs from count in dispatched-list (" + checkCount
486                        + ")");
487            }
488        }
489    
490        /**
491         * @param context
492         * @param node
493         * @throws IOException
494         * @throws Exception
495         */
496        protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
497            broker.getRoot().sendToDeadLetterQueue(context, node, this);
498        }
499    
500        public int getInFlightSize() {
501            return dispatched.size();
502        }
503    
504        /**
505         * Used to determine if the broker can dispatch to the consumer.
506         *
507         * @return
508         */
509        public boolean isFull() {
510            return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
511        }
512    
513        /**
514         * @return true when 60% or more room is left for dispatching messages
515         */
516        public boolean isLowWaterMark() {
517            return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
518        }
519    
520        /**
521         * @return true when 10% or less room is left for dispatching messages
522         */
523        public boolean isHighWaterMark() {
524            return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
525        }
526    
527        @Override
528        public int countBeforeFull() {
529            return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
530        }
531    
532        public int getPendingQueueSize() {
533            return pending.size();
534        }
535    
536        public int getDispatchedQueueSize() {
537            return dispatched.size();
538        }
539    
540        public long getDequeueCounter() {
541            return dequeueCounter;
542        }
543    
544        public long getDispatchedCounter() {
545            return dispatchCounter;
546        }
547    
548        public long getEnqueueCounter() {
549            return enqueueCounter;
550        }
551    
552        @Override
553        public boolean isRecoveryRequired() {
554            return pending.isRecoveryRequired();
555        }
556    
557        public PendingMessageCursor getPending() {
558            return this.pending;
559        }
560    
561        public void setPending(PendingMessageCursor pending) {
562            this.pending = pending;
563            if (this.pending!=null) {
564                this.pending.setSystemUsage(usageManager);
565                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
566            }
567        }
568    
569       @Override
570        public void add(ConnectionContext context, Destination destination) throws Exception {
571            synchronized(pendingLock) {
572                super.add(context, destination);
573                pending.add(context, destination);
574            }
575        }
576    
577        @Override
578        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
579            List<MessageReference> rc = new ArrayList<MessageReference>();
580            synchronized(pendingLock) {
581                super.remove(context, destination);
582                // Here is a potential problem concerning Inflight stat:
583                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
584                // Except if each commit or rollback callback action comes before remove of subscriber.
585                rc.addAll(pending.remove(context, destination));
586    
587                // Synchronized to DispatchLock
588                synchronized(dispatchLock) {
589                    ArrayList<MessageReference> references = new ArrayList<MessageReference>();
590                    for (MessageReference r : dispatched) {
591                        if( r.getRegionDestination() == destination) {
592                            references.add(r);
593                        }
594                    }
595                    rc.addAll(references);
596                    destination.getDestinationStatistics().getDispatched().subtract(references.size());
597                    destination.getDestinationStatistics().getInflight().subtract(references.size());
598                    dispatched.removeAll(references);
599                }
600            }
601            return rc;
602        }
603    
604        protected void dispatchPending() throws IOException {
605            if (!isSlave()) {
606               synchronized(pendingLock) {
607                    try {
608                        int numberToDispatch = countBeforeFull();
609                        if (numberToDispatch > 0) {
610                            setSlowConsumer(false);
611                            setPendingBatchSize(pending, numberToDispatch);
612                            int count = 0;
613                            pending.reset();
614                            while (pending.hasNext() && !isFull()
615                                    && count < numberToDispatch) {
616                                MessageReference node = pending.next();
617                                if (node == null) {
618                                    break;
619                                }
620    
621                                // Synchronize between dispatched list and remove of message from pending list
622                                // related to remove subscription action
623                                synchronized(dispatchLock) {
624                                    pending.remove();
625                                    node.decrementReferenceCount();
626                                    if( !isDropped(node) && canDispatch(node)) {
627    
628                                        // Message may have been sitting in the pending
629                                        // list a while waiting for the consumer to ak the message.
630                                        if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
631                                            //increment number to dispatch
632                                            numberToDispatch++;
633                                            if (broker.isExpired(node)) {
634                                                node.getRegionDestination().messageExpired(context, this, node);
635                                            }
636                                            continue;
637                                        }
638                                        dispatch(node);
639                                        count++;
640                                    }
641                                }
642                            }
643                        } else if (!isSlowConsumer()) {
644                            setSlowConsumer(true);
645                            for (Destination dest :destinations) {
646                                dest.slowConsumer(context, this);
647                            }
648                        }
649                    } finally {
650                        pending.release();
651                    }
652                }
653            }
654        }
655    
    /**
     * Forwards the given count to the cursor as its maximum batch size.
     * {@code dispatchPending()} calls this with the number of messages it is
     * about to try to dispatch, before resetting the cursor.
     *
     * @param pending the cursor to configure
     * @param numberToDispatch value handed to {@code setMaxBatchSize}
     */
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        pending.setMaxBatchSize(numberToDispatch);
    }
659    
660        // called with dispatchLock held
661        protected boolean dispatch(final MessageReference node) throws IOException {
662            final Message message = node.getMessage();
663            if (message == null) {
664                return false;
665            }
666    
667            okForAckAsDispatchDone.countDown();
668    
669            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
670            if (!isSlave()) {
671    
672                MessageDispatch md = createMessageDispatch(node, message);
673                // NULL messages don't count... they don't get Acked.
674                if (node != QueueMessageReference.NULL_MESSAGE) {
675                    dispatchCounter++;
676                    dispatched.add(node);
677                } else {
678                    while (true) {
679                        int currentExtension = prefetchExtension.get();
680                        int newExtension = Math.max(0, currentExtension - 1);
681                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
682                            break;
683                        }
684                    }
685                }
686                if (info.isDispatchAsync()) {
687                    md.setTransmitCallback(new Runnable() {
688    
689                        public void run() {
690                            // Since the message gets queued up in async dispatch,
691                            // we don't want to
692                            // decrease the reference count until it gets put on the
693                            // wire.
694                            onDispatch(node, message);
695                        }
696                    });
697                    context.getConnection().dispatchAsync(md);
698                } else {
699                    context.getConnection().dispatchSync(md);
700                    onDispatch(node, message);
701                }
702                return true;
703            } else {
704                return false;
705            }
706        }
707    
708        protected void onDispatch(final MessageReference node, final Message message) {
709            if (node.getRegionDestination() != null) {
710                if (node != QueueMessageReference.NULL_MESSAGE) {
711                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
712                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();
713                    if (LOG.isTraceEnabled()) {
714                        LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
715                                + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
716                    }
717                }
718            }
719    
720            if (info.isDispatchAsync()) {
721                try {
722                    dispatchPending();
723                } catch (IOException e) {
724                    context.getConnection().serviceExceptionAsync(e);
725                }
726            }
727        }
728    
729        /**
730         * inform the MessageConsumer on the client to change it's prefetch
731         *
732         * @param newPrefetch
733         */
734        public void updateConsumerPrefetch(int newPrefetch) {
735            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
736                ConsumerControl cc = new ConsumerControl();
737                cc.setConsumerId(info.getConsumerId());
738                cc.setPrefetch(newPrefetch);
739                context.getConnection().dispatchAsync(cc);
740            }
741        }
742    
743        /**
744         * @param node
745         * @param message
746         * @return MessageDispatch
747         */
748        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
749            MessageDispatch md = new MessageDispatch();
750            md.setConsumerId(info.getConsumerId());
751    
752            if (node == QueueMessageReference.NULL_MESSAGE) {
753                md.setMessage(null);
754                md.setDestination(null);
755            } else {
756                md.setDestination(node.getRegionDestination().getActiveMQDestination());
757                md.setMessage(message);
758                md.setRedeliveryCounter(node.getRedeliveryCounter());
759            }
760    
761            return md;
762        }
763    
    /**
     * Use when a matched message is about to be dispatched to the client.
     * <p>
     * {@code dispatchPending()} consults this hook while holding
     * {@code dispatchLock}, after the reference has already been removed from
     * the pending cursor and only if {@code isDropped(node)} returned
     * {@code false}. When this returns {@code false} the reference is neither
     * dispatched nor given expiry handling in that pass.
     *
     * @param node the message reference that is about to be dispatched
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException declared for implementations; the failure conditions
     *         are defined by the concrete subclass
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;
773    
    /**
     * Reports whether the given reference must be skipped by this subscription.
     * <p>
     * {@code dispatchPending()} checks this before {@code canDispatch(node)}
     * and does not dispatch a reference for which it returns {@code true}.
     * What counts as "dropped" is decided by the concrete subclass.
     *
     * @param node the message reference taken from the pending cursor
     * @return {@code true} if the reference must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);
775    
    /**
     * Used during acknowledgment to remove the message.
     * <p>
     * NOTE(review): the call sites are outside this part of the file; the
     * parameter descriptions below are inferred from names and types and
     * should be confirmed against the callers.
     *
     * @param context presumably the connection context the ack arrived on
     * @param ack presumably the acknowledgment command being processed
     * @param node presumably the message reference the ack applies to
     * @throws IOException declared for implementations; the failure conditions
     *         are defined by the concrete subclass
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
782    
783    
784        public int getMaxProducersToAudit() {
785            return maxProducersToAudit;
786        }
787    
    /**
     * Stores a new {@code maxProducersToAudit} setting. The value is stored
     * as given; no validation is performed here.
     *
     * @param maxProducersToAudit the value to store
     */
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }
791    
792        public int getMaxAuditDepth() {
793            return maxAuditDepth;
794        }
795    
    /**
     * Stores a new {@code maxAuditDepth} setting. The value is stored as
     * given; no validation is performed here.
     *
     * @param maxAuditDepth the value to store
     */
    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }
799    
800        public boolean isUsePrefetchExtension() {
801            return usePrefetchExtension;
802        }
803    
    /**
     * Stores a new {@code usePrefetchExtension} flag.
     *
     * @param usePrefetchExtension the value to store
     */
    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
        this.usePrefetchExtension = usePrefetchExtension;
    }
807    
    /**
     * Returns the current value of the {@code prefetchExtension} counter.
     * <p>
     * {@code dispatch()} lowers this counter by one, never below zero, each
     * time a {@code QueueMessageReference.NULL_MESSAGE} is dispatched.
     *
     * @return the counter value read via {@code get()}
     */
    protected int getPrefetchExtension() {
        return this.prefetchExtension.get();
    }
811    }