001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Comparator;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedHashMap;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    import java.util.concurrent.CancellationException;
032    import java.util.concurrent.ConcurrentLinkedQueue;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.DelayQueue;
035    import java.util.concurrent.Delayed;
036    import java.util.concurrent.ExecutorService;
037    import java.util.concurrent.Future;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.atomic.AtomicLong;
040    import java.util.concurrent.locks.Lock;
041    import java.util.concurrent.locks.ReentrantLock;
042    import java.util.concurrent.locks.ReentrantReadWriteLock;
043    
044    import javax.jms.InvalidSelectorException;
045    import javax.jms.JMSException;
046    import javax.jms.ResourceAllocationException;
047    
048    import org.apache.activemq.broker.BrokerService;
049    import org.apache.activemq.broker.ConnectionContext;
050    import org.apache.activemq.broker.ProducerBrokerExchange;
051    import org.apache.activemq.broker.region.cursors.OrderedPendingList;
052    import org.apache.activemq.broker.region.cursors.PendingList;
053    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
054    import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
055    import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
056    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
057    import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
058    import org.apache.activemq.broker.region.group.MessageGroupMap;
059    import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
060    import org.apache.activemq.broker.region.policy.DispatchPolicy;
061    import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
062    import org.apache.activemq.broker.util.InsertionCountList;
063    import org.apache.activemq.command.ActiveMQDestination;
064    import org.apache.activemq.command.ActiveMQMessage;
065    import org.apache.activemq.command.ConsumerId;
066    import org.apache.activemq.command.ExceptionResponse;
067    import org.apache.activemq.command.Message;
068    import org.apache.activemq.command.MessageAck;
069    import org.apache.activemq.command.MessageDispatchNotification;
070    import org.apache.activemq.command.MessageId;
071    import org.apache.activemq.command.ProducerAck;
072    import org.apache.activemq.command.ProducerInfo;
073    import org.apache.activemq.command.Response;
074    import org.apache.activemq.filter.BooleanExpression;
075    import org.apache.activemq.filter.MessageEvaluationContext;
076    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
077    import org.apache.activemq.selector.SelectorParser;
078    import org.apache.activemq.state.ProducerState;
079    import org.apache.activemq.store.MessageRecoveryListener;
080    import org.apache.activemq.store.MessageStore;
081    import org.apache.activemq.thread.Task;
082    import org.apache.activemq.thread.TaskRunner;
083    import org.apache.activemq.thread.TaskRunnerFactory;
084    import org.apache.activemq.transaction.Synchronization;
085    import org.apache.activemq.usage.Usage;
086    import org.apache.activemq.usage.UsageListener;
087    import org.apache.activemq.util.BrokerSupport;
088    import org.slf4j.Logger;
089    import org.slf4j.LoggerFactory;
090    import org.slf4j.MDC;
091    
092    /**
093     * The Queue is a List of MessageEntry objects that are dispatched to matching
094     * subscriptions.
095     */
096    public class Queue extends BaseDestination implements Task, UsageListener {
           /** Logger shared by the queue and its nested helper classes. */
097        protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
           // Factory used by initialize() to create this queue's task runner.
098        protected final TaskRunnerFactory taskFactory;
           // Created in initialize(); null until then.
099        protected TaskRunner taskRunner;
           // Guards the consumers list (read lock for snapshots, write lock for
           // adding/removing subscriptions).
100        private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
101        protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
           // Write-locked while recovered messages are appended to the cursor.
102        private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
           // Pending message cursor; chosen in initialize() unless already set.
103        protected PendingMessageCursor messages;
           // NOTE(review): presumably guards pagedInMessages - its usage is not
           // visible in this part of the file, confirm.
104        private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
105        private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
106        // Messages that are paged in but have not yet been targeted at a
107        // subscription
           // The write lock is also taken while subscriptions are added or removed
           // so that no dispatch happens during those changes.
108        private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
109        protected PendingList pagedInPendingDispatch = new OrderedPendingList();
           // Un-acked messages handed back by a removed subscription, waiting to
           // be dispatched again.
110        protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
111        private MessageGroupMap messageGroupOwners;
112        private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
113        private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
114        final Lock sendLock = new ReentrantLock();
115        private ExecutorService executor;
           // Sends parked while memory usage is full, keyed by message id. All
           // access synchronizes on the map itself.
116        private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
117        private boolean useConsumerPriority = true;
118        private boolean strictOrderDispatch = false;
           // Tracks the exclusive consumer, if any (see add/removeSubscription).
119        private final QueueDispatchSelector dispatchSelector;
           // When set (or when the broker is a slave) add/removeSubscription call
           // wakeup() only after releasing the dispatch lock.
120        private boolean optimizedDispatch = false;
121        private boolean iterationRunning = false;
           // Set when a subscription is added while the consumer list is empty.
122        private boolean firstConsumer = false;
123        private int timeBeforeDispatchStarts = 0;
           // When non-zero, the first consumer creates consumersBeforeStartsLatch
           // with a count of (this value - 1); later consumers count it down.
124        private int consumersBeforeDispatchStarts = 0;
125        private CountDownLatch consumersBeforeStartsLatch;
126        private final AtomicLong pendingWakeups = new AtomicLong();
           // When true every consumer is treated as a candidate exclusive consumer.
127        private boolean allConsumersExclusiveByDefault = false;
128    
           // Runnable that simply triggers asyncWakeup().
129        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
130            public void run() {
131                asyncWakeup();
132            }
133        };
           // Runnable that simply triggers expireMessages().
134        private final Runnable expireMessagesTask = new Runnable() {
135            public void run() {
136                expireMessages();
137            }
138        };
139    
           // NOTE(review): comments in add/removeSubscription describe a lock
           // hierarchy of iteratingMutex -> dispatch lock; the code that
           // synchronizes on this mutex is outside this part of the file.
140        private final Object iteratingMutex = new Object();
141    
142        class TimeoutMessage implements Delayed {
143    
144            Message message;
145            ConnectionContext context;
146            long trigger;
147    
148            public TimeoutMessage(Message message, ConnectionContext context, long delay) {
149                this.message = message;
150                this.context = context;
151                this.trigger = System.currentTimeMillis() + delay;
152            }
153    
154            public long getDelay(TimeUnit unit) {
155                long n = trigger - System.currentTimeMillis();
156                return unit.convert(n, TimeUnit.MILLISECONDS);
157            }
158    
159            public int compareTo(Delayed delayed) {
160                long other = ((TimeoutMessage) delayed).trigger;
161                int returnValue;
162                if (this.trigger < other) {
163                    returnValue = -1;
164                } else if (this.trigger > other) {
165                    returnValue = 1;
166                } else {
167                    returnValue = 0;
168                }
169                return returnValue;
170            }
171    
172        }
173    
174        DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
175    
176        class FlowControlTimeoutTask extends Thread {
177    
178            @Override
179            public void run() {
180                TimeoutMessage timeout;
181                try {
182                    while (true) {
183                        timeout = flowControlTimeoutMessages.take();
184                        if (timeout != null) {
185                            synchronized (messagesWaitingForSpace) {
186                                if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
187                                    ExceptionResponse response = new ExceptionResponse(
188                                            new ResourceAllocationException(
189                                                    "Usage Manager Memory Limit reached. Stopping producer ("
190                                                            + timeout.message.getProducerId()
191                                                            + ") to prevent flooding "
192                                                            + getActiveMQDestination().getQualifiedName()
193                                                            + "."
194                                                            + " See http://activemq.apache.org/producer-flow-control.html for more info"));
195                                    response.setCorrelationId(timeout.message.getCommandId());
196                                    timeout.context.getConnection().dispatchAsync(response);
197                                }
198                            }
199                        }
200                    }
201                } catch (InterruptedException e) {
202                    if (LOG.isDebugEnabled()) {
203                        LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
204                    }
205                }
206            }
207        };
208    
209        private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
210    
211        private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
212    
213            public int compare(Subscription s1, Subscription s2) {
214                // We want the list sorted in descending order
215                return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
216            }
217        };
218    
219        public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
220                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
221            super(brokerService, store, destination, parentStats);
222            this.taskFactory = taskFactory;
223            this.dispatchSelector = new QueueDispatchSelector(destination);
224        }
225    
226        public List<Subscription> getConsumers() {
227            consumersLock.readLock().lock();
228            try {
229                return new ArrayList<Subscription>(consumers);
230            }finally {
231                consumersLock.readLock().unlock();
232            }
233        }
234    
235        // Thread subclass that keeps a reference to its queue, making the queue
236        // easy to find in a debugger when looking at its task runner threads.
237        final class QueueThread extends Thread {
               // not read by the thread itself; kept for inspection only
238            final Queue queue;
239    
               /**
                * @param runnable the work the thread executes
                * @param name     thread name
                * @param queue    the queue this thread works for
                */
240            public QueueThread(Runnable runnable, String name, Queue queue) {
241                super(runnable, name);
242                this.queue = queue;
243            }
244        }
245    
246        class BatchMessageRecoveryListener implements MessageRecoveryListener {
247            final LinkedList<Message> toExpire = new LinkedList<Message>();
248            final double totalMessageCount;
249            int recoveredAccumulator = 0;
250            int currentBatchCount;
251    
252            BatchMessageRecoveryListener(int totalMessageCount) {
253                this.totalMessageCount = totalMessageCount;
254                currentBatchCount = recoveredAccumulator;
255            }
256    
257            public boolean recoverMessage(Message message) {
258                recoveredAccumulator++;
259                if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
260                    LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
261                            + recoveredAccumulator + " messages. " +
262                            (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
263                }
264                // Message could have expired while it was being
265                // loaded..
266                if (message.isExpired() && broker.isExpired(message)) {
267                    toExpire.add(message);
268                    return true;
269                }
270                if (hasSpace()) {
271                    message.setRegionDestination(Queue.this);
272                    messagesLock.writeLock().lock();
273                    try {
274                        try {
275                            messages.addMessageLast(message);
276                        } catch (Exception e) {
277                            LOG.error("Failed to add message to cursor", e);
278                        }
279                    } finally {
280                        messagesLock.writeLock().unlock();
281                    }
282                    destinationStatistics.getMessages().increment();
283                    return true;
284                }
285                return false;
286            }
287    
288            public boolean recoverMessageReference(MessageId messageReference) throws Exception {
289                throw new RuntimeException("Should not be called.");
290            }
291    
292            public boolean hasSpace() {
293                return true;
294            }
295    
296            public boolean isDuplicate(MessageId id) {
297                return false;
298            }
299    
300            public void reset() {
301                currentBatchCount = recoveredAccumulator;
302            }
303    
304            public void processExpired() {
305                for (Message message: toExpire) {
306                    messageExpired(createConnectionContext(), createMessageReference(message));
307                    // drop message will decrement so counter
308                    // balance here
309                    destinationStatistics.getMessages().increment();
310                }
311                toExpire.clear();
312            }
313    
314            public boolean done() {
315                return currentBatchCount == recoveredAccumulator;
316            }
317        }
318    
319        @Override
320        public void setPrioritizedMessages(boolean prioritizedMessages) {
321            super.setPrioritizedMessages(prioritizedMessages);
322    
323            if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
324                pagedInPendingDispatch = new PrioritizedPendingList();
325                redeliveredWaitingDispatch = new PrioritizedPendingList();
326            } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
327                pagedInPendingDispatch = new OrderedPendingList();
328                redeliveredWaitingDispatch = new OrderedPendingList();
329            }
330        }
331    
           /**
            * Prepares the queue for use: selects the message cursor if none was set,
            * wires up memory usage for in-memory cursors, creates the task runner,
            * runs the base-class initialization and, when a store is present,
            * configures the cursor and recovers the stored messages.
            *
            * @throws Exception if the base initialization or the store access fails
            */
332        @Override
333        public void initialize() throws Exception {
334    
               // Only choose a cursor when none has been set: in-memory for
               // temporary destinations or when there is no broker/store,
               // store-backed otherwise.
335            if (this.messages == null) {
336                if (destination.isTemporary() || broker == null || store == null) {
337                    this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
338                } else {
339                    this.messages = new StoreQueueCursor(broker, this);
340                }
341            }
342    
343            // If a VMPendingMessageCursor don't use the default Producer System
344            // Usage
345            // since it turns into a shared blocking queue which can lead to a
346            // network deadlock.
347            // If we are cursoring to disk, it's not an issue because it does not
348            // block due
349            // to large disk sizes.
350            if (messages instanceof VMPendingMessageCursor) {
351                this.systemUsage = brokerService.getSystemUsage();
352                memoryUsage.setParent(systemUsage.getMemoryUsage());
353            }
354    
355            this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
356    
357            super.initialize();
358            if (store != null) {
359                // Restore the persistent messages.
360                messages.setSystemUsage(systemUsage);
361                messages.setEnableAudit(isEnableAudit());
362                messages.setMaxAuditDepth(getMaxAuditDepth());
363                messages.setMaxProducersToAudit(getMaxProducersToAudit());
364                messages.setUseCache(isUseCache());
365                messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
366                final int messageCount = store.getMessageCount();
367                if (messageCount > 0 && messages.isRecoveryRequired()) {
                       // Recover page by page; expired messages found in a page are
                       // processed right after it. The loop ends once a page
                       // recovers no message at all.
368                    BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
369                    do {
370                       listener.reset();
371                       store.recoverNextMessages(getMaxPageSize(), listener);
372                       listener.processExpired();
373                   } while (!listener.done());
374                } else {
                       // No recovery pass: take the message count straight from the store.
375                    destinationStatistics.getMessages().setCount(messageCount);
376                }
377            }
378        }
379    
380        /*
381         * Holder for a browser subscription that needs attention on the next
382         * iterate: a browser needs access to the messages in the queue that have
383         * already been dispatched. Creating the holder takes a queue reference on
384         * the browser; done() releases it again.
            */
385        class BrowserDispatch {
386            QueueBrowserSubscription browser;
387    
               /** Remembers the browser and increments its queue reference count. */
388            public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
389                this.browser = browserSubscription;
390                this.browser.incrementQueueRef();
391            }
392    
               /** Decrements the browser's queue reference count; failures are only logged. */
393            void done() {
394                try {
395                    this.browser.decrementQueueRef();
396                } catch (Exception e) {
397                    LOG.warn("decrement ref on browser: " + this.browser, e);
398                }
399            }
400    
               /** @return the browser subscription held by this entry */
401            public QueueBrowserSubscription getBrowser() {
402                return this.browser;
403            }
404        }
405    
           // Browser subscriptions queued by addSubscription for the next iterate.
406        ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
407    
           /**
            * Registers a subscription with this queue.
            * <p>
            * While holding the dispatch write lock the subscription is attached to
            * the queue, added to the consumer list (under the consumers write lock),
            * considered as exclusive consumer where applicable and, if it is a queue
            * browser, queued in browserDispatches for the next iterate. Finally the
            * queue is woken up - inside the dispatch lock normally, outside of it
            * when optimized dispatch is on or the broker is a slave.
            *
            * @param context connection context of the subscriber
            * @param sub     the subscription to add
            * @throws Exception if the base class or the subscription rejects the add
            */
408        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
409            if (LOG.isDebugEnabled()) {
410                LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
411                        + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
412                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
413                        + getDestinationStatistics().getInflight().getCount());
414            }
415    
416            super.addSubscription(context, sub);
417            // synchronize with dispatch method so that no new messages are sent
418            // while setting up a subscription. avoid out of order messages,
419            // duplicates, etc.
420            pagedInPendingDispatchLock.writeLock().lock();
421            try {
422    
423                sub.add(context, this);
424    
425                // needs to be synchronized - so no contention with dispatching
426               // consumersLock.
427                consumersLock.writeLock().lock();
428                try {
429    
430                    // set a flag if this is a first consumer
431                    if (consumers.size() == 0) {
432                        firstConsumer = true;
                           // the first consumer itself counts, hence the "- 1"
433                        if (consumersBeforeDispatchStarts != 0) {
434                            consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
435                        }
436                    } else {
437                        if (consumersBeforeStartsLatch != null) {
438                            consumersBeforeStartsLatch.countDown();
439                        }
440                    }
441    
442                    addToConsumerList(sub);
                       // Exclusive-consumer selection: the new subscription takes
                       // over when there is no exclusive consumer yet, when its
                       // priority is Byte.MAX_VALUE, or when its priority is higher
                       // than that of the current exclusive consumer.
443                    if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
444                        Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
445                        if (exclusiveConsumer == null) {
446                            exclusiveConsumer = sub;
447                        } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
448                            sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
449                            exclusiveConsumer = sub;
450                        }
451                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
452                    }
453                }finally {
454                    consumersLock.writeLock().unlock();
455                }
456    
457                if (sub instanceof QueueBrowserSubscription) {
458                    // tee up for dispatch in next iterate
459                    QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
460                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
461                    browserDispatches.add(browserDispatch);
462                }
463    
464                if (!(this.optimizedDispatch || isSlave())) {
465                    wakeup();
466                }
467            }finally {
468                pagedInPendingDispatchLock.writeLock().unlock();
469            }
470            if (this.optimizedDispatch || isSlave()) {
471                // Outside of dispatchLock() to maintain the lock hierarchy of
472                // iteratingMutex -> dispatchLock. - see
473                // https://issues.apache.org/activemq/browse/AMQ-1878
474                wakeup();
475            }
476        }
477    
           /**
            * Removes a subscription from this queue and hands its un-acked messages
            * back for redelivery.
            * <p>
            * Under the dispatch write lock and the consumers write lock the
            * subscription is taken off the consumer list, a new exclusive consumer
            * is chosen if needed, the consumer is removed from the message group
            * owners, and every message the subscription returns from
            * {@code sub.remove} is appended to redeliveredWaitingDispatch. Messages
            * still locked by the subscription are unlocked and have their redelivery
            * counter incremented according to lastDeiveredSequenceId (see below).
            * The queue is then woken up - inside the dispatch lock normally, outside
            * of it when optimized dispatch is on or the broker is a slave.
            *
            * @param context                connection context of the subscriber
            * @param sub                    the subscription to remove
            * @param lastDeiveredSequenceId broker sequence id of the last message
            *        delivered to the consumer; 0 means no delivery information is
            *        available, in which case every message locked by the
            *        subscription is counted as redelivered. Otherwise only the
            *        messages up to and including the one with this sequence id are
            *        counted, and none if no un-acked message carries that id.
            * @throws Exception if the base class or the subscription fails the removal
            */
478        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
479                throws Exception {
480            super.removeSubscription(context, sub, lastDeiveredSequenceId);
481            // synchronize with dispatch method so that no new messages are sent
482            // while removing a subscription.
483            pagedInPendingDispatchLock.writeLock().lock();
484            try {
485                if (LOG.isDebugEnabled()) {
486                    LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
487                            + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
488                            + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
489                            + getDestinationStatistics().getInflight().getCount());
490                }
491                consumersLock.writeLock().lock();
492                try {
493                    removeFromConsumerList(sub);
                       // If the departing subscription was the exclusive consumer,
                       // promote the remaining exclusive consumer with the highest
                       // priority (or none).
494                    if (sub.getConsumerInfo().isExclusive()) {
495                        Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
496                        if (exclusiveConsumer == sub) {
497                            exclusiveConsumer = null;
498                            for (Subscription s : consumers) {
499                                if (s.getConsumerInfo().isExclusive()
500                                        && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
501                                                .getConsumerInfo().getPriority())) {
502                                    exclusiveConsumer = s;
503    
504                                }
505                            }
506                            dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
507                        }
508                    } else if (isAllConsumersExclusiveByDefault()) {
                           // every consumer is a candidate: pick the remaining one
                           // with the highest priority
509                        Subscription exclusiveConsumer = null;
510                        for (Subscription s : consumers) {
511                            if (exclusiveConsumer == null
512                                    || s.getConsumerInfo().getPriority() > exclusiveConsumer
513                                    .getConsumerInfo().getPriority()) {
514                                exclusiveConsumer = s;
515                                    }
516                        }
517                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
518                    }
519                    ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
520                    getMessageGroupOwners().removeConsumer(consumerId);
521    
522                    // redeliver inflight messages
523    
524                    boolean markAsRedelivered = false;
525                    MessageReference lastDeliveredRef = null;
526                    List<MessageReference> unAckedMessages = sub.remove(context, this);
527    
528                    // locate last redelivered in unconsumed list (list in delivery rather than seq order)
529                    if (lastDeiveredSequenceId != 0) {
530                        for (MessageReference ref : unAckedMessages) {
531                            if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
532                                lastDeliveredRef = ref;
533                                markAsRedelivered = true;
534                                if (LOG.isDebugEnabled()) {
535                                    LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
536                                }
537                                break;
538                            }
539                        }
540                    }
541                    for (MessageReference ref : unAckedMessages) {
542                        QueueMessageReference qmr = (QueueMessageReference) ref;
543                        if (qmr.getLockOwner() == sub) {
544                            qmr.unlock();
545    
546                            // have no delivery information
547                            if (lastDeiveredSequenceId == 0) {
548                                qmr.incrementRedeliveryCounter();
549                            } else {
                                   // markAsRedelivered stays true until the last
                                   // delivered message has been passed
550                                if (markAsRedelivered) {
551                                    qmr.incrementRedeliveryCounter();
552                                }
553                                if (ref == lastDeliveredRef) {
554                                    // all that follow were not redelivered
555                                    markAsRedelivered = false;
556                                }
557                            }
558                        }
                           // every un-acked message is queued for re-dispatch,
                           // whether or not it was still locked by this subscription
559                        redeliveredWaitingDispatch.addMessageLast(qmr);
560                    }
561                    if (!redeliveredWaitingDispatch.isEmpty()) {
562                        doDispatch(new OrderedPendingList());
563                    }
564                }finally {
565                    consumersLock.writeLock().unlock();
566                }
567                if (!(this.optimizedDispatch || isSlave())) {
568                    wakeup();
569                }
570            }finally {
571                pagedInPendingDispatchLock.writeLock().unlock();
572            }
573            if (this.optimizedDispatch || isSlave()) {
574                // Outside of dispatchLock() to maintain the lock hierarchy of
575                // iteratingMutex -> dispatchLock. - see
576                // https://issues.apache.org/activemq/browse/AMQ-1878
577                wakeup();
578            }
579        }
580    
581        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
582            final ConnectionContext context = producerExchange.getConnectionContext();
583            // There is delay between the client sending it and it arriving at the
584            // destination.. it may have expired.
585            message.setRegionDestination(this);
586            ProducerState state = producerExchange.getProducerState();
587            if (state == null) {
588                LOG.warn("Send failed for: " + message + ",  missing producer state for: " + producerExchange);
589                throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
590            }
591            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
592            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
593                    && !context.isInRecoveryMode();
594            if (message.isExpired()) {
595                // message not stored - or added to stats yet - so chuck here
596                broker.getRoot().messageExpired(context, message, null);
597                if (sendProducerAck) {
598                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
599                    context.getConnection().dispatchAsync(ack);
600                }
601                return;
602            }
603            if (memoryUsage.isFull()) {
604                isFull(context, memoryUsage);
605                fastProducer(context, producerInfo);
606                if (isProducerFlowControl() && context.isProducerFlowControl()) {
607                    if (warnOnProducerFlowControl) {
608                        warnOnProducerFlowControl = false;
609                        LOG
610                                .info("Usage Manager Memory Limit ("
611                                        + memoryUsage.getLimit()
612                                        + ") reached on "
613                                        + getActiveMQDestination().getQualifiedName()
614                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
615                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
616                    }
617    
618                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
619                        throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
620                                + message.getProducerId() + ") to prevent flooding "
621                                + getActiveMQDestination().getQualifiedName() + "."
622                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
623                    }
624    
625                    // We can avoid blocking due to low usage if the producer is
626                    // sending
627                    // a sync message or if it is using a producer window
628                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
629                        // copy the exchange state since the context will be
630                        // modified while we are waiting
631                        // for space.
632                        final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
633                        synchronized (messagesWaitingForSpace) {
634                         // Start flow control timeout task
635                            // Prevent trying to start it multiple times
636                            if (!flowControlTimeoutTask.isAlive()) {
637                                flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
638                                flowControlTimeoutTask.start();
639                            }
640                            messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
641                                public void run() {
642    
643                                    try {
644                                        // While waiting for space to free up... the
645                                        // message may have expired.
646                                        if (message.isExpired()) {
647                                            LOG.error("expired waiting for space..");
648                                            broker.messageExpired(context, message, null);
649                                            destinationStatistics.getExpired().increment();
650                                        } else {
651                                            doMessageSend(producerExchangeCopy, message);
652                                        }
653    
654                                        if (sendProducerAck) {
655                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
656                                                    .getSize());
657                                            context.getConnection().dispatchAsync(ack);
658                                        } else {
659                                            Response response = new Response();
660                                            response.setCorrelationId(message.getCommandId());
661                                            context.getConnection().dispatchAsync(response);
662                                        }
663    
664                                    } catch (Exception e) {
665                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
666                                            ExceptionResponse response = new ExceptionResponse(e);
667                                            response.setCorrelationId(message.getCommandId());
668                                            context.getConnection().dispatchAsync(response);
669                                        } else {
670                                            LOG.debug("unexpected exception on deferred send of :" + message, e);
671                                        }
672                                    }
673                                }
674                            });
675    
676                            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
677                                flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
678                                        .getSendFailIfNoSpaceAfterTimeout()));
679                            }
680    
681                            registerCallbackForNotFullNotification();
682                            context.setDontSendReponse(true);
683                            return;
684                        }
685    
686                    } else {
687    
688                        if (memoryUsage.isFull()) {
689                            waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
690                                    + message.getProducerId() + ") stopped to prevent flooding "
691                                    + getActiveMQDestination().getQualifiedName() + "."
692                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
693                        }
694    
695                        // The usage manager could have delayed us by the time
696                        // we unblock the message could have expired..
697                        if (message.isExpired()) {
698                            if (LOG.isDebugEnabled()) {
699                                LOG.debug("Expired message: " + message);
700                            }
701                            broker.getRoot().messageExpired(context, message, null);
702                            return;
703                        }
704                    }
705                }
706            }
707            doMessageSend(producerExchange, message);
708            if (sendProducerAck) {
709                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
710                context.getConnection().dispatchAsync(ack);
711            }
712        }
713    
714        private void registerCallbackForNotFullNotification() {
715            // If the usage manager is not full, then the task will not
716            // get called..
717            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
718                // so call it directly here.
719                sendMessagesWaitingForSpaceTask.run();
720            }
721        }
722    
723        void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
724                Exception {
725            final ConnectionContext context = producerExchange.getConnectionContext();
726            Future<Object> result = null;
727    
728            checkUsage(context, message);
729            sendLock.lockInterruptibly();
730            try {
731                if (store != null && message.isPersistent()) {
732                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
733                    if (messages.isCacheEnabled()) {
734                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
735                    } else {
736                        store.addMessage(context, message);
737                    }
738                    if (isReduceMemoryFootprint()) {
739                        message.clearMarshalledState();
740                    }
741                }
742                if (context.isInTransaction()) {
743                    // If this is a transacted message.. increase the usage now so that
744                    // a big TX does not blow up
745                    // our memory. This increment is decremented once the tx finishes..
746                    message.incrementReferenceCount();
747    
748                    context.getTransaction().addSynchronization(new Synchronization() {
749                        @Override
750                        public void afterCommit() throws Exception {
751                            sendLock.lockInterruptibly();
752                            try {
753                                // It could take while before we receive the commit
754                                // op, by that time the message could have expired..
755                                if (broker.isExpired(message)) {
756                                    broker.messageExpired(context, message, null);
757                                    destinationStatistics.getExpired().increment();
758                                    return;
759                                }
760                                sendMessage(message);
761                            } finally {
762                                sendLock.unlock();
763                                message.decrementReferenceCount();
764                            }
765                            messageSent(context, message);
766                        }
767                        @Override
768                        public void afterRollback() throws Exception {
769                            message.decrementReferenceCount();
770                        }
771                    });
772                } else {
773                    // Add to the pending list, this takes care of incrementing the
774                    // usage manager.
775                    sendMessage(message);
776                }
777            } finally {
778                sendLock.unlock();
779            }
780            if (!context.isInTransaction()) {
781                messageSent(context, message);
782            }
783            if (result != null && !result.isCancelled()) {
784                try {
785                    result.get();
786                } catch (CancellationException e) {
787                    // ignore - the task has been cancelled if the message
788                    // has already been deleted
789                }
790            }
791        }
792    
793        private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
794            if (message.isPersistent()) {
795                if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
796                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
797                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
798                        + message.getProducerId() + ") to prevent flooding "
799                        + getActiveMQDestination().getQualifiedName() + "."
800                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
801    
802                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
803                }
804            } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
805                final String logMessage = "Temp Store is Full ("
806                        + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
807                        +"). Stopping producer (" + message.getProducerId()
808                    + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
809                    + " See http://activemq.apache.org/producer-flow-control.html for more info";
810    
811                waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
812            }
813        }
814    
815        private void expireMessages() {
816            if (LOG.isDebugEnabled()) {
817                LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
818            }
819    
820            // just track the insertion count
821            List<Message> browsedMessages = new InsertionCountList<Message>();
822            doBrowse(browsedMessages, this.getMaxExpirePageSize());
823            asyncWakeup();
824            if (LOG.isDebugEnabled()) {
825                LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
826            }
827        }
828    
829        public void gc() {
830        }
831    
832        public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
833                throws IOException {
834            messageConsumed(context, node);
835            if (store != null && node.isPersistent()) {
836                // the original ack may be a ranged ack, but we are trying to delete
837                // a specific
838                // message store here so we need to convert to a non ranged ack.
839                if (ack.getMessageCount() > 0) {
840                    // Dup the ack
841                    MessageAck a = new MessageAck();
842                    ack.copy(a);
843                    ack = a;
844                    // Convert to non-ranged.
845                    ack.setFirstMessageId(node.getMessageId());
846                    ack.setLastMessageId(node.getMessageId());
847                    ack.setMessageCount(1);
848                }
849    
850                store.removeAsyncMessage(context, ack);
851            }
852        }
853    
854        Message loadMessage(MessageId messageId) throws IOException {
855            Message msg = null;
856            if (store != null) { // can be null for a temp q
857                msg = store.getMessage(messageId);
858                if (msg != null) {
859                    msg.setRegionDestination(this);
860                }
861            }
862            return msg;
863        }
864    
865        @Override
866        public String toString() {
867            int size = 0;
868            messagesLock.readLock().lock();
869            try{
870                size = messages.size();
871            }finally {
872                messagesLock.readLock().unlock();
873            }
874            return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
875                    + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
876                    + messageGroupOwners;
877        }
878    
879        public void start() throws Exception {
880            if (memoryUsage != null) {
881                memoryUsage.start();
882            }
883            if (systemUsage.getStoreUsage() != null) {
884                systemUsage.getStoreUsage().start();
885            }
886            systemUsage.getMemoryUsage().addUsageListener(this);
887            messages.start();
888            if (getExpireMessagesPeriod() > 0) {
889                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
890            }
891            doPageIn(false);
892        }
893    
894        public void stop() throws Exception {
895            if (taskRunner != null) {
896                taskRunner.shutdown();
897            }
898            if (this.executor != null) {
899                this.executor.shutdownNow();
900            }
901    
902            scheduler.cancel(expireMessagesTask);
903    
904            if (flowControlTimeoutTask.isAlive()) {
905                flowControlTimeoutTask.interrupt();
906            }
907    
908            if (messages != null) {
909                messages.stop();
910            }
911    
912            systemUsage.getMemoryUsage().removeUsageListener(this);
913            if (memoryUsage != null) {
914                memoryUsage.stop();
915            }
916            if (store != null) {
917                store.stop();
918            }
919        }
920    
921        // Properties
922        // -------------------------------------------------------------------------
923        @Override
924        public ActiveMQDestination getActiveMQDestination() {
925            return destination;
926        }
927    
928        public MessageGroupMap getMessageGroupOwners() {
929            if (messageGroupOwners == null) {
930                messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
931            }
932            return messageGroupOwners;
933        }
934    
935        public DispatchPolicy getDispatchPolicy() {
936            return dispatchPolicy;
937        }
938    
939        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
940            this.dispatchPolicy = dispatchPolicy;
941        }
942    
943        public MessageGroupMapFactory getMessageGroupMapFactory() {
944            return messageGroupMapFactory;
945        }
946    
947        public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
948            this.messageGroupMapFactory = messageGroupMapFactory;
949        }
950    
951        public PendingMessageCursor getMessages() {
952            return this.messages;
953        }
954    
955        public void setMessages(PendingMessageCursor messages) {
956            this.messages = messages;
957        }
958    
959        public boolean isUseConsumerPriority() {
960            return useConsumerPriority;
961        }
962    
963        public void setUseConsumerPriority(boolean useConsumerPriority) {
964            this.useConsumerPriority = useConsumerPriority;
965        }
966    
967        public boolean isStrictOrderDispatch() {
968            return strictOrderDispatch;
969        }
970    
971        public void setStrictOrderDispatch(boolean strictOrderDispatch) {
972            this.strictOrderDispatch = strictOrderDispatch;
973        }
974    
975        public boolean isOptimizedDispatch() {
976            return optimizedDispatch;
977        }
978    
979        public void setOptimizedDispatch(boolean optimizedDispatch) {
980            this.optimizedDispatch = optimizedDispatch;
981        }
982    
983        public int getTimeBeforeDispatchStarts() {
984            return timeBeforeDispatchStarts;
985        }
986    
987        public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
988            this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
989        }
990    
991        public int getConsumersBeforeDispatchStarts() {
992            return consumersBeforeDispatchStarts;
993        }
994    
995        public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
996            this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
997        }
998    
999        public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
1000            this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
1001        }
1002    
1003        public boolean isAllConsumersExclusiveByDefault() {
1004            return allConsumersExclusiveByDefault;
1005        }
1006    
1007    
1008        // Implementation methods
1009        // -------------------------------------------------------------------------
1010        private QueueMessageReference createMessageReference(Message message) {
1011            QueueMessageReference result = new IndirectMessageReference(message);
1012            return result;
1013        }
1014    
1015        public Message[] browse() {
1016            List<Message> browseList = new ArrayList<Message>();
1017            doBrowse(browseList, getMaxBrowsePageSize());
1018            return browseList.toArray(new Message[browseList.size()]);
1019        }
1020    
1021        public void doBrowse(List<Message> browseList, int max) {
1022            final ConnectionContext connectionContext = createConnectionContext();
1023            try {
1024                pageInMessages(false);
1025                List<MessageReference> toExpire = new ArrayList<MessageReference>();
1026    
1027                pagedInPendingDispatchLock.writeLock().lock();
1028                try {
1029                    addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
1030                    for (MessageReference ref : toExpire) {
1031                        pagedInPendingDispatch.remove(ref);
1032                        if (broker.isExpired(ref)) {
1033                            if (LOG.isDebugEnabled()) {
1034                                LOG.debug("expiring from pagedInPending: " + ref);
1035                            }
1036                            messageExpired(connectionContext, ref);
1037                        }
1038                    }
1039                } finally {
1040                    pagedInPendingDispatchLock.writeLock().unlock();
1041                }
1042                toExpire.clear();
1043                pagedInMessagesLock.readLock().lock();
1044                try {
1045                    addAll(pagedInMessages.values(), browseList, max, toExpire);
1046                } finally {
1047                    pagedInMessagesLock.readLock().unlock();
1048                }
1049                for (MessageReference ref : toExpire) {
1050                    if (broker.isExpired(ref)) {
1051                        if (LOG.isDebugEnabled()) {
1052                            LOG.debug("expiring from pagedInMessages: " + ref);
1053                        }
1054                        messageExpired(connectionContext, ref);
1055                    } else {
1056                        pagedInMessagesLock.writeLock().lock();
1057                        try {
1058                            pagedInMessages.remove(ref.getMessageId());
1059                        } finally {
1060                            pagedInMessagesLock.writeLock().unlock();
1061                        }
1062                    }
1063                }
1064    
1065                if (browseList.size() < getMaxBrowsePageSize()) {
1066                    messagesLock.writeLock().lock();
1067                    try {
1068                        try {
1069                            messages.reset();
1070                            while (messages.hasNext() && browseList.size() < max) {
1071                                MessageReference node = messages.next();
1072                                if (node.isExpired()) {
1073                                    if (broker.isExpired(node)) {
1074                                        if (LOG.isDebugEnabled()) {
1075                                            LOG.debug("expiring from messages: " + node);
1076                                        }
1077                                        messageExpired(connectionContext, createMessageReference(node.getMessage()));
1078                                    }
1079                                    messages.remove();
1080                                } else {
1081                                    messages.rollback(node.getMessageId());
1082                                    if (browseList.contains(node.getMessage()) == false) {
1083                                        browseList.add(node.getMessage());
1084                                    }
1085                                }
1086                                node.decrementReferenceCount();
1087                            }
1088                        } finally {
1089                            messages.release();
1090                        }
1091                    } finally {
1092                        messagesLock.writeLock().unlock();
1093                    }
1094                }
1095    
1096            } catch (Exception e) {
1097                LOG.error("Problem retrieving message for browse", e);
1098            }
1099        }
1100    
1101        private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
1102                List<MessageReference> toExpire) throws Exception {
1103            for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1104                QueueMessageReference ref = (QueueMessageReference) i.next();
1105                if (ref.isExpired()) {
1106                    toExpire.add(ref);
1107                } else if (l.contains(ref.getMessage()) == false) {
1108                    l.add(ref.getMessage());
1109                }
1110            }
1111        }
1112    
1113        public QueueMessageReference getMessage(String id) {
1114            MessageId msgId = new MessageId(id);
1115            pagedInMessagesLock.readLock().lock();
1116            try{
1117                QueueMessageReference ref = this.pagedInMessages.get(msgId);
1118                if (ref != null) {
1119                    return ref;
1120                }
1121            }finally {
1122                pagedInMessagesLock.readLock().unlock();
1123            }
1124            messagesLock.readLock().lock();
1125            try{
1126                try {
1127                    messages.reset();
1128                    while (messages.hasNext()) {
1129                        MessageReference mr = messages.next();
1130                        QueueMessageReference qmr = createMessageReference(mr.getMessage());
1131                        qmr.decrementReferenceCount();
1132                        messages.rollback(qmr.getMessageId());
1133                        if (msgId.equals(qmr.getMessageId())) {
1134                            return qmr;
1135                        }
1136                    }
1137                } finally {
1138                    messages.release();
1139                }
1140            }finally {
1141                messagesLock.readLock().unlock();
1142            }
1143            return null;
1144        }
1145    
1146        public void purge() throws Exception {
1147            ConnectionContext c = createConnectionContext();
1148            List<MessageReference> list = null;
1149            do {
1150                doPageIn(true);
1151                pagedInMessagesLock.readLock().lock();
1152                try {
1153                    list = new ArrayList<MessageReference>(pagedInMessages.values());
1154                }finally {
1155                    pagedInMessagesLock.readLock().unlock();
1156                }
1157    
1158                for (MessageReference ref : list) {
1159                    try {
1160                        QueueMessageReference r = (QueueMessageReference) ref;
1161                        removeMessage(c, r);
1162                    } catch (IOException e) {
1163                    }
1164                }
1165                // don't spin/hang if stats are out and there is nothing left in the
1166                // store
1167            } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1168            if (this.destinationStatistics.getMessages().getCount() > 0) {
1169                LOG.warn(getActiveMQDestination().getQualifiedName()
1170                        + " after purge complete, message count stats report: "
1171                        + this.destinationStatistics.getMessages().getCount());
1172            }
1173            gc();
1174            this.destinationStatistics.getMessages().setCount(0);
1175            getMessages().clear();
1176        }
1177    
1178        public void clearPendingMessages() {
1179            messagesLock.writeLock().lock();
1180            try {
1181                if (store != null) {
1182                    store.resetBatching();
1183                }
1184                messages.gc();
1185                asyncWakeup();
1186            } finally {
1187                messagesLock.writeLock().unlock();
1188            }
1189        }
1190    
1191        /**
1192         * Removes the message matching the given messageId
1193         */
1194        public boolean removeMessage(String messageId) throws Exception {
1195            return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1196        }
1197    
1198        /**
1199         * Removes the messages matching the given selector
1200         *
1201         * @return the number of messages removed
1202         */
1203        public int removeMatchingMessages(String selector) throws Exception {
1204            return removeMatchingMessages(selector, -1);
1205        }
1206    
1207        /**
1208         * Removes the messages matching the given selector up to the maximum number
1209         * of matched messages
1210         *
1211         * @return the number of messages removed
1212         */
1213        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1214            return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1215        }
1216    
1217        /**
1218         * Removes the messages matching the given filter up to the maximum number
1219         * of matched messages
1220         *
1221         * @return the number of messages removed
1222         */
1223        public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1224            int movedCounter = 0;
1225            Set<MessageReference> set = new HashSet<MessageReference>();
1226            ConnectionContext context = createConnectionContext();
1227            do {
1228                doPageIn(true);
1229                pagedInMessagesLock.readLock().lock();
1230                try{
1231                    set.addAll(pagedInMessages.values());
1232                }finally {
1233                    pagedInMessagesLock.readLock().unlock();
1234                }
1235                List<MessageReference> list = new ArrayList<MessageReference>(set);
1236                for (MessageReference ref : list) {
1237                    IndirectMessageReference r = (IndirectMessageReference) ref;
1238                    if (filter.evaluate(context, r)) {
1239    
1240                        removeMessage(context, r);
1241                        set.remove(r);
1242                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1243                            return movedCounter;
1244                        }
1245                    }
1246                }
1247            } while (set.size() < this.destinationStatistics.getMessages().getCount());
1248            return movedCounter;
1249        }
1250    
1251        /**
1252         * Copies the message matching the given messageId
1253         */
1254        public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1255                throws Exception {
1256            return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1257        }
1258    
1259        /**
1260         * Copies the messages matching the given selector
1261         *
1262         * @return the number of messages copied
1263         */
1264        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1265                throws Exception {
1266            return copyMatchingMessagesTo(context, selector, dest, -1);
1267        }
1268    
1269        /**
1270         * Copies the messages matching the given selector up to the maximum number
1271         * of matched messages
1272         *
1273         * @return the number of messages copied
1274         */
1275        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1276                int maximumMessages) throws Exception {
1277            return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1278        }
1279    
1280        /**
1281         * Copies the messages matching the given filter up to the maximum number of
1282         * matched messages
1283         *
1284         * @return the number of messages copied
1285         */
1286        public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1287                int maximumMessages) throws Exception {
1288            int movedCounter = 0;
1289            int count = 0;
1290            Set<MessageReference> set = new HashSet<MessageReference>();
1291            do {
1292                int oldMaxSize = getMaxPageSize();
1293                setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1294                doPageIn(true);
1295                setMaxPageSize(oldMaxSize);
1296                pagedInMessagesLock.readLock().lock();
1297                try {
1298                    set.addAll(pagedInMessages.values());
1299                }finally {
1300                    pagedInMessagesLock.readLock().unlock();
1301                }
1302                List<MessageReference> list = new ArrayList<MessageReference>(set);
1303                for (MessageReference ref : list) {
1304                    IndirectMessageReference r = (IndirectMessageReference) ref;
1305                    if (filter.evaluate(context, r)) {
1306    
1307                        r.incrementReferenceCount();
1308                        try {
1309                            Message m = r.getMessage();
1310                            BrokerSupport.resend(context, m, dest);
1311                            if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1312                                return movedCounter;
1313                            }
1314                        } finally {
1315                            r.decrementReferenceCount();
1316                        }
1317                    }
1318                    count++;
1319                }
1320            } while (count < this.destinationStatistics.getMessages().getCount());
1321            return movedCounter;
1322        }
1323    
1324        /**
1325         * Move a message
1326         *
1327         * @param context
1328         *            connection context
1329         * @param m
1330         *            QueueMessageReference
1331         * @param dest
1332         *            ActiveMQDestination
1333         * @throws Exception
1334         */
1335        public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1336            BrokerSupport.resend(context, m.getMessage(), dest);
1337            removeMessage(context, m);
1338            messagesLock.writeLock().lock();
1339            try{
1340                messages.rollback(m.getMessageId());
1341            }finally {
1342                messagesLock.writeLock().unlock();
1343            }
1344            return true;
1345        }
1346    
1347        /**
1348         * Moves the message matching the given messageId
1349         */
1350        public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1351                throws Exception {
1352            return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1353        }
1354    
1355        /**
1356         * Moves the messages matching the given selector
1357         *
1358         * @return the number of messages removed
1359         */
1360        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1361                throws Exception {
1362            return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1363        }
1364    
1365        /**
1366         * Moves the messages matching the given selector up to the maximum number
1367         * of matched messages
1368         */
1369        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1370                int maximumMessages) throws Exception {
1371            return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1372        }
1373    
1374        /**
1375         * Moves the messages matching the given filter up to the maximum number of
1376         * matched messages
1377         */
1378        public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1379                ActiveMQDestination dest, int maximumMessages) throws Exception {
1380            int movedCounter = 0;
1381            Set<QueueMessageReference> set = new HashSet<QueueMessageReference>();
1382            do {
1383                doPageIn(true);
1384                pagedInMessagesLock.readLock().lock();
1385                try{
1386                    set.addAll(pagedInMessages.values());
1387                }finally {
1388                    pagedInMessagesLock.readLock().unlock();
1389                }
1390                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1391                for (QueueMessageReference ref : list) {
1392                    if (filter.evaluate(context, ref)) {
1393                        // We should only move messages that can be locked.
1394                        moveMessageTo(context, ref, dest);
1395                        set.remove(ref);
1396                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1397                            return movedCounter;
1398                        }
1399                    }
1400                }
1401            } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1402            return movedCounter;
1403        }
1404    
1405        /**
1406         * @return true if we would like to iterate again
1407         * @see org.apache.activemq.thread.Task#iterate()
1408         */
1409        public boolean iterate() {
1410            MDC.put("activemq.destination", getName());
1411            boolean pageInMoreMessages = false;
1412            synchronized (iteratingMutex) {
1413    
1414                // If optimize dispatch is on or this is a slave this method could be called recursively
1415                // we set this state value to short-circuit wakeup in those cases to avoid that as it
1416                // could lead to errors.
1417                iterationRunning = true;
1418    
1419                // do early to allow dispatch of these waiting messages
1420                synchronized (messagesWaitingForSpace) {
1421                    Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1422                    while (it.hasNext()) {
1423                        if (!memoryUsage.isFull()) {
1424                            Runnable op = it.next();
1425                            it.remove();
1426                            op.run();
1427                        } else {
1428                            registerCallbackForNotFullNotification();
1429                            break;
1430                        }
1431                    }
1432                }
1433    
1434                if (firstConsumer) {
1435                    firstConsumer = false;
1436                    try {
1437                        if (consumersBeforeDispatchStarts > 0) {
1438                            int timeout = 1000; // wait one second by default if
1439                                                // consumer count isn't reached
1440                            if (timeBeforeDispatchStarts > 0) {
1441                                timeout = timeBeforeDispatchStarts;
1442                            }
1443                            if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1444                                if (LOG.isDebugEnabled()) {
1445                                    LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
1446                                }
1447                            } else {
1448                                if (LOG.isDebugEnabled()) {
1449                                    LOG.debug(timeout + " ms elapsed and " + consumers.size()
1450                                            + " consumers subscribed. Starting dispatch.");
1451                                }
1452                            }
1453                        }
1454                        if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1455                            iteratingMutex.wait(timeBeforeDispatchStarts);
1456                            if (LOG.isDebugEnabled()) {
1457                                LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
1458                            }
1459                        }
1460                    } catch (Exception e) {
1461                        LOG.error(e.toString());
1462                    }
1463                }
1464    
1465                BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
1466    
1467                messagesLock.readLock().lock();
1468                try{
1469                    pageInMoreMessages |= !messages.isEmpty();
1470                } finally {
1471                    messagesLock.readLock().unlock();
1472                }
1473    
1474                pagedInPendingDispatchLock.readLock().lock();
1475                try {
1476                    pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
1477                } finally {
1478                    pagedInPendingDispatchLock.readLock().unlock();
1479                }
1480    
1481                // Perhaps we should page always into the pagedInPendingDispatch
1482                // list if
1483                // !messages.isEmpty(), and then if
1484                // !pagedInPendingDispatch.isEmpty()
1485                // then we do a dispatch.
1486                if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
1487                    try {
1488                        pageInMessages(pendingBrowserDispatch != null);
1489    
1490                    } catch (Throwable e) {
1491                        LOG.error("Failed to page in more queue messages ", e);
1492                    }
1493                }
1494    
1495                if (pendingBrowserDispatch != null) {
1496                    ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
1497                    pagedInMessagesLock.readLock().lock();
1498                    try{
1499                        alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
1500                    }finally {
1501                        pagedInMessagesLock.readLock().unlock();
1502                    }
1503                    if (LOG.isDebugEnabled()) {
1504                        LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
1505                                + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
1506                    }
1507                    do {
1508                        try {
1509                            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1510                            msgContext.setDestination(destination);
1511    
1512                            QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
1513                            for (QueueMessageReference node : alreadyDispatchedMessages) {
1514                                if (!node.isAcked()) {
1515                                    msgContext.setMessageReference(node);
1516                                    if (browser.matches(node, msgContext)) {
1517                                        browser.add(node);
1518                                    }
1519                                }
1520                            }
1521                            pendingBrowserDispatch.done();
1522                        } catch (Exception e) {
1523                            LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
1524                        }
1525    
1526                    } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
1527                }
1528    
1529                if (pendingWakeups.get() > 0) {
1530                    pendingWakeups.decrementAndGet();
1531                }
1532                MDC.remove("activemq.destination");
1533                iterationRunning = false;
1534    
1535                return pendingWakeups.get() > 0;
1536            }
1537        }
1538    
1539        protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1540            return new MessageReferenceFilter() {
1541                public boolean evaluate(ConnectionContext context, MessageReference r) {
1542                    return messageId.equals(r.getMessageId().toString());
1543                }
1544    
1545                @Override
1546                public String toString() {
1547                    return "MessageIdFilter: " + messageId;
1548                }
1549            };
1550        }
1551    
1552        protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1553    
1554            if (selector == null || selector.isEmpty()) {
1555                return new MessageReferenceFilter() {
1556    
1557                    @Override
1558                    public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1559                        return true;
1560                    }
1561                };
1562            }
1563    
1564            final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1565    
1566            return new MessageReferenceFilter() {
1567                public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1568                    MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1569    
1570                    messageEvaluationContext.setMessageReference(r);
1571                    if (messageEvaluationContext.getDestination() == null) {
1572                        messageEvaluationContext.setDestination(getActiveMQDestination());
1573                    }
1574    
1575                    return selectorExpression.matches(messageEvaluationContext);
1576                }
1577            };
1578        }
1579    
1580        protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1581            removeMessage(c, null, r);
1582            pagedInPendingDispatchLock.writeLock().lock();
1583            try {
1584                pagedInPendingDispatch.remove(r);
1585            } finally {
1586                pagedInPendingDispatchLock.writeLock().unlock();
1587            }
1588    
1589        }
1590    
1591        protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1592            MessageAck ack = new MessageAck();
1593            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1594            ack.setDestination(destination);
1595            ack.setMessageID(r.getMessageId());
1596            removeMessage(c, subs, r, ack);
1597        }
1598    
1599        protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1600                MessageAck ack) throws IOException {
1601            reference.setAcked(true);
1602            // This sends the ack the the journal..
1603            if (!ack.isInTransaction()) {
1604                acknowledge(context, sub, ack, reference);
1605                getDestinationStatistics().getDequeues().increment();
1606                dropMessage(reference);
1607            } else {
1608                try {
1609                    acknowledge(context, sub, ack, reference);
1610                } finally {
1611                    context.getTransaction().addSynchronization(new Synchronization() {
1612    
1613                        @Override
1614                        public void afterCommit() throws Exception {
1615                            getDestinationStatistics().getDequeues().increment();
1616                            dropMessage(reference);
1617                            wakeup();
1618                        }
1619    
1620                        @Override
1621                        public void afterRollback() throws Exception {
1622                            reference.setAcked(false);
1623                        }
1624                    });
1625                }
1626            }
1627            if (ack.isPoisonAck()) {
1628                // message gone to DLQ, is ok to allow redelivery
1629                messagesLock.writeLock().lock();
1630                try{
1631                    messages.rollback(reference.getMessageId());
1632                }finally {
1633                    messagesLock.writeLock().unlock();
1634                }
1635            }
1636    
1637        }
1638    
1639        private void dropMessage(QueueMessageReference reference) {
1640            reference.drop();
1641            destinationStatistics.getMessages().decrement();
1642            pagedInMessagesLock.writeLock().lock();
1643            try{
1644                pagedInMessages.remove(reference.getMessageId());
1645            }finally {
1646                pagedInMessagesLock.writeLock().unlock();
1647            }
1648        }
1649    
1650        public void messageExpired(ConnectionContext context, MessageReference reference) {
1651            messageExpired(context, null, reference);
1652        }
1653    
1654        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1655            if (LOG.isDebugEnabled()) {
1656                LOG.debug("message expired: " + reference);
1657            }
1658            broker.messageExpired(context, reference, subs);
1659            destinationStatistics.getExpired().increment();
1660            try {
1661                removeMessage(context, subs, (QueueMessageReference) reference);
1662            } catch (IOException e) {
1663                LOG.error("Failed to remove expired Message from the store ", e);
1664            }
1665        }
1666    
1667        final void sendMessage(final Message msg) throws Exception {
1668            messagesLock.writeLock().lock();
1669            try{
1670                messages.addMessageLast(msg);
1671            }finally {
1672                messagesLock.writeLock().unlock();
1673            }
1674        }
1675    
1676        final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1677            destinationStatistics.getEnqueues().increment();
1678            destinationStatistics.getMessages().increment();
1679            messageDelivered(context, msg);
1680            consumersLock.readLock().lock();
1681            try {
1682                if (consumers.isEmpty()) {
1683                    onMessageWithNoConsumers(context, msg);
1684                }
1685            }finally {
1686                consumersLock.readLock().unlock();
1687            }
1688            if (LOG.isDebugEnabled()) {
1689                LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
1690            }
1691            wakeup();
1692        }
1693    
1694        public void wakeup() {
1695            if ((optimizedDispatch || isSlave()) && !iterationRunning) {
1696                iterate();
1697                pendingWakeups.incrementAndGet();
1698            } else {
1699                asyncWakeup();
1700            }
1701        }
1702    
1703        private void asyncWakeup() {
1704            try {
1705                pendingWakeups.incrementAndGet();
1706                this.taskRunner.wakeup();
1707            } catch (InterruptedException e) {
1708                LOG.warn("Async task tunner failed to wakeup ", e);
1709            }
1710        }
1711    
1712        private boolean isSlave() {
1713            return broker.getBrokerService().isSlave();
1714        }
1715    
1716        private void doPageIn(boolean force) throws Exception {
1717            PendingList newlyPaged = doPageInForDispatch(force);
1718            pagedInPendingDispatchLock.writeLock().lock();
1719            try {
1720                if (pagedInPendingDispatch.isEmpty()) {
1721                    pagedInPendingDispatch.addAll(newlyPaged);
1722    
1723                } else {
1724                    for (MessageReference qmr : newlyPaged) {
1725                        if (!pagedInPendingDispatch.contains(qmr)) {
1726                            pagedInPendingDispatch.addMessageLast(qmr);
1727                        }
1728                    }
1729                }
1730            } finally {
1731                pagedInPendingDispatchLock.writeLock().unlock();
1732            }
1733        }
1734    
    /**
     * Pages messages from {@code messages} into {@code pagedInMessages} and
     * returns the newly paged in references.
     * <p>
     * The number of messages to page in is the smaller of the max page size and
     * the size of {@code messages}; with lazy dispatch and no force it is
     * further limited by {@code getConsumerMessageCountBeforeFull()}. Paging
     * only happens when that number is positive and either {@code force} is set
     * or there are consumers and the pending dispatch list is below the max
     * page size.
     *
     * @param force
     *            page in even without consumers or with a full pending list,
     *            and ignore the lazy dispatch limit
     * @return list of references newly added to {@code pagedInMessages}; an
     *         empty list (never null) when nothing was paged in
     * @throws Exception
     *             if reading from {@code messages} or creating a reference
     *             fails
     */
    private PendingList doPageInForDispatch(boolean force) throws Exception {
        List<QueueMessageReference> result = null;
        PendingList resultList = null;

        int toPageIn = Math.min(getMaxPageSize(), messages.size());
        if (LOG.isDebugEnabled()) {
            LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
                    + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
                    + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
                    + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
        }

        if (isLazyDispatch() && !force) {
            // Only page in the minimum number of messages which can be
            // dispatched immediately.
            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
        }
        // snapshot the pending dispatch size under its read lock
        int pagedInPendingSize = 0;
        pagedInPendingDispatchLock.readLock().lock();
        try {
            pagedInPendingSize = pagedInPendingDispatch.size();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }
        if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
            int count = 0;
            result = new ArrayList<QueueMessageReference>(toPageIn);
            messagesLock.writeLock().lock();
            try {
                try {
                    messages.setMaxBatchSize(toPageIn);
                    messages.reset();
                    // expired messages are consumed from the cursor but do not
                    // count towards toPageIn
                    while (messages.hasNext() && count < toPageIn) {
                        MessageReference node = messages.next();
                        messages.remove();

                        QueueMessageReference ref = createMessageReference(node.getMessage());
                        if (ref.isExpired()) {
                            if (broker.isExpired(ref)) {
                                messageExpired(createConnectionContext(), ref);
                            } else {
                                ref.decrementReferenceCount();
                            }
                        } else {
                            result.add(ref);
                            count++;
                        }
                    }
                } finally {
                    // always release the cursor, even if iteration failed
                    messages.release();
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
            // Only add new messages, not already pagedIn to avoid multiple
            // dispatch attempts
            pagedInMessagesLock.writeLock().lock();
            try {
                if(isPrioritizedMessages()) {
                    resultList = new PrioritizedPendingList();
                } else {
                    resultList = new OrderedPendingList();
                }
                for (QueueMessageReference ref : result) {
                    if (!pagedInMessages.containsKey(ref.getMessageId())) {
                        pagedInMessages.put(ref.getMessageId(), ref);
                        resultList.addMessageLast(ref);
                    } else {
                        // already paged in: give back the reference count of
                        // the duplicate reference
                        ref.decrementReferenceCount();
                    }
                }
            } finally {
                pagedInMessagesLock.writeLock().unlock();
            }
        } else {
            // Avoid return null list, if condition is not validated
            resultList = new OrderedPendingList();
        }

        return resultList;
    }
1816    
1817        private void doDispatch(PendingList list) throws Exception {
1818            boolean doWakeUp = false;
1819    
1820            pagedInPendingDispatchLock.writeLock().lock();
1821            try {
1822                if (!redeliveredWaitingDispatch.isEmpty()) {
1823                    // Try first to dispatch redelivered messages to keep an
1824                    // proper order
1825                    redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
1826                }
1827                if (!pagedInPendingDispatch.isEmpty()) {
1828                    // Next dispatch anything that had not been
1829                    // dispatched before.
1830                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
1831                }
1832                // and now see if we can dispatch the new stuff.. and append to
1833                // the pending
1834                // list anything that does not actually get dispatched.
1835                if (list != null && !list.isEmpty()) {
1836                    if (pagedInPendingDispatch.isEmpty()) {
1837                        pagedInPendingDispatch.addAll(doActualDispatch(list));
1838                    } else {
1839                        for (MessageReference qmr : list) {
1840                            if (!pagedInPendingDispatch.contains(qmr)) {
1841                                pagedInPendingDispatch.addMessageLast(qmr);
1842                            }
1843                        }
1844                        doWakeUp = true;
1845                    }
1846                }
1847            } finally {
1848                pagedInPendingDispatchLock.writeLock().unlock();
1849            }
1850    
1851            if (doWakeUp) {
1852                // avoid lock order contention
1853                asyncWakeup();
1854            }
1855        }
1856    
1857        /**
1858         * @return list of messages that could get dispatched to consumers if they
1859         *         were not full.
1860         */
1861        private PendingList doActualDispatch(PendingList list) throws Exception {
1862            List<Subscription> consumers;
1863            consumersLock.writeLock().lock();
1864    
1865            try {
1866                if (this.consumers.isEmpty() || isSlave()) {
1867                    // slave dispatch happens in processDispatchNotification
1868                    return list;
1869                }
1870                consumers = new ArrayList<Subscription>(this.consumers);
1871            }finally {
1872                consumersLock.writeLock().unlock();
1873            }
1874    
1875            PendingList rc;
1876            if(isPrioritizedMessages()) {
1877                rc = new PrioritizedPendingList();
1878            } else {
1879                rc = new OrderedPendingList();
1880            }
1881    
1882            Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1883    
1884            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
1885    
1886                MessageReference node = (MessageReference) iterator.next();
1887                Subscription target = null;
1888                int interestCount = 0;
1889                for (Subscription s : consumers) {
1890                    if (s instanceof QueueBrowserSubscription) {
1891                        interestCount++;
1892                        continue;
1893                    }
1894                    if (!fullConsumers.contains(s) && !s.isFull()) {
1895                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) {
1896                            // Dispatch it.
1897                            s.add(node);
1898                            target = s;
1899                            break;
1900                        }
1901                    } else {
1902                        // no further dispatch of list to a full consumer to
1903                        // avoid out of order message receipt
1904                        fullConsumers.add(s);
1905                        if (LOG.isTraceEnabled()) {
1906                            LOG.trace("Sub full " + s);
1907                        }
1908                    }
1909                    // make sure it gets dispatched again
1910                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked() &&
1911                            (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
1912                        interestCount++;
1913                    }
1914                }
1915    
1916                if ((target == null && interestCount > 0) || consumers.size() == 0) {
1917                    // This means all subs were full or that there are no
1918                    // consumers...
1919                    rc.addMessageLast((QueueMessageReference) node);
1920                }
1921    
1922                // If it got dispatched, rotate the consumer list to get round robin
1923                // distribution.
1924                if (target != null && !strictOrderDispatch && consumers.size() > 1
1925                        && !dispatchSelector.isExclusiveConsumer(target)) {
1926                    consumersLock.writeLock().lock();
1927                    try {
1928                        if (removeFromConsumerList(target)) {
1929                            addToConsumerList(target);
1930                            consumers = new ArrayList<Subscription>(this.consumers);
1931                        }
1932                    }finally {
1933                        consumersLock.writeLock().unlock();
1934                    }
1935                }
1936            }
1937    
1938            return rc;
1939        }
1940    
1941        protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
1942            boolean result = true;
1943            // Keep message groups together.
1944            String groupId = node.getGroupID();
1945            int sequence = node.getGroupSequence();
1946            if (groupId != null) {
1947                //MessageGroupMap messageGroupOwners = ((Queue) node
1948                //        .getRegionDestination()).getMessageGroupOwners();
1949    
1950                MessageGroupMap messageGroupOwners = getMessageGroupOwners();
1951                // If we can own the first, then no-one else should own the
1952                // rest.
1953                if (sequence == 1) {
1954                    assignGroup(subscription, messageGroupOwners, node, groupId);
1955                } else {
1956    
1957                    // Make sure that the previous owner is still valid, we may
1958                    // need to become the new owner.
1959                    ConsumerId groupOwner;
1960    
1961                    groupOwner = messageGroupOwners.get(groupId);
1962                    if (groupOwner == null) {
1963                        assignGroup(subscription, messageGroupOwners, node, groupId);
1964                    } else {
1965                        if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
1966                            // A group sequence < 1 is an end of group signal.
1967                            if (sequence < 0) {
1968                                messageGroupOwners.removeGroup(groupId);
1969                            }
1970                        } else {
1971                            result = false;
1972                        }
1973                    }
1974                }
1975            }
1976    
1977            return result;
1978    
1979        }
1980    
1981        protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
1982            messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
1983            Message message = n.getMessage();
1984            if (message instanceof ActiveMQMessage) {
1985                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
1986                try {
1987                    activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
1988                } catch (JMSException e) {
1989                    LOG.warn("Failed to set boolean header: " + e, e);
1990                }
1991            }
1992        }
1993    
1994        protected void pageInMessages(boolean force) throws Exception {
1995            doDispatch(doPageInForDispatch(force));
1996        }
1997    
1998        private void addToConsumerList(Subscription sub) {
1999            if (useConsumerPriority) {
2000                consumers.add(sub);
2001                Collections.sort(consumers, orderedCompare);
2002            } else {
2003                consumers.add(sub);
2004            }
2005        }
2006    
2007        private boolean removeFromConsumerList(Subscription sub) {
2008            return consumers.remove(sub);
2009        }
2010    
2011        private int getConsumerMessageCountBeforeFull() throws Exception {
2012            int total = 0;
2013            boolean zeroPrefetch = false;
2014            consumersLock.readLock().lock();
2015            try{
2016                for (Subscription s : consumers) {
2017                    zeroPrefetch |= s.getPrefetchSize() == 0;
2018                    int countBeforeFull = s.countBeforeFull();
2019                    total += countBeforeFull;
2020                }
2021            }finally {
2022                consumersLock.readLock().unlock();
2023            }
2024            if (total == 0 && zeroPrefetch) {
2025                total = 1;
2026            }
2027            return total;
2028        }
2029    
2030        /*
2031         * In slave mode, dispatch is ignored till we get this notification as the
2032         * dispatch process is non deterministic between master and slave. On a
2033         * notification, the actual dispatch to the subscription (as chosen by the
2034         * master) is completed. (non-Javadoc)
2035         * @see
2036         * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2037         * (org.apache.activemq.command.MessageDispatchNotification)
2038         */
2039        @Override
2040        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2041            // do dispatch
2042            Subscription sub = getMatchingSubscription(messageDispatchNotification);
2043            if (sub != null) {
2044                MessageReference message = getMatchingMessage(messageDispatchNotification);
2045                sub.add(message);
2046                sub.processMessageDispatchNotification(messageDispatchNotification);
2047            }
2048        }
2049    
2050        private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
2051                throws Exception {
2052            QueueMessageReference message = null;
2053            MessageId messageId = messageDispatchNotification.getMessageId();
2054    
2055            pagedInPendingDispatchLock.writeLock().lock();
2056            try {
2057                for (MessageReference ref : pagedInPendingDispatch) {
2058                    if (messageId.equals(ref.getMessageId())) {
2059                        message = (QueueMessageReference)ref;
2060                        pagedInPendingDispatch.remove(ref);
2061                        break;
2062                    }
2063                }
2064            } finally {
2065                pagedInPendingDispatchLock.writeLock().unlock();
2066            }
2067    
2068            if (message == null) {
2069                pagedInMessagesLock.readLock().lock();
2070                try {
2071                    message = pagedInMessages.get(messageId);
2072                } finally {
2073                    pagedInMessagesLock.readLock().unlock();
2074                }
2075            }
2076    
2077            if (message == null) {
2078                messagesLock.writeLock().lock();
2079                try {
2080                    try {
2081                        messages.setMaxBatchSize(getMaxPageSize());
2082                        messages.reset();
2083                        while (messages.hasNext()) {
2084                            MessageReference node = messages.next();
2085                            messages.remove();
2086                            if (messageId.equals(node.getMessageId())) {
2087                                message = this.createMessageReference(node.getMessage());
2088                                break;
2089                            }
2090                        }
2091                    } finally {
2092                        messages.release();
2093                    }
2094                } finally {
2095                    messagesLock.writeLock().unlock();
2096                }
2097            }
2098    
2099            if (message == null) {
2100                Message msg = loadMessage(messageId);
2101                if (msg != null) {
2102                    message = this.createMessageReference(msg);
2103                }
2104            }
2105    
2106            if (message == null) {
2107                throw new JMSException("Slave broker out of sync with master - Message: "
2108                        + messageDispatchNotification.getMessageId() + " on "
2109                        + messageDispatchNotification.getDestination() + " does not exist among pending("
2110                        + pagedInPendingDispatch.size() + ") for subscription: "
2111                        + messageDispatchNotification.getConsumerId());
2112            }
2113            return message;
2114        }
2115    
2116        /**
2117         * Find a consumer that matches the id in the message dispatch notification
2118         *
2119         * @param messageDispatchNotification
2120         * @return sub or null if the subscription has been removed before dispatch
2121         * @throws JMSException
2122         */
2123        private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2124                throws JMSException {
2125            Subscription sub = null;
2126            consumersLock.readLock().lock();
2127            try {
2128                for (Subscription s : consumers) {
2129                    if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2130                        sub = s;
2131                        break;
2132                    }
2133                }
2134            }finally {
2135                consumersLock.readLock().unlock();
2136            }
2137            return sub;
2138        }
2139    
2140        public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2141            if (oldPercentUsage > newPercentUsage) {
2142                asyncWakeup();
2143            }
2144        }
2145    
2146        @Override
2147        protected Logger getLog() {
2148            return LOG;
2149        }
2150    
2151        protected boolean isOptimizeStorage(){
2152            boolean result = false;
2153            if (isDoOptimzeMessageStorage()){
2154                consumersLock.readLock().lock();
2155                try{
2156                    if (consumers.isEmpty()==false){
2157                        result = true;
2158                        for (Subscription s : consumers) {
2159                            if (s.getPrefetchSize()==0){
2160                                result = false;
2161                                break;
2162                            }
2163                            if (s.isSlowConsumer()){
2164                                result = false;
2165                                break;
2166                            }
2167                            if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2168                                result = false;
2169                                break;
2170                            }
2171                        }
2172                    }
2173                }finally {
2174                    consumersLock.readLock().unlock();
2175                }
2176            }
2177            return result;
2178        }
2179    }