001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import javax.jms.ResourceAllocationException;
022import org.apache.activemq.advisory.AdvisorySupport;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerService;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQTopic;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.command.MessageDispatchNotification;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
035import org.apache.activemq.security.SecurityContext;
036import org.apache.activemq.state.ProducerState;
037import org.apache.activemq.store.MessageStore;
038import org.apache.activemq.thread.Scheduler;
039import org.apache.activemq.usage.MemoryUsage;
040import org.apache.activemq.usage.SystemUsage;
041import org.apache.activemq.usage.Usage;
042import org.slf4j.Logger;
043
044/**
045 *
046 */
047public abstract class BaseDestination implements Destination {
048    /**
049     * The maximum number of messages to page in to the destination from
050     * persistent storage
051     */
052    public static final int MAX_PAGE_SIZE = 200;
053    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
054    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
055    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
056    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
057    public static final int MAX_AUDIT_DEPTH = 2048;
058
059    protected final ActiveMQDestination destination;
060    protected final Broker broker;
061    protected final MessageStore store;
062    protected SystemUsage systemUsage;
063    protected MemoryUsage memoryUsage;
064    private boolean producerFlowControl = true;
065    private boolean alwaysRetroactive = false;
066    protected boolean warnOnProducerFlowControl = true;
067    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
068
069    private int maxProducersToAudit = 1024;
070    private int maxAuditDepth = 2048;
071    private boolean enableAudit = true;
072    private int maxPageSize = MAX_PAGE_SIZE;
073    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
074    private boolean useCache = true;
075    private int minimumMessageSize = 1024;
076    private boolean lazyDispatch = false;
077    private boolean advisoryForSlowConsumers;
078    private boolean advisdoryForFastProducers;
079    private boolean advisoryForDiscardingMessages;
080    private boolean advisoryWhenFull;
081    private boolean advisoryForDelivery;
082    private boolean advisoryForConsumed;
083    private boolean sendAdvisoryIfNoConsumers;
084    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
085    protected final BrokerService brokerService;
086    protected final Broker regionBroker;
087    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
088    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
089    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
090    protected int cursorMemoryHighWaterMark = 70;
091    protected int storeUsageHighWaterMark = 100;
092    private SlowConsumerStrategy slowConsumerStrategy;
093    private boolean prioritizedMessages;
094    private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
095    private boolean gcIfInactive;
096    private boolean gcWithNetworkConsumers;
097    private long lastActiveTime=0l;
098    private boolean reduceMemoryFootprint = false;
099    protected final Scheduler scheduler;
100    private boolean disposed = false;
101    private boolean doOptimzeMessageStorage = true;
102    /*
103     * percentage of in-flight messages above which optimize message store is disabled
104     */
105    private int optimizeMessageStoreInFlightLimit = 10;
106
107    /**
108     * @param brokerService
109     * @param store
110     * @param destination
111     * @param parentStats
112     * @throws Exception
113     */
114    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
115        this.brokerService = brokerService;
116        this.broker = brokerService.getBroker();
117        this.store = store;
118        this.destination = destination;
119        // let's copy the enabled property from the parent DestinationStatistics
120        this.destinationStatistics.setEnabled(parentStats.isEnabled());
121        this.destinationStatistics.setParent(parentStats);
122        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
123        this.memoryUsage = this.systemUsage.getMemoryUsage();
124        this.memoryUsage.setUsagePortion(1.0f);
125        this.regionBroker = brokerService.getRegionBroker();
126        this.scheduler = brokerService.getBroker().getScheduler();
127    }
128
129    /**
130     * initialize the destination
131     *
132     * @throws Exception
133     */
134    public void initialize() throws Exception {
135        // Let the store know what usage manager we are using so that he can
136        // flush messages to disk when usage gets high.
137        if (store != null) {
138            store.setMemoryUsage(this.memoryUsage);
139        }
140    }
141
142    /**
143     * @return the producerFlowControl
144     */
145    public boolean isProducerFlowControl() {
146        return producerFlowControl;
147    }
148
149    /**
150     * @param producerFlowControl the producerFlowControl to set
151     */
152    public void setProducerFlowControl(boolean producerFlowControl) {
153        this.producerFlowControl = producerFlowControl;
154    }
155
156    public boolean isAlwaysRetroactive() {
157        return alwaysRetroactive;
158    }
159
160    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
161        this.alwaysRetroactive = alwaysRetroactive;
162    }
163
164    /**
165     * Set's the interval at which warnings about producers being blocked by
166     * resource usage will be triggered. Values of 0 or less will disable
167     * warnings
168     *
169     * @param blockedProducerWarningInterval the interval at which warning about
170     *            blocked producers will be triggered.
171     */
172    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
173        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
174    }
175
176    /**
177     *
178     * @return the interval at which warning about blocked producers will be
179     *         triggered.
180     */
181    public long getBlockedProducerWarningInterval() {
182        return blockedProducerWarningInterval;
183    }
184
185    /**
186     * @return the maxProducersToAudit
187     */
188    public int getMaxProducersToAudit() {
189        return maxProducersToAudit;
190    }
191
192    /**
193     * @param maxProducersToAudit the maxProducersToAudit to set
194     */
195    public void setMaxProducersToAudit(int maxProducersToAudit) {
196        this.maxProducersToAudit = maxProducersToAudit;
197    }
198
199    /**
200     * @return the maxAuditDepth
201     */
202    public int getMaxAuditDepth() {
203        return maxAuditDepth;
204    }
205
206    /**
207     * @param maxAuditDepth the maxAuditDepth to set
208     */
209    public void setMaxAuditDepth(int maxAuditDepth) {
210        this.maxAuditDepth = maxAuditDepth;
211    }
212
213    /**
214     * @return the enableAudit
215     */
216    public boolean isEnableAudit() {
217        return enableAudit;
218    }
219
220    /**
221     * @param enableAudit the enableAudit to set
222     */
223    public void setEnableAudit(boolean enableAudit) {
224        this.enableAudit = enableAudit;
225    }
226
227    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
228        destinationStatistics.getProducers().increment();
229        this.lastActiveTime=0l;
230    }
231
232    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
233        destinationStatistics.getProducers().decrement();
234    }
235
236    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
237        destinationStatistics.getConsumers().increment();
238        this.lastActiveTime=0l;
239    }
240
241    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
242        destinationStatistics.getConsumers().decrement();
243    }
244
245
246    public final MemoryUsage getMemoryUsage() {
247        return memoryUsage;
248    }
249
250    public DestinationStatistics getDestinationStatistics() {
251        return destinationStatistics;
252    }
253
254    public ActiveMQDestination getActiveMQDestination() {
255        return destination;
256    }
257
258    public final String getName() {
259        return getActiveMQDestination().getPhysicalName();
260    }
261
262    public final MessageStore getMessageStore() {
263        return store;
264    }
265
266    public boolean isActive() {
267        boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
268                           destinationStatistics.getProducers().getCount() != 0;
269        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
270            isActive = hasRegularConsumers(getConsumers());
271        }
272        return isActive;
273    }
274
275    public int getMaxPageSize() {
276        return maxPageSize;
277    }
278
279    public void setMaxPageSize(int maxPageSize) {
280        this.maxPageSize = maxPageSize;
281    }
282
283    public int getMaxBrowsePageSize() {
284        return this.maxBrowsePageSize;
285    }
286
287    public void setMaxBrowsePageSize(int maxPageSize) {
288        this.maxBrowsePageSize = maxPageSize;
289    }
290
291    public int getMaxExpirePageSize() {
292        return this.maxExpirePageSize;
293    }
294
295    public void setMaxExpirePageSize(int maxPageSize) {
296        this.maxExpirePageSize = maxPageSize;
297    }
298
299    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
300        this.expireMessagesPeriod = expireMessagesPeriod;
301    }
302
303    public long getExpireMessagesPeriod() {
304        return expireMessagesPeriod;
305    }
306
307    public boolean isUseCache() {
308        return useCache;
309    }
310
311    public void setUseCache(boolean useCache) {
312        this.useCache = useCache;
313    }
314
315    public int getMinimumMessageSize() {
316        return minimumMessageSize;
317    }
318
319    public void setMinimumMessageSize(int minimumMessageSize) {
320        this.minimumMessageSize = minimumMessageSize;
321    }
322
323    public boolean isLazyDispatch() {
324        return lazyDispatch;
325    }
326
327    public void setLazyDispatch(boolean lazyDispatch) {
328        this.lazyDispatch = lazyDispatch;
329    }
330
331    protected long getDestinationSequenceId() {
332        return regionBroker.getBrokerSequenceId();
333    }
334
335    /**
336     * @return the advisoryForSlowConsumers
337     */
338    public boolean isAdvisoryForSlowConsumers() {
339        return advisoryForSlowConsumers;
340    }
341
342    /**
343     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
344     */
345    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
346        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
347    }
348
349    /**
350     * @return the advisoryForDiscardingMessages
351     */
352    public boolean isAdvisoryForDiscardingMessages() {
353        return advisoryForDiscardingMessages;
354    }
355
356    /**
357     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
358     *            set
359     */
360    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
361        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
362    }
363
364    /**
365     * @return the advisoryWhenFull
366     */
367    public boolean isAdvisoryWhenFull() {
368        return advisoryWhenFull;
369    }
370
371    /**
372     * @param advisoryWhenFull the advisoryWhenFull to set
373     */
374    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
375        this.advisoryWhenFull = advisoryWhenFull;
376    }
377
378    /**
379     * @return the advisoryForDelivery
380     */
381    public boolean isAdvisoryForDelivery() {
382        return advisoryForDelivery;
383    }
384
385    /**
386     * @param advisoryForDelivery the advisoryForDelivery to set
387     */
388    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
389        this.advisoryForDelivery = advisoryForDelivery;
390    }
391
392    /**
393     * @return the advisoryForConsumed
394     */
395    public boolean isAdvisoryForConsumed() {
396        return advisoryForConsumed;
397    }
398
399    /**
400     * @param advisoryForConsumed the advisoryForConsumed to set
401     */
402    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
403        this.advisoryForConsumed = advisoryForConsumed;
404    }
405
406    /**
407     * @return the advisdoryForFastProducers
408     */
409    public boolean isAdvisdoryForFastProducers() {
410        return advisdoryForFastProducers;
411    }
412
413    /**
414     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
415     */
416    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
417        this.advisdoryForFastProducers = advisdoryForFastProducers;
418    }
419
420    public boolean isSendAdvisoryIfNoConsumers() {
421        return sendAdvisoryIfNoConsumers;
422    }
423
424    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
425        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
426    }
427
428    /**
429     * @return the dead letter strategy
430     */
431    public DeadLetterStrategy getDeadLetterStrategy() {
432        return deadLetterStrategy;
433    }
434
435    /**
436     * set the dead letter strategy
437     *
438     * @param deadLetterStrategy
439     */
440    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
441        this.deadLetterStrategy = deadLetterStrategy;
442    }
443
444    public int getCursorMemoryHighWaterMark() {
445        return this.cursorMemoryHighWaterMark;
446    }
447
448    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
449        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
450    }
451
452    /**
453     * called when message is consumed
454     *
455     * @param context
456     * @param messageReference
457     */
458    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
459        if (advisoryForConsumed) {
460            broker.messageConsumed(context, messageReference);
461        }
462    }
463
464    /**
465     * Called when message is delivered to the broker
466     *
467     * @param context
468     * @param messageReference
469     */
470    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
471        if (advisoryForDelivery) {
472            broker.messageDelivered(context, messageReference);
473        }
474    }
475
476    /**
477     * Called when a message is discarded - e.g. running low on memory This will
478     * happen only if the policy is enabled - e.g. non durable topics
479     *
480     * @param context
481     * @param messageReference
482     */
483    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
484        if (advisoryForDiscardingMessages) {
485            broker.messageDiscarded(context, sub, messageReference);
486        }
487    }
488
489    /**
490     * Called when there is a slow consumer
491     *
492     * @param context
493     * @param subs
494     */
495    public void slowConsumer(ConnectionContext context, Subscription subs) {
496        if (advisoryForSlowConsumers) {
497            broker.slowConsumer(context, this, subs);
498        }
499        if (slowConsumerStrategy != null) {
500            slowConsumerStrategy.slowConsumer(context, subs);
501        }
502    }
503
504    /**
505     * Called to notify a producer is too fast
506     *
507     * @param context
508     * @param producerInfo
509     */
510    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
511        if (advisdoryForFastProducers) {
512            broker.fastProducer(context, producerInfo);
513        }
514    }
515
516    /**
517     * Called when a Usage reaches a limit
518     *
519     * @param context
520     * @param usage
521     */
522    public void isFull(ConnectionContext context, Usage<?> usage) {
523        if (advisoryWhenFull) {
524            broker.isFull(context, this, usage);
525        }
526    }
527
528    public void dispose(ConnectionContext context) throws IOException {
529        if (this.store != null) {
530            this.store.removeAllMessages(context);
531            this.store.dispose(context);
532        }
533        this.destinationStatistics.setParent(null);
534        this.memoryUsage.stop();
535        this.disposed = true;
536    }
537
538    public boolean isDisposed() {
539        return this.disposed;
540    }
541
542    /**
543     * Provides a hook to allow messages with no consumer to be processed in
544     * some way - such as to send to a dead letter queue or something..
545     */
546    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
547        if (!msg.isPersistent()) {
548            if (isSendAdvisoryIfNoConsumers()) {
549                // allow messages with no consumers to be dispatched to a dead
550                // letter queue
551                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
552
553                    Message message = msg.copy();
554                    // The original destination and transaction id do not get
555                    // filled when the message is first sent,
556                    // it is only populated if the message is routed to another
557                    // destination like the DLQ
558                    if (message.getOriginalDestination() != null) {
559                        message.setOriginalDestination(message.getDestination());
560                    }
561                    if (message.getOriginalTransactionId() != null) {
562                        message.setOriginalTransactionId(message.getTransactionId());
563                    }
564
565                    ActiveMQTopic advisoryTopic;
566                    if (destination.isQueue()) {
567                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
568                    } else {
569                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
570                    }
571                    message.setDestination(advisoryTopic);
572                    message.setTransactionId(null);
573
574                    // Disable flow control for this since since we don't want
575                    // to block.
576                    boolean originalFlowControl = context.isProducerFlowControl();
577                    try {
578                        context.setProducerFlowControl(false);
579                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
580                        producerExchange.setMutable(false);
581                        producerExchange.setConnectionContext(context);
582                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
583                        context.getBroker().send(producerExchange, message);
584                    } finally {
585                        context.setProducerFlowControl(originalFlowControl);
586                    }
587
588                }
589            }
590        }
591    }
592
593    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
594    }
595
596    public final int getStoreUsageHighWaterMark() {
597        return this.storeUsageHighWaterMark;
598    }
599
600    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
601        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
602    }
603
604    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
605        waitForSpace(context, usage, 100, warning);
606    }
607
608    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
609        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
610            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
611            throw new ResourceAllocationException(warning);
612        }
613        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
614            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
615                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
616                throw new ResourceAllocationException(warning);
617            }
618        } else {
619            long start = System.currentTimeMillis();
620            long nextWarn = start;
621            while (!usage.waitForSpace(1000, highWaterMark)) {
622                if (context.getStopping().get()) {
623                    throw new IOException("Connection closed, send aborted.");
624                }
625
626                long now = System.currentTimeMillis();
627                if (now >= nextWarn) {
628                    getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
629                    nextWarn = now + blockedProducerWarningInterval;
630                }
631            }
632        }
633    }
634
635    protected abstract Logger getLog();
636
637    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
638        this.slowConsumerStrategy = slowConsumerStrategy;
639    }
640
641    public SlowConsumerStrategy getSlowConsumerStrategy() {
642        return this.slowConsumerStrategy;
643    }
644
645
646    public boolean isPrioritizedMessages() {
647        return this.prioritizedMessages;
648    }
649
650    public void setPrioritizedMessages(boolean prioritizedMessages) {
651        this.prioritizedMessages = prioritizedMessages;
652        if (store != null) {
653            store.setPrioritizedMessages(prioritizedMessages);
654        }
655    }
656
657    /**
658     * @return the inactiveTimoutBeforeGC
659     */
660    public long getInactiveTimoutBeforeGC() {
661        return this.inactiveTimoutBeforeGC;
662    }
663
664    /**
665     * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
666     */
667    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
668        this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
669    }
670
671    /**
672     * @return the gcIfInactive
673     */
674    public boolean isGcIfInactive() {
675        return this.gcIfInactive;
676    }
677
678    /**
679     * @param gcIfInactive the gcIfInactive to set
680     */
681    public void setGcIfInactive(boolean gcIfInactive) {
682        this.gcIfInactive = gcIfInactive;
683    }
684
685    /**
686     * Indicate if it is ok to gc destinations that have only network consumers
687     * @param gcWithNetworkConsumers
688     */
689    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
690        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
691    }
692
693    public boolean isGcWithNetworkConsumers() {
694        return gcWithNetworkConsumers;
695    }
696
697    public void markForGC(long timeStamp) {
698        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
699                && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
700            this.lastActiveTime = timeStamp;
701        }
702    }
703
704    public boolean canGC() {
705        boolean result = false;
706        if (isGcIfInactive()&& this.lastActiveTime != 0l) {
707            if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
708                result = true;
709            }
710        }
711        return result;
712    }
713
714    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
715        this.reduceMemoryFootprint = reduceMemoryFootprint;
716    }
717
718    protected boolean isReduceMemoryFootprint() {
719        return this.reduceMemoryFootprint;
720    }
721
722    public boolean isDoOptimzeMessageStorage() {
723        return doOptimzeMessageStorage;
724    }
725
726    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
727        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
728    }
729
730    public int getOptimizeMessageStoreInFlightLimit() {
731        return optimizeMessageStoreInFlightLimit;
732    }
733
734    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
735        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
736    }
737
738
739    public abstract List<Subscription> getConsumers();
740
741    protected boolean hasRegularConsumers(List<Subscription> consumers) {
742        boolean hasRegularConsumers = false;
743        for (Subscription subscription: consumers) {
744            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
745                hasRegularConsumers = true;
746                break;
747            }
748        }
749        return hasRegularConsumers;
750    }
751
752    protected ConnectionContext createConnectionContext() {
753        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
754        answer.setBroker(this.broker);
755        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
756        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
757        return answer;
758    }
759}