001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.List; 021 import javax.jms.ResourceAllocationException; 022 import org.apache.activemq.advisory.AdvisorySupport; 023 import org.apache.activemq.broker.Broker; 024 import org.apache.activemq.broker.BrokerService; 025 import org.apache.activemq.broker.ConnectionContext; 026 import org.apache.activemq.broker.ProducerBrokerExchange; 027 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 028 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQTopic; 031 import org.apache.activemq.command.Message; 032 import org.apache.activemq.command.MessageDispatchNotification; 033 import org.apache.activemq.command.ProducerInfo; 034 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 035 import org.apache.activemq.security.SecurityContext; 036 import org.apache.activemq.state.ProducerState; 037 import org.apache.activemq.store.MessageStore; 038 import org.apache.activemq.thread.Scheduler; 039 import org.apache.activemq.usage.MemoryUsage; 040 import org.apache.activemq.usage.SystemUsage; 041 import org.apache.activemq.usage.Usage; 042 import org.slf4j.Logger; 043 044 /** 045 * 046 */ 047 public abstract class BaseDestination implements Destination { 048 /** 049 * The maximum number of messages to page in to the destination from 050 * persistent storage 051 */ 052 public static final int MAX_PAGE_SIZE = 200; 053 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 054 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 055 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 056 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 057 public static final int MAX_AUDIT_DEPTH = 2048; 058 059 protected final ActiveMQDestination destination; 060 protected final Broker broker; 061 protected final MessageStore store; 062 protected SystemUsage systemUsage; 063 protected MemoryUsage memoryUsage; 064 private boolean producerFlowControl = true; 065 private boolean alwaysRetroactive = false; 066 protected boolean warnOnProducerFlowControl = true; 067 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 068 069 private int maxProducersToAudit = 1024; 070 private int maxAuditDepth = 2048; 071 private boolean enableAudit = true; 072 private int maxPageSize = MAX_PAGE_SIZE; 073 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 074 private boolean useCache = true; 075 private int minimumMessageSize = 1024; 076 private boolean lazyDispatch = false; 077 private boolean advisoryForSlowConsumers; 078 private boolean advisdoryForFastProducers; 079 private boolean advisoryForDiscardingMessages; 080 private boolean advisoryWhenFull; 081 private boolean advisoryForDelivery; 082 private boolean advisoryForConsumed; 083 private boolean sendAdvisoryIfNoConsumers; 084 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 085 protected final BrokerService brokerService; 086 protected final Broker regionBroker; 087 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 088 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 089 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 090 protected int cursorMemoryHighWaterMark = 70; 091 protected int storeUsageHighWaterMark = 100; 092 private SlowConsumerStrategy slowConsumerStrategy; 093 private boolean prioritizedMessages; 094 private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 095 private boolean gcIfInactive; 096 private boolean gcWithNetworkConsumers; 097 private long lastActiveTime=0l; 098 private boolean reduceMemoryFootprint = false; 099 protected final Scheduler scheduler; 100 private boolean disposed = false; 101 private boolean doOptimzeMessageStorage = true; 102 /* 103 * percentage of in-flight messages above which optimize message store is disabled 104 */ 105 private int optimizeMessageStoreInFlightLimit = 10; 106 107 /** 108 * @param brokerService 109 * @param store 110 * @param destination 111 * @param parentStats 112 * @throws Exception 113 */ 114 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 115 this.brokerService = brokerService; 116 this.broker = brokerService.getBroker(); 117 this.store = store; 118 this.destination = destination; 119 // let's copy the enabled property from the parent DestinationStatistics 120 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 121 this.destinationStatistics.setParent(parentStats); 122 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 123 this.memoryUsage = this.systemUsage.getMemoryUsage(); 124 this.memoryUsage.setUsagePortion(1.0f); 125 this.regionBroker = brokerService.getRegionBroker(); 126 this.scheduler = brokerService.getBroker().getScheduler(); 127 } 128 129 /** 130 * initialize the destination 131 * 132 * @throws Exception 133 */ 134 public void initialize() throws Exception { 135 // Let the store know what usage manager we are using so that he can 136 // flush messages to disk when usage gets high. 137 if (store != null) { 138 store.setMemoryUsage(this.memoryUsage); 139 } 140 } 141 142 /** 143 * @return the producerFlowControl 144 */ 145 public boolean isProducerFlowControl() { 146 return producerFlowControl; 147 } 148 149 /** 150 * @param producerFlowControl the producerFlowControl to set 151 */ 152 public void setProducerFlowControl(boolean producerFlowControl) { 153 this.producerFlowControl = producerFlowControl; 154 } 155 156 public boolean isAlwaysRetroactive() { 157 return alwaysRetroactive; 158 } 159 160 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 161 this.alwaysRetroactive = alwaysRetroactive; 162 } 163 164 /** 165 * Set's the interval at which warnings about producers being blocked by 166 * resource usage will be triggered. Values of 0 or less will disable 167 * warnings 168 * 169 * @param blockedProducerWarningInterval the interval at which warning about 170 * blocked producers will be triggered. 171 */ 172 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 173 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 174 } 175 176 /** 177 * 178 * @return the interval at which warning about blocked producers will be 179 * triggered. 180 */ 181 public long getBlockedProducerWarningInterval() { 182 return blockedProducerWarningInterval; 183 } 184 185 /** 186 * @return the maxProducersToAudit 187 */ 188 public int getMaxProducersToAudit() { 189 return maxProducersToAudit; 190 } 191 192 /** 193 * @param maxProducersToAudit the maxProducersToAudit to set 194 */ 195 public void setMaxProducersToAudit(int maxProducersToAudit) { 196 this.maxProducersToAudit = maxProducersToAudit; 197 } 198 199 /** 200 * @return the maxAuditDepth 201 */ 202 public int getMaxAuditDepth() { 203 return maxAuditDepth; 204 } 205 206 /** 207 * @param maxAuditDepth the maxAuditDepth to set 208 */ 209 public void setMaxAuditDepth(int maxAuditDepth) { 210 this.maxAuditDepth = maxAuditDepth; 211 } 212 213 /** 214 * @return the enableAudit 215 */ 216 public boolean isEnableAudit() { 217 return enableAudit; 218 } 219 220 /** 221 * @param enableAudit the enableAudit to set 222 */ 223 public void setEnableAudit(boolean enableAudit) { 224 this.enableAudit = enableAudit; 225 } 226 227 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 228 destinationStatistics.getProducers().increment(); 229 this.lastActiveTime=0l; 230 } 231 232 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 233 destinationStatistics.getProducers().decrement(); 234 } 235 236 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 237 destinationStatistics.getConsumers().increment(); 238 this.lastActiveTime=0l; 239 } 240 241 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 242 destinationStatistics.getConsumers().decrement(); 243 } 244 245 246 public final MemoryUsage getMemoryUsage() { 247 return memoryUsage; 248 } 249 250 public DestinationStatistics getDestinationStatistics() { 251 return destinationStatistics; 252 } 253 254 public ActiveMQDestination getActiveMQDestination() { 255 return destination; 256 } 257 258 public final String getName() { 259 return getActiveMQDestination().getPhysicalName(); 260 } 261 262 public final MessageStore getMessageStore() { 263 return store; 264 } 265 266 public boolean isActive() { 267 boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || 268 destinationStatistics.getProducers().getCount() != 0; 269 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { 270 isActive = hasRegularConsumers(getConsumers()); 271 } 272 return isActive; 273 } 274 275 public int getMaxPageSize() { 276 return maxPageSize; 277 } 278 279 public void setMaxPageSize(int maxPageSize) { 280 this.maxPageSize = maxPageSize; 281 } 282 283 public int getMaxBrowsePageSize() { 284 return this.maxBrowsePageSize; 285 } 286 287 public void setMaxBrowsePageSize(int maxPageSize) { 288 this.maxBrowsePageSize = maxPageSize; 289 } 290 291 public int getMaxExpirePageSize() { 292 return this.maxExpirePageSize; 293 } 294 295 public void setMaxExpirePageSize(int maxPageSize) { 296 this.maxExpirePageSize = maxPageSize; 297 } 298 299 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 300 this.expireMessagesPeriod = expireMessagesPeriod; 301 } 302 303 public long getExpireMessagesPeriod() { 304 return expireMessagesPeriod; 305 } 306 307 public boolean isUseCache() { 308 return useCache; 309 } 310 311 public void setUseCache(boolean useCache) { 312 this.useCache = useCache; 313 } 314 315 public int getMinimumMessageSize() { 316 return minimumMessageSize; 317 } 318 319 public void setMinimumMessageSize(int minimumMessageSize) { 320 this.minimumMessageSize = minimumMessageSize; 321 } 322 323 public boolean isLazyDispatch() { 324 return lazyDispatch; 325 } 326 327 public void setLazyDispatch(boolean lazyDispatch) { 328 this.lazyDispatch = lazyDispatch; 329 } 330 331 protected long getDestinationSequenceId() { 332 return regionBroker.getBrokerSequenceId(); 333 } 334 335 /** 336 * @return the advisoryForSlowConsumers 337 */ 338 public boolean isAdvisoryForSlowConsumers() { 339 return advisoryForSlowConsumers; 340 } 341 342 /** 343 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 344 */ 345 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 346 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 347 } 348 349 /** 350 * @return the advisoryForDiscardingMessages 351 */ 352 public boolean isAdvisoryForDiscardingMessages() { 353 return advisoryForDiscardingMessages; 354 } 355 356 /** 357 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 358 * set 359 */ 360 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 361 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 362 } 363 364 /** 365 * @return the advisoryWhenFull 366 */ 367 public boolean isAdvisoryWhenFull() { 368 return advisoryWhenFull; 369 } 370 371 /** 372 * @param advisoryWhenFull the advisoryWhenFull to set 373 */ 374 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 375 this.advisoryWhenFull = advisoryWhenFull; 376 } 377 378 /** 379 * @return the advisoryForDelivery 380 */ 381 public boolean isAdvisoryForDelivery() { 382 return advisoryForDelivery; 383 } 384 385 /** 386 * @param advisoryForDelivery the advisoryForDelivery to set 387 */ 388 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 389 this.advisoryForDelivery = advisoryForDelivery; 390 } 391 392 /** 393 * @return the advisoryForConsumed 394 */ 395 public boolean isAdvisoryForConsumed() { 396 return advisoryForConsumed; 397 } 398 399 /** 400 * @param advisoryForConsumed the advisoryForConsumed to set 401 */ 402 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 403 this.advisoryForConsumed = advisoryForConsumed; 404 } 405 406 /** 407 * @return the advisdoryForFastProducers 408 */ 409 public boolean isAdvisdoryForFastProducers() { 410 return advisdoryForFastProducers; 411 } 412 413 /** 414 * @param advisdoryForFastProducers the advisdoryForFastProducers to set 415 */ 416 public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { 417 this.advisdoryForFastProducers = advisdoryForFastProducers; 418 } 419 420 public boolean isSendAdvisoryIfNoConsumers() { 421 return sendAdvisoryIfNoConsumers; 422 } 423 424 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 425 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 426 } 427 428 /** 429 * @return the dead letter strategy 430 */ 431 public DeadLetterStrategy getDeadLetterStrategy() { 432 return deadLetterStrategy; 433 } 434 435 /** 436 * set the dead letter strategy 437 * 438 * @param deadLetterStrategy 439 */ 440 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 441 this.deadLetterStrategy = deadLetterStrategy; 442 } 443 444 public int getCursorMemoryHighWaterMark() { 445 return this.cursorMemoryHighWaterMark; 446 } 447 448 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 449 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 450 } 451 452 /** 453 * called when message is consumed 454 * 455 * @param context 456 * @param messageReference 457 */ 458 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 459 if (advisoryForConsumed) { 460 broker.messageConsumed(context, messageReference); 461 } 462 } 463 464 /** 465 * Called when message is delivered to the broker 466 * 467 * @param context 468 * @param messageReference 469 */ 470 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 471 if (advisoryForDelivery) { 472 broker.messageDelivered(context, messageReference); 473 } 474 } 475 476 /** 477 * Called when a message is discarded - e.g. running low on memory This will 478 * happen only if the policy is enabled - e.g. non durable topics 479 * 480 * @param context 481 * @param messageReference 482 */ 483 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 484 if (advisoryForDiscardingMessages) { 485 broker.messageDiscarded(context, sub, messageReference); 486 } 487 } 488 489 /** 490 * Called when there is a slow consumer 491 * 492 * @param context 493 * @param subs 494 */ 495 public void slowConsumer(ConnectionContext context, Subscription subs) { 496 if (advisoryForSlowConsumers) { 497 broker.slowConsumer(context, this, subs); 498 } 499 if (slowConsumerStrategy != null) { 500 slowConsumerStrategy.slowConsumer(context, subs); 501 } 502 } 503 504 /** 505 * Called to notify a producer is too fast 506 * 507 * @param context 508 * @param producerInfo 509 */ 510 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 511 if (advisdoryForFastProducers) { 512 broker.fastProducer(context, producerInfo); 513 } 514 } 515 516 /** 517 * Called when a Usage reaches a limit 518 * 519 * @param context 520 * @param usage 521 */ 522 public void isFull(ConnectionContext context, Usage<?> usage) { 523 if (advisoryWhenFull) { 524 broker.isFull(context, this, usage); 525 } 526 } 527 528 public void dispose(ConnectionContext context) throws IOException { 529 if (this.store != null) { 530 this.store.removeAllMessages(context); 531 this.store.dispose(context); 532 } 533 this.destinationStatistics.setParent(null); 534 this.memoryUsage.stop(); 535 this.disposed = true; 536 } 537 538 public boolean isDisposed() { 539 return this.disposed; 540 } 541 542 /** 543 * Provides a hook to allow messages with no consumer to be processed in 544 * some way - such as to send to a dead letter queue or something.. 545 */ 546 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 547 if (!msg.isPersistent()) { 548 if (isSendAdvisoryIfNoConsumers()) { 549 // allow messages with no consumers to be dispatched to a dead 550 // letter queue 551 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 552 553 Message message = msg.copy(); 554 // The original destination and transaction id do not get 555 // filled when the message is first sent, 556 // it is only populated if the message is routed to another 557 // destination like the DLQ 558 if (message.getOriginalDestination() != null) { 559 message.setOriginalDestination(message.getDestination()); 560 } 561 if (message.getOriginalTransactionId() != null) { 562 message.setOriginalTransactionId(message.getTransactionId()); 563 } 564 565 ActiveMQTopic advisoryTopic; 566 if (destination.isQueue()) { 567 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 568 } else { 569 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 570 } 571 message.setDestination(advisoryTopic); 572 message.setTransactionId(null); 573 574 // Disable flow control for this since since we don't want 575 // to block. 576 boolean originalFlowControl = context.isProducerFlowControl(); 577 try { 578 context.setProducerFlowControl(false); 579 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 580 producerExchange.setMutable(false); 581 producerExchange.setConnectionContext(context); 582 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 583 context.getBroker().send(producerExchange, message); 584 } finally { 585 context.setProducerFlowControl(originalFlowControl); 586 } 587 588 } 589 } 590 } 591 } 592 593 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 594 } 595 596 public final int getStoreUsageHighWaterMark() { 597 return this.storeUsageHighWaterMark; 598 } 599 600 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 601 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 602 } 603 604 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 605 waitForSpace(context, usage, 100, warning); 606 } 607 608 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 609 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 610 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning); 611 throw new ResourceAllocationException(warning); 612 } 613 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 614 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 615 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning); 616 throw new ResourceAllocationException(warning); 617 } 618 } else { 619 long start = System.currentTimeMillis(); 620 long nextWarn = start; 621 while (!usage.waitForSpace(1000, highWaterMark)) { 622 if (context.getStopping().get()) { 623 throw new IOException("Connection closed, send aborted."); 624 } 625 626 long now = System.currentTimeMillis(); 627 if (now >= nextWarn) { 628 getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)"); 629 nextWarn = now + blockedProducerWarningInterval; 630 } 631 } 632 } 633 } 634 635 protected abstract Logger getLog(); 636 637 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 638 this.slowConsumerStrategy = slowConsumerStrategy; 639 } 640 641 public SlowConsumerStrategy getSlowConsumerStrategy() { 642 return this.slowConsumerStrategy; 643 } 644 645 646 public boolean isPrioritizedMessages() { 647 return this.prioritizedMessages; 648 } 649 650 public void setPrioritizedMessages(boolean prioritizedMessages) { 651 this.prioritizedMessages = prioritizedMessages; 652 if (store != null) { 653 store.setPrioritizedMessages(prioritizedMessages); 654 } 655 } 656 657 /** 658 * @return the inactiveTimoutBeforeGC 659 */ 660 public long getInactiveTimoutBeforeGC() { 661 return this.inactiveTimoutBeforeGC; 662 } 663 664 /** 665 * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set 666 */ 667 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 668 this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC; 669 } 670 671 /** 672 * @return the gcIfInactive 673 */ 674 public boolean isGcIfInactive() { 675 return this.gcIfInactive; 676 } 677 678 /** 679 * @param gcIfInactive the gcIfInactive to set 680 */ 681 public void setGcIfInactive(boolean gcIfInactive) { 682 this.gcIfInactive = gcIfInactive; 683 } 684 685 /** 686 * Indicate if it is ok to gc destinations that have only network consumers 687 * @param gcWithNetworkConsumers 688 */ 689 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 690 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 691 } 692 693 public boolean isGcWithNetworkConsumers() { 694 return gcWithNetworkConsumers; 695 } 696 697 public void markForGC(long timeStamp) { 698 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 699 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) { 700 this.lastActiveTime = timeStamp; 701 } 702 } 703 704 public boolean canGC() { 705 boolean result = false; 706 if (isGcIfInactive()&& this.lastActiveTime != 0l) { 707 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) { 708 result = true; 709 } 710 } 711 return result; 712 } 713 714 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 715 this.reduceMemoryFootprint = reduceMemoryFootprint; 716 } 717 718 protected boolean isReduceMemoryFootprint() { 719 return this.reduceMemoryFootprint; 720 } 721 722 public boolean isDoOptimzeMessageStorage() { 723 return doOptimzeMessageStorage; 724 } 725 726 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 727 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 728 } 729 730 public int getOptimizeMessageStoreInFlightLimit() { 731 return optimizeMessageStoreInFlightLimit; 732 } 733 734 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 735 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 736 } 737 738 739 public abstract List<Subscription> getConsumers(); 740 741 protected boolean hasRegularConsumers(List<Subscription> consumers) { 742 boolean hasRegularConsumers = false; 743 for (Subscription subscription: consumers) { 744 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 745 hasRegularConsumers = true; 746 break; 747 } 748 } 749 return hasRegularConsumers; 750 } 751 752 protected ConnectionContext createConnectionContext() { 753 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 754 answer.setBroker(this.broker); 755 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 756 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 757 return answer; 758 } 759 }