001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.concurrent.CancellationException;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.CopyOnWriteArrayList;
026    import java.util.concurrent.Future;
027    import java.util.concurrent.locks.ReentrantReadWriteLock;
028    
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.broker.ProducerBrokerExchange;
032    import org.apache.activemq.broker.region.policy.DispatchPolicy;
033    import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034    import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035    import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036    import org.apache.activemq.broker.util.InsertionCountList;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ExceptionResponse;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessageAck;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.ProducerAck;
043    import org.apache.activemq.command.ProducerInfo;
044    import org.apache.activemq.command.Response;
045    import org.apache.activemq.command.SubscriptionInfo;
046    import org.apache.activemq.filter.MessageEvaluationContext;
047    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
048    import org.apache.activemq.store.MessageRecoveryListener;
049    import org.apache.activemq.store.TopicMessageStore;
050    import org.apache.activemq.thread.Task;
051    import org.apache.activemq.thread.TaskRunner;
052    import org.apache.activemq.thread.TaskRunnerFactory;
053    import org.apache.activemq.transaction.Synchronization;
054    import org.apache.activemq.util.SubscriptionKey;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    /**
059     * The Topic is a destination that sends a copy of a message to every active
060     * Subscription registered.
061     */
062    public class Topic extends BaseDestination implements Task {
    /** Logger for this class. */
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
    /** Message store handed to the constructor; callers in this class check it for null before use. */
    private final TopicMessageStore topicStore;
    /** Subscriptions currently attached to this topic; most updates in this class synchronize on the list. */
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
    /**
     * The write lock is held while a subscription is being recovered
     * (addSubscription, activate); dispatch takes the read lock.
     */
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
    /** Dispatch policy; defaults to a SimpleDispatchPolicy and can be replaced through its setter. */
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    /** Recovery policy; the constructor installs a NoSubscriptionRecoveryPolicy. */
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
    /** Durable subscriptions registered through addSubscription, keyed by their SubscriptionKey. */
    private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
    /** Task runner created in the constructor with this Topic as its Task. */
    private final TaskRunner taskRunner;
    /** Sends deferred by send() while memory usage is full; accessed while synchronized on the list itself. */
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
072        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
073            public void run() {
074                try {
075                    Topic.this.taskRunner.wakeup();
076                } catch (InterruptedException e) {
077                }
078            };
079        };
080    
081        public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
082                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
083            super(brokerService, store, destination, parentStats);
084            this.topicStore = store;
085            // set default subscription recovery policy
086            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
087            this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
088        }
089    
090        @Override
091        public void initialize() throws Exception {
092            super.initialize();
093            if (store != null) {
094                // AMQ-2586: Better to leave this stat at zero than to give the user
095                // misleading metrics.
096                // int messageCount = store.getMessageCount();
097                // destinationStatistics.getMessages().setCount(messageCount);
098            }
099        }
100    
101        public List<Subscription> getConsumers() {
102            synchronized (consumers) {
103                return new ArrayList<Subscription>(consumers);
104            }
105        }
106    
107        public boolean lock(MessageReference node, LockOwner sub) {
108            return true;
109        }
110    
111        public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
112    
113           super.addSubscription(context, sub);
114    
115            if (!sub.getConsumerInfo().isDurable()) {
116    
117                // Do a retroactive recovery if needed.
118                if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
119    
120                    // synchronize with dispatch method so that no new messages are sent
121                    // while we are recovering a subscription to avoid out of order messages.
122                    dispatchLock.writeLock().lock();
123                    try {
124                        synchronized (consumers) {
125                            sub.add(context, this);
126                            consumers.add(sub);
127                        }
128                        subscriptionRecoveryPolicy.recover(context, this, sub);
129                    } finally {
130                        dispatchLock.writeLock().unlock();
131                    }
132    
133                } else {
134                    synchronized (consumers) {
135                        sub.add(context, this);
136                        consumers.add(sub);
137                    }
138                }
139            } else {
140                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
141                    sub.add(context, this);
142                    if(dsub.isActive()) {
143                            synchronized (consumers) {
144                                    boolean hasSubscription = false;
145            
146                                    if(consumers.size()==0) {
147                                    hasSubscription = false;
148                                    } else {
149                                            for(Subscription currentSub : consumers) {
150                                                    if(currentSub.getConsumerInfo().isDurable()) {
151                                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
152                                                if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
153                                                    hasSubscription = true;
154                                                    break;
155                                                }
156                                                    }
157                                            }
158                                    }
159                                    
160                            if(!hasSubscription)
161                                    consumers.add(sub);
162                        }
163                    }
164                durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
165            }
166        }
167    
168        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
169                throws Exception {
170            if (!sub.getConsumerInfo().isDurable()) {
171                super.removeSubscription(context, sub, lastDeliveredSequenceId);
172                synchronized (consumers) {
173                    consumers.remove(sub);
174                }
175            }
176            sub.remove(context, this);
177        }
178    
179        public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
180            if (topicStore != null) {
181                topicStore.deleteSubscription(key.clientId, key.subscriptionName);
182                DurableTopicSubscription removed = durableSubcribers.remove(key);
183                if (removed != null) {
184                    destinationStatistics.getConsumers().decrement();
185                    // deactivate and remove
186                    removed.deactivate(false);
187                    consumers.remove(removed);
188                }
189            }
190        }
191    
192        public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
193            // synchronize with dispatch method so that no new messages are sent
194            // while we are recovering a subscription to avoid out of order messages.
195            dispatchLock.writeLock().lock();
196            try {
197    
198                if (topicStore == null) {
199                    return;
200                }
201    
202                // Recover the durable subscription.
203                String clientId = subscription.getSubscriptionKey().getClientId();
204                String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
205                String selector = subscription.getConsumerInfo().getSelector();
206                SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
207                if (info != null) {
208                    // Check to see if selector changed.
209                    String s1 = info.getSelector();
210                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
211                        // Need to delete the subscription
212                        topicStore.deleteSubscription(clientId, subscriptionName);
213                        info = null;
214                    } else {
215                        synchronized (consumers) {
216                            consumers.add(subscription);
217                        }
218                    }
219                }
220    
221                // Do we need to create the subscription?
222                if (info == null) {
223                    info = new SubscriptionInfo();
224                    info.setClientId(clientId);
225                    info.setSelector(selector);
226                    info.setSubscriptionName(subscriptionName);
227                    info.setDestination(getActiveMQDestination());
228                    // This destination is an actual destination id.
229                    info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
230                    // This destination might be a pattern
231                    synchronized (consumers) {
232                        consumers.add(subscription);
233                        topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
234                    }
235                }
236    
237                final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
238                msgContext.setDestination(destination);
239                if (subscription.isRecoveryRequired()) {
240                    topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
241                        public boolean recoverMessage(Message message) throws Exception {
242                            message.setRegionDestination(Topic.this);
243                            try {
244                                msgContext.setMessageReference(message);
245                                if (subscription.matches(message, msgContext)) {
246                                    subscription.add(message);
247                                }
248                            } catch (IOException e) {
249                                LOG.error("Failed to recover this message " + message);
250                            }
251                            return true;
252                        }
253    
254                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
255                            throw new RuntimeException("Should not be called.");
256                        }
257    
258                        public boolean hasSpace() {
259                            return true;
260                        }
261    
262                        public boolean isDuplicate(MessageId id) {
263                            return false;
264                        }
265                    });
266                }
267            } finally {
268                dispatchLock.writeLock().unlock();
269            }
270        }
271    
272        public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
273            synchronized (consumers) {
274                consumers.remove(sub);
275            }
276            sub.remove(context, this);
277        }
278    
279        protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
280            if (subscription.getConsumerInfo().isRetroactive()) {
281                subscriptionRecoveryPolicy.recover(context, this, subscription);
282            }
283        }
284    
285        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
286            final ConnectionContext context = producerExchange.getConnectionContext();
287    
288            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
289            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
290                    && !context.isInRecoveryMode();
291    
292            // There is delay between the client sending it and it arriving at the
293            // destination.. it may have expired.
294            if (message.isExpired()) {
295                broker.messageExpired(context, message, null);
296                getDestinationStatistics().getExpired().increment();
297                if (sendProducerAck) {
298                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
299                    context.getConnection().dispatchAsync(ack);
300                }
301                return;
302            }
303    
304            if (memoryUsage.isFull()) {
305                isFull(context, memoryUsage);
306                fastProducer(context, producerInfo);
307    
308                if (isProducerFlowControl() && context.isProducerFlowControl()) {
309    
310                    if (warnOnProducerFlowControl) {
311                        warnOnProducerFlowControl = false;
312                        LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
313                                        + getActiveMQDestination().getQualifiedName()
314                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
315                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
316                    }
317    
318                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
319                        throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
320                                + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
321                                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
322                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
323                    }
324    
325                    // We can avoid blocking due to low usage if the producer is
326                    // sending
327                    // a sync message or
328                    // if it is using a producer window
329                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
330                        synchronized (messagesWaitingForSpace) {
331                            messagesWaitingForSpace.add(new Runnable() {
332                                public void run() {
333                                    try {
334    
335                                        // While waiting for space to free up... the
336                                        // message may have expired.
337                                        if (message.isExpired()) {
338                                            broker.messageExpired(context, message, null);
339                                            getDestinationStatistics().getExpired().increment();
340                                        } else {
341                                            doMessageSend(producerExchange, message);
342                                        }
343    
344                                        if (sendProducerAck) {
345                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
346                                                    .getSize());
347                                            context.getConnection().dispatchAsync(ack);
348                                        } else {
349                                            Response response = new Response();
350                                            response.setCorrelationId(message.getCommandId());
351                                            context.getConnection().dispatchAsync(response);
352                                        }
353    
354                                    } catch (Exception e) {
355                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
356                                            ExceptionResponse response = new ExceptionResponse(e);
357                                            response.setCorrelationId(message.getCommandId());
358                                            context.getConnection().dispatchAsync(response);
359                                        }
360                                    }
361    
362                                }
363                            });
364    
365                            registerCallbackForNotFullNotification();
366                            context.setDontSendReponse(true);
367                            return;
368                        }
369    
370                    } else {
371                        // Producer flow control cannot be used, so we have do the
372                        // flow
373                        // control at the broker
374                        // by blocking this thread until there is space available.
375    
376                        if (memoryUsage.isFull()) {
377                            if (context.isInTransaction()) {
378    
379                                int count = 0;
380                                while (!memoryUsage.waitForSpace(1000)) {
381                                    if (context.getStopping().get()) {
382                                        throw new IOException("Connection closed, send aborted.");
383                                    }
384                                    if (count > 2 && context.isInTransaction()) {
385                                        count = 0;
386                                        int size = context.getTransaction().size();
387                                        LOG.warn("Waiting for space to send  transacted message - transaction elements = "
388                                                + size + " need more space to commit. Message = " + message);
389                                    }
390                                }
391                            } else {
392                                waitForSpace(
393                                        context,
394                                        memoryUsage,
395                                        "Usage Manager Memory Usage limit reached. Stopping producer ("
396                                                + message.getProducerId()
397                                                + ") to prevent flooding "
398                                                + getActiveMQDestination().getQualifiedName()
399                                                + "."
400                                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
401                            }
402                        }
403    
404                        // The usage manager could have delayed us by the time
405                        // we unblock the message could have expired..
406                        if (message.isExpired()) {
407                            getDestinationStatistics().getExpired().increment();
408                            if (LOG.isDebugEnabled()) {
409                                LOG.debug("Expired message: " + message);
410                            }
411                            return;
412                        }
413                    }
414                }
415            }
416    
417            doMessageSend(producerExchange, message);
418            messageDelivered(context, message);
419            if (sendProducerAck) {
420                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
421                context.getConnection().dispatchAsync(ack);
422            }
423        }
424    
425        /**
426         * do send the message - this needs to be synchronized to ensure messages
427         * are stored AND dispatched in the right order
428         *
429         * @param producerExchange
430         * @param message
431         * @throws IOException
432         * @throws Exception
433         */
434        synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
435                throws IOException, Exception {
436            final ConnectionContext context = producerExchange.getConnectionContext();
437            message.setRegionDestination(this);
438            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
439            Future<Object> result = null;
440    
441            if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
442                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
443                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
444                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
445                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
446                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
447                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
448                        throw new javax.jms.ResourceAllocationException(logMessage);
449                    }
450    
451                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
452                }
453                result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
454            }
455    
456            message.incrementReferenceCount();
457    
458            if (context.isInTransaction()) {
459                context.getTransaction().addSynchronization(new Synchronization() {
460                    @Override
461                    public void afterCommit() throws Exception {
462                        // It could take while before we receive the commit
463                        // operation.. by that time the message could have
464                        // expired..
465                        if (broker.isExpired(message)) {
466                            getDestinationStatistics().getExpired().increment();
467                            broker.messageExpired(context, message, null);
468                            message.decrementReferenceCount();
469                            return;
470                        }
471                        try {
472                            dispatch(context, message);
473                        } finally {
474                            message.decrementReferenceCount();
475                        }
476                    }
477                });
478    
479            } else {
480                try {
481                    dispatch(context, message);
482                } finally {
483                    message.decrementReferenceCount();
484                }
485            }
486    
487            if (result != null && !result.isCancelled()) {
488                try {
489                    result.get();
490                } catch (CancellationException e) {
491                    // ignore - the task has been cancelled if the message
492                    // has already been deleted
493                }
494            }
495        }
496    
497        private boolean canOptimizeOutPersistence() {
498            return durableSubcribers.size() == 0;
499        }
500    
501        @Override
502        public String toString() {
503            return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
504        }
505    
506        public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
507                final MessageReference node) throws IOException {
508            if (topicStore != null && node.isPersistent()) {
509                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
510                SubscriptionKey key = dsub.getSubscriptionKey();
511                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
512            }
513            messageConsumed(context, node);
514        }
515    
516        public void gc() {
517        }
518    
519        public Message loadMessage(MessageId messageId) throws IOException {
520            return topicStore != null ? topicStore.getMessage(messageId) : null;
521        }
522    
523        public void start() throws Exception {
524            this.subscriptionRecoveryPolicy.start();
525            if (memoryUsage != null) {
526                memoryUsage.start();
527            }
528    
529            if (getExpireMessagesPeriod() > 0) {
530                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
531            }
532        }
533    
534        public void stop() throws Exception {
535            if (taskRunner != null) {
536                taskRunner.shutdown();
537            }
538            this.subscriptionRecoveryPolicy.stop();
539            if (memoryUsage != null) {
540                memoryUsage.stop();
541            }
542            if (this.topicStore != null) {
543                this.topicStore.stop();
544            }
545    
546             scheduler.cancel(expireMessagesTask);
547        }
548    
549        public Message[] browse() {
550            final List<Message> result = new ArrayList<Message>();
551            doBrowse(result, getMaxBrowsePageSize());
552            return result.toArray(new Message[result.size()]);
553        }
554    
555        private void doBrowse(final List<Message> browseList, final int max) {
556            try {
557                if (topicStore != null) {
558                    final List<Message> toExpire = new ArrayList<Message>();
559                    topicStore.recover(new MessageRecoveryListener() {
560                        public boolean recoverMessage(Message message) throws Exception {
561                            if (message.isExpired()) {
562                                toExpire.add(message);
563                            }
564                            browseList.add(message);
565                            return true;
566                        }
567    
568                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
569                            return true;
570                        }
571    
572                        public boolean hasSpace() {
573                            return browseList.size() < max;
574                        }
575    
576                        public boolean isDuplicate(MessageId id) {
577                            return false;
578                        }
579                    });
580                    final ConnectionContext connectionContext = createConnectionContext();
581                    for (Message message : toExpire) {
582                        for (DurableTopicSubscription sub : durableSubcribers.values()) {
583                            if (!sub.isActive()) {
584                                messageExpired(connectionContext, sub, message);
585                            }
586                        }
587                    }
588                    Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
589                    if (msgs != null) {
590                        for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
591                            browseList.add(msgs[i]);
592                        }
593                    }
594                }
595            } catch (Throwable e) {
596                LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
597            }
598        }
599    
600        public boolean iterate() {
601            synchronized (messagesWaitingForSpace) {
602                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
603                    Runnable op = messagesWaitingForSpace.removeFirst();
604                    op.run();
605                }
606    
607                if (!messagesWaitingForSpace.isEmpty()) {
608                    registerCallbackForNotFullNotification();
609                }
610            }
611            return false;
612        }
613    
614        private void registerCallbackForNotFullNotification() {
615            // If the usage manager is not full, then the task will not
616            // get called..
617            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
618                // so call it directly here.
619                sendMessagesWaitingForSpaceTask.run();
620            }
621        }
622    
623        // Properties
624        // -------------------------------------------------------------------------
625    
626        public DispatchPolicy getDispatchPolicy() {
627            return dispatchPolicy;
628        }
629    
630        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
631            this.dispatchPolicy = dispatchPolicy;
632        }
633    
634        public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
635            return subscriptionRecoveryPolicy;
636        }
637    
638        public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
639            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
640        }
641    
642        // Implementation methods
643        // -------------------------------------------------------------------------
644    
645        public final void wakeup() {
646        }
647    
648        protected void dispatch(final ConnectionContext context, Message message) throws Exception {
649            // AMQ-2586: Better to leave this stat at zero than to give the user
650            // misleading metrics.
651            // destinationStatistics.getMessages().increment();
652            destinationStatistics.getEnqueues().increment();
653            MessageEvaluationContext msgContext = null;
654    
655            dispatchLock.readLock().lock();
656            try {
657                if (!subscriptionRecoveryPolicy.add(context, message)) {
658                    return;
659                }
660                synchronized (consumers) {
661                    if (consumers.isEmpty()) {
662                        onMessageWithNoConsumers(context, message);
663                        return;
664                    }
665                }
666                msgContext = context.getMessageEvaluationContext();
667                msgContext.setDestination(destination);
668                msgContext.setMessageReference(message);
669                if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
670                    onMessageWithNoConsumers(context, message);
671                }
672    
673            } finally {
674                dispatchLock.readLock().unlock();
675                if (msgContext != null) {
676                    msgContext.clear();
677                }
678            }
679        }
680    
681        private final Runnable expireMessagesTask = new Runnable() {
682            public void run() {
683                List<Message> browsedMessages = new InsertionCountList<Message>();
684                doBrowse(browsedMessages, getMaxExpirePageSize());
685            }
686        };
687    
688        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
689            broker.messageExpired(context, reference, subs);
690            // AMQ-2586: Better to leave this stat at zero than to give the user
691            // misleading metrics.
692            // destinationStatistics.getMessages().decrement();
693            destinationStatistics.getEnqueues().decrement();
694            destinationStatistics.getExpired().increment();
695            MessageAck ack = new MessageAck();
696            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
697            ack.setDestination(destination);
698            ack.setMessageID(reference.getMessageId());
699            try {
700                if (subs instanceof DurableTopicSubscription) {
701                    ((DurableTopicSubscription)subs).removePending(reference);
702                }
703                acknowledge(context, subs, ack, reference);
704            } catch (Exception e) {
705                LOG.error("Failed to remove expired Message from the store ", e);
706            }
707        }
708    
709        @Override
710        protected Logger getLog() {
711            return LOG;
712        }
713    
714        protected boolean isOptimizeStorage(){
715            boolean result = false;
716    
717            if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
718                    result = true;
719                    for (DurableTopicSubscription s : durableSubcribers.values()) {
720                        if (s.isActive()== false){
721                            result = false;
722                            break;
723                        }
724                        if (s.getPrefetchSize()==0){
725                            result = false;
726                            break;
727                        }
728                        if (s.isSlowConsumer()){
729                            result = false;
730                            break;
731                        }
732                        if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
733                            result = false;
734                            break;
735                        }
736                    }
737            }
738            return result;
739        }
740    }