001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.LinkedList; 022 import java.util.List; 023 import java.util.concurrent.CancellationException; 024 import java.util.concurrent.ConcurrentHashMap; 025 import java.util.concurrent.CopyOnWriteArrayList; 026 import java.util.concurrent.Future; 027 import java.util.concurrent.locks.ReentrantReadWriteLock; 028 029 import org.apache.activemq.broker.BrokerService; 030 import org.apache.activemq.broker.ConnectionContext; 031 import org.apache.activemq.broker.ProducerBrokerExchange; 032 import org.apache.activemq.broker.region.policy.DispatchPolicy; 033 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; 034 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 035 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 036 import org.apache.activemq.broker.util.InsertionCountList; 037 import org.apache.activemq.command.ActiveMQDestination; 038 import org.apache.activemq.command.ExceptionResponse; 039 import org.apache.activemq.command.Message; 040 import org.apache.activemq.command.MessageAck; 041 import org.apache.activemq.command.MessageId; 042 import org.apache.activemq.command.ProducerAck; 043 import org.apache.activemq.command.ProducerInfo; 044 import org.apache.activemq.command.Response; 045 import org.apache.activemq.command.SubscriptionInfo; 046 import org.apache.activemq.filter.MessageEvaluationContext; 047 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 048 import org.apache.activemq.store.MessageRecoveryListener; 049 import org.apache.activemq.store.TopicMessageStore; 050 import org.apache.activemq.thread.Task; 051 import org.apache.activemq.thread.TaskRunner; 052 import org.apache.activemq.thread.TaskRunnerFactory; 053 import org.apache.activemq.transaction.Synchronization; 054 import org.apache.activemq.util.SubscriptionKey; 055 import org.slf4j.Logger; 056 import org.slf4j.LoggerFactory; 057 058 /** 059 * The Topic is a destination that sends a copy of a message to every active 060 * Subscription registered. 061 */ 062 public class Topic extends BaseDestination implements Task { 063 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 064 private final TopicMessageStore topicStore; 065 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 066 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 067 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 068 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 069 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 070 private final TaskRunner taskRunner; 071 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 072 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 073 public void run() { 074 try { 075 Topic.this.taskRunner.wakeup(); 076 } catch (InterruptedException e) { 077 } 078 }; 079 }; 080 081 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 082 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 083 super(brokerService, store, destination, parentStats); 084 this.topicStore = store; 085 // set default subscription recovery policy 086 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); 087 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 088 } 089 090 @Override 091 public void initialize() throws Exception { 092 super.initialize(); 093 if (store != null) { 094 // AMQ-2586: Better to leave this stat at zero than to give the user 095 // misleading metrics. 096 // int messageCount = store.getMessageCount(); 097 // destinationStatistics.getMessages().setCount(messageCount); 098 } 099 } 100 101 public List<Subscription> getConsumers() { 102 synchronized (consumers) { 103 return new ArrayList<Subscription>(consumers); 104 } 105 } 106 107 public boolean lock(MessageReference node, LockOwner sub) { 108 return true; 109 } 110 111 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 112 113 super.addSubscription(context, sub); 114 115 if (!sub.getConsumerInfo().isDurable()) { 116 117 // Do a retroactive recovery if needed. 118 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 119 120 // synchronize with dispatch method so that no new messages are sent 121 // while we are recovering a subscription to avoid out of order messages. 122 dispatchLock.writeLock().lock(); 123 try { 124 synchronized (consumers) { 125 sub.add(context, this); 126 consumers.add(sub); 127 } 128 subscriptionRecoveryPolicy.recover(context, this, sub); 129 } finally { 130 dispatchLock.writeLock().unlock(); 131 } 132 133 } else { 134 synchronized (consumers) { 135 sub.add(context, this); 136 consumers.add(sub); 137 } 138 } 139 } else { 140 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 141 sub.add(context, this); 142 if(dsub.isActive()) { 143 synchronized (consumers) { 144 boolean hasSubscription = false; 145 146 if(consumers.size()==0) { 147 hasSubscription = false; 148 } else { 149 for(Subscription currentSub : consumers) { 150 if(currentSub.getConsumerInfo().isDurable()) { 151 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 152 if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 153 hasSubscription = true; 154 break; 155 } 156 } 157 } 158 } 159 160 if(!hasSubscription) 161 consumers.add(sub); 162 } 163 } 164 durableSubcribers.put(dsub.getSubscriptionKey(), dsub); 165 } 166 } 167 168 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) 169 throws Exception { 170 if (!sub.getConsumerInfo().isDurable()) { 171 super.removeSubscription(context, sub, lastDeliveredSequenceId); 172 synchronized (consumers) { 173 consumers.remove(sub); 174 } 175 } 176 sub.remove(context, this); 177 } 178 179 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 180 if (topicStore != null) { 181 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 182 DurableTopicSubscription removed = durableSubcribers.remove(key); 183 if (removed != null) { 184 destinationStatistics.getConsumers().decrement(); 185 // deactivate and remove 186 removed.deactivate(false); 187 consumers.remove(removed); 188 } 189 } 190 } 191 192 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 193 // synchronize with dispatch method so that no new messages are sent 194 // while we are recovering a subscription to avoid out of order messages. 195 dispatchLock.writeLock().lock(); 196 try { 197 198 if (topicStore == null) { 199 return; 200 } 201 202 // Recover the durable subscription. 203 String clientId = subscription.getSubscriptionKey().getClientId(); 204 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 205 String selector = subscription.getConsumerInfo().getSelector(); 206 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 207 if (info != null) { 208 // Check to see if selector changed. 209 String s1 = info.getSelector(); 210 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 211 // Need to delete the subscription 212 topicStore.deleteSubscription(clientId, subscriptionName); 213 info = null; 214 } else { 215 synchronized (consumers) { 216 consumers.add(subscription); 217 } 218 } 219 } 220 221 // Do we need to create the subscription? 222 if (info == null) { 223 info = new SubscriptionInfo(); 224 info.setClientId(clientId); 225 info.setSelector(selector); 226 info.setSubscriptionName(subscriptionName); 227 info.setDestination(getActiveMQDestination()); 228 // This destination is an actual destination id. 229 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 230 // This destination might be a pattern 231 synchronized (consumers) { 232 consumers.add(subscription); 233 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive()); 234 } 235 } 236 237 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 238 msgContext.setDestination(destination); 239 if (subscription.isRecoveryRequired()) { 240 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 241 public boolean recoverMessage(Message message) throws Exception { 242 message.setRegionDestination(Topic.this); 243 try { 244 msgContext.setMessageReference(message); 245 if (subscription.matches(message, msgContext)) { 246 subscription.add(message); 247 } 248 } catch (IOException e) { 249 LOG.error("Failed to recover this message " + message); 250 } 251 return true; 252 } 253 254 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 255 throw new RuntimeException("Should not be called."); 256 } 257 258 public boolean hasSpace() { 259 return true; 260 } 261 262 public boolean isDuplicate(MessageId id) { 263 return false; 264 } 265 }); 266 } 267 } finally { 268 dispatchLock.writeLock().unlock(); 269 } 270 } 271 272 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception { 273 synchronized (consumers) { 274 consumers.remove(sub); 275 } 276 sub.remove(context, this); 277 } 278 279 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 280 if (subscription.getConsumerInfo().isRetroactive()) { 281 subscriptionRecoveryPolicy.recover(context, this, subscription); 282 } 283 } 284 285 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 286 final ConnectionContext context = producerExchange.getConnectionContext(); 287 288 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 289 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 290 && !context.isInRecoveryMode(); 291 292 // There is delay between the client sending it and it arriving at the 293 // destination.. it may have expired. 294 if (message.isExpired()) { 295 broker.messageExpired(context, message, null); 296 getDestinationStatistics().getExpired().increment(); 297 if (sendProducerAck) { 298 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 299 context.getConnection().dispatchAsync(ack); 300 } 301 return; 302 } 303 304 if (memoryUsage.isFull()) { 305 isFull(context, memoryUsage); 306 fastProducer(context, producerInfo); 307 308 if (isProducerFlowControl() && context.isProducerFlowControl()) { 309 310 if (warnOnProducerFlowControl) { 311 warnOnProducerFlowControl = false; 312 LOG.info(memoryUsage + ", Usage Manager memory limit reached for " 313 + getActiveMQDestination().getQualifiedName() 314 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." 315 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 316 } 317 318 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 319 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 320 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 321 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 322 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 323 } 324 325 // We can avoid blocking due to low usage if the producer is 326 // sending 327 // a sync message or 328 // if it is using a producer window 329 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 330 synchronized (messagesWaitingForSpace) { 331 messagesWaitingForSpace.add(new Runnable() { 332 public void run() { 333 try { 334 335 // While waiting for space to free up... the 336 // message may have expired. 337 if (message.isExpired()) { 338 broker.messageExpired(context, message, null); 339 getDestinationStatistics().getExpired().increment(); 340 } else { 341 doMessageSend(producerExchange, message); 342 } 343 344 if (sendProducerAck) { 345 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 346 .getSize()); 347 context.getConnection().dispatchAsync(ack); 348 } else { 349 Response response = new Response(); 350 response.setCorrelationId(message.getCommandId()); 351 context.getConnection().dispatchAsync(response); 352 } 353 354 } catch (Exception e) { 355 if (!sendProducerAck && !context.isInRecoveryMode()) { 356 ExceptionResponse response = new ExceptionResponse(e); 357 response.setCorrelationId(message.getCommandId()); 358 context.getConnection().dispatchAsync(response); 359 } 360 } 361 362 } 363 }); 364 365 registerCallbackForNotFullNotification(); 366 context.setDontSendReponse(true); 367 return; 368 } 369 370 } else { 371 // Producer flow control cannot be used, so we have do the 372 // flow 373 // control at the broker 374 // by blocking this thread until there is space available. 375 376 if (memoryUsage.isFull()) { 377 if (context.isInTransaction()) { 378 379 int count = 0; 380 while (!memoryUsage.waitForSpace(1000)) { 381 if (context.getStopping().get()) { 382 throw new IOException("Connection closed, send aborted."); 383 } 384 if (count > 2 && context.isInTransaction()) { 385 count = 0; 386 int size = context.getTransaction().size(); 387 LOG.warn("Waiting for space to send transacted message - transaction elements = " 388 + size + " need more space to commit. Message = " + message); 389 } 390 } 391 } else { 392 waitForSpace( 393 context, 394 memoryUsage, 395 "Usage Manager Memory Usage limit reached. Stopping producer (" 396 + message.getProducerId() 397 + ") to prevent flooding " 398 + getActiveMQDestination().getQualifiedName() 399 + "." 400 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 401 } 402 } 403 404 // The usage manager could have delayed us by the time 405 // we unblock the message could have expired.. 406 if (message.isExpired()) { 407 getDestinationStatistics().getExpired().increment(); 408 if (LOG.isDebugEnabled()) { 409 LOG.debug("Expired message: " + message); 410 } 411 return; 412 } 413 } 414 } 415 } 416 417 doMessageSend(producerExchange, message); 418 messageDelivered(context, message); 419 if (sendProducerAck) { 420 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 421 context.getConnection().dispatchAsync(ack); 422 } 423 } 424 425 /** 426 * do send the message - this needs to be synchronized to ensure messages 427 * are stored AND dispatched in the right order 428 * 429 * @param producerExchange 430 * @param message 431 * @throws IOException 432 * @throws Exception 433 */ 434 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 435 throws IOException, Exception { 436 final ConnectionContext context = producerExchange.getConnectionContext(); 437 message.setRegionDestination(this); 438 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 439 Future<Object> result = null; 440 441 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 442 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 443 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 444 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 445 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 446 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 447 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 448 throw new javax.jms.ResourceAllocationException(logMessage); 449 } 450 451 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 452 } 453 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 454 } 455 456 message.incrementReferenceCount(); 457 458 if (context.isInTransaction()) { 459 context.getTransaction().addSynchronization(new Synchronization() { 460 @Override 461 public void afterCommit() throws Exception { 462 // It could take while before we receive the commit 463 // operation.. by that time the message could have 464 // expired.. 465 if (broker.isExpired(message)) { 466 getDestinationStatistics().getExpired().increment(); 467 broker.messageExpired(context, message, null); 468 message.decrementReferenceCount(); 469 return; 470 } 471 try { 472 dispatch(context, message); 473 } finally { 474 message.decrementReferenceCount(); 475 } 476 } 477 }); 478 479 } else { 480 try { 481 dispatch(context, message); 482 } finally { 483 message.decrementReferenceCount(); 484 } 485 } 486 487 if (result != null && !result.isCancelled()) { 488 try { 489 result.get(); 490 } catch (CancellationException e) { 491 // ignore - the task has been cancelled if the message 492 // has already been deleted 493 } 494 } 495 } 496 497 private boolean canOptimizeOutPersistence() { 498 return durableSubcribers.size() == 0; 499 } 500 501 @Override 502 public String toString() { 503 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 504 } 505 506 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 507 final MessageReference node) throws IOException { 508 if (topicStore != null && node.isPersistent()) { 509 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 510 SubscriptionKey key = dsub.getSubscriptionKey(); 511 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack); 512 } 513 messageConsumed(context, node); 514 } 515 516 public void gc() { 517 } 518 519 public Message loadMessage(MessageId messageId) throws IOException { 520 return topicStore != null ? topicStore.getMessage(messageId) : null; 521 } 522 523 public void start() throws Exception { 524 this.subscriptionRecoveryPolicy.start(); 525 if (memoryUsage != null) { 526 memoryUsage.start(); 527 } 528 529 if (getExpireMessagesPeriod() > 0) { 530 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); 531 } 532 } 533 534 public void stop() throws Exception { 535 if (taskRunner != null) { 536 taskRunner.shutdown(); 537 } 538 this.subscriptionRecoveryPolicy.stop(); 539 if (memoryUsage != null) { 540 memoryUsage.stop(); 541 } 542 if (this.topicStore != null) { 543 this.topicStore.stop(); 544 } 545 546 scheduler.cancel(expireMessagesTask); 547 } 548 549 public Message[] browse() { 550 final List<Message> result = new ArrayList<Message>(); 551 doBrowse(result, getMaxBrowsePageSize()); 552 return result.toArray(new Message[result.size()]); 553 } 554 555 private void doBrowse(final List<Message> browseList, final int max) { 556 try { 557 if (topicStore != null) { 558 final List<Message> toExpire = new ArrayList<Message>(); 559 topicStore.recover(new MessageRecoveryListener() { 560 public boolean recoverMessage(Message message) throws Exception { 561 if (message.isExpired()) { 562 toExpire.add(message); 563 } 564 browseList.add(message); 565 return true; 566 } 567 568 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 569 return true; 570 } 571 572 public boolean hasSpace() { 573 return browseList.size() < max; 574 } 575 576 public boolean isDuplicate(MessageId id) { 577 return false; 578 } 579 }); 580 final ConnectionContext connectionContext = createConnectionContext(); 581 for (Message message : toExpire) { 582 for (DurableTopicSubscription sub : durableSubcribers.values()) { 583 if (!sub.isActive()) { 584 messageExpired(connectionContext, sub, message); 585 } 586 } 587 } 588 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 589 if (msgs != null) { 590 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 591 browseList.add(msgs[i]); 592 } 593 } 594 } 595 } catch (Throwable e) { 596 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e); 597 } 598 } 599 600 public boolean iterate() { 601 synchronized (messagesWaitingForSpace) { 602 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 603 Runnable op = messagesWaitingForSpace.removeFirst(); 604 op.run(); 605 } 606 607 if (!messagesWaitingForSpace.isEmpty()) { 608 registerCallbackForNotFullNotification(); 609 } 610 } 611 return false; 612 } 613 614 private void registerCallbackForNotFullNotification() { 615 // If the usage manager is not full, then the task will not 616 // get called.. 617 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 618 // so call it directly here. 619 sendMessagesWaitingForSpaceTask.run(); 620 } 621 } 622 623 // Properties 624 // ------------------------------------------------------------------------- 625 626 public DispatchPolicy getDispatchPolicy() { 627 return dispatchPolicy; 628 } 629 630 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 631 this.dispatchPolicy = dispatchPolicy; 632 } 633 634 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 635 return subscriptionRecoveryPolicy; 636 } 637 638 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 639 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 640 } 641 642 // Implementation methods 643 // ------------------------------------------------------------------------- 644 645 public final void wakeup() { 646 } 647 648 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 649 // AMQ-2586: Better to leave this stat at zero than to give the user 650 // misleading metrics. 651 // destinationStatistics.getMessages().increment(); 652 destinationStatistics.getEnqueues().increment(); 653 MessageEvaluationContext msgContext = null; 654 655 dispatchLock.readLock().lock(); 656 try { 657 if (!subscriptionRecoveryPolicy.add(context, message)) { 658 return; 659 } 660 synchronized (consumers) { 661 if (consumers.isEmpty()) { 662 onMessageWithNoConsumers(context, message); 663 return; 664 } 665 } 666 msgContext = context.getMessageEvaluationContext(); 667 msgContext.setDestination(destination); 668 msgContext.setMessageReference(message); 669 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 670 onMessageWithNoConsumers(context, message); 671 } 672 673 } finally { 674 dispatchLock.readLock().unlock(); 675 if (msgContext != null) { 676 msgContext.clear(); 677 } 678 } 679 } 680 681 private final Runnable expireMessagesTask = new Runnable() { 682 public void run() { 683 List<Message> browsedMessages = new InsertionCountList<Message>(); 684 doBrowse(browsedMessages, getMaxExpirePageSize()); 685 } 686 }; 687 688 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 689 broker.messageExpired(context, reference, subs); 690 // AMQ-2586: Better to leave this stat at zero than to give the user 691 // misleading metrics. 692 // destinationStatistics.getMessages().decrement(); 693 destinationStatistics.getEnqueues().decrement(); 694 destinationStatistics.getExpired().increment(); 695 MessageAck ack = new MessageAck(); 696 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 697 ack.setDestination(destination); 698 ack.setMessageID(reference.getMessageId()); 699 try { 700 if (subs instanceof DurableTopicSubscription) { 701 ((DurableTopicSubscription)subs).removePending(reference); 702 } 703 acknowledge(context, subs, ack, reference); 704 } catch (Exception e) { 705 LOG.error("Failed to remove expired Message from the store ", e); 706 } 707 } 708 709 @Override 710 protected Logger getLog() { 711 return LOG; 712 } 713 714 protected boolean isOptimizeStorage(){ 715 boolean result = false; 716 717 if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){ 718 result = true; 719 for (DurableTopicSubscription s : durableSubcribers.values()) { 720 if (s.isActive()== false){ 721 result = false; 722 break; 723 } 724 if (s.getPrefetchSize()==0){ 725 result = false; 726 break; 727 } 728 if (s.isSlowConsumer()){ 729 result = false; 730 break; 731 } 732 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 733 result = false; 734 break; 735 } 736 } 737 } 738 return result; 739 } 740 }