001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.Comparator; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedHashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.concurrent.CancellationException; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.DelayQueue; 035import java.util.concurrent.Delayed; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Future; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicLong; 040import java.util.concurrent.locks.Lock; 041import java.util.concurrent.locks.ReentrantLock; 042import java.util.concurrent.locks.ReentrantReadWriteLock; 043 044import javax.jms.InvalidSelectorException; 045import javax.jms.JMSException; 046import javax.jms.ResourceAllocationException; 047 048import 
org.apache.activemq.broker.BrokerService; 049import org.apache.activemq.broker.ConnectionContext; 050import org.apache.activemq.broker.ProducerBrokerExchange; 051import org.apache.activemq.broker.region.cursors.OrderedPendingList; 052import org.apache.activemq.broker.region.cursors.PendingList; 053import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 054import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; 055import org.apache.activemq.broker.region.cursors.StoreQueueCursor; 056import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 057import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; 058import org.apache.activemq.broker.region.group.MessageGroupMap; 059import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 060import org.apache.activemq.broker.region.policy.DispatchPolicy; 061import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 062import org.apache.activemq.broker.util.InsertionCountList; 063import org.apache.activemq.command.ActiveMQDestination; 064import org.apache.activemq.command.ActiveMQMessage; 065import org.apache.activemq.command.ConsumerId; 066import org.apache.activemq.command.ExceptionResponse; 067import org.apache.activemq.command.Message; 068import org.apache.activemq.command.MessageAck; 069import org.apache.activemq.command.MessageDispatchNotification; 070import org.apache.activemq.command.MessageId; 071import org.apache.activemq.command.ProducerAck; 072import org.apache.activemq.command.ProducerInfo; 073import org.apache.activemq.command.Response; 074import org.apache.activemq.filter.BooleanExpression; 075import org.apache.activemq.filter.MessageEvaluationContext; 076import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 077import org.apache.activemq.selector.SelectorParser; 078import org.apache.activemq.state.ProducerState; 079import org.apache.activemq.store.MessageRecoveryListener; 080import 
org.apache.activemq.store.MessageStore; 081import org.apache.activemq.thread.Task; 082import org.apache.activemq.thread.TaskRunner; 083import org.apache.activemq.thread.TaskRunnerFactory; 084import org.apache.activemq.transaction.Synchronization; 085import org.apache.activemq.usage.Usage; 086import org.apache.activemq.usage.UsageListener; 087import org.apache.activemq.util.BrokerSupport; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090import org.slf4j.MDC; 091 092/** 093 * The Queue is a List of MessageEntry objects that are dispatched to matching 094 * subscriptions. 095 */ 096public class Queue extends BaseDestination implements Task, UsageListener { 097 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); 098 protected final TaskRunnerFactory taskFactory; 099 protected TaskRunner taskRunner; 100 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 101 protected final List<Subscription> consumers = new ArrayList<Subscription>(50); 102 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); 103 protected PendingMessageCursor messages; 104 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); 105 private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>(); 106 // Messages that are paged in but have not yet been targeted at a 107 // subscription 108 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); 109 protected PendingList pagedInPendingDispatch = new OrderedPendingList(); 110 protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); 111 private MessageGroupMap messageGroupOwners; 112 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); 113 private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); 114 final Lock sendLock = new ReentrantLock(); 115 
private ExecutorService executor;
    // Deferred sends parked while memory usage is full, keyed by message id.
    // Guarded by synchronizing on the map itself.
    private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
    private boolean useConsumerPriority = true;
    private boolean strictOrderDispatch = false;
    private final QueueDispatchSelector dispatchSelector;
    private boolean optimizedDispatch = false;
    private boolean iterationRunning = false;
    private boolean firstConsumer = false;
    private int timeBeforeDispatchStarts = 0;
    private int consumersBeforeDispatchStarts = 0;
    private CountDownLatch consumersBeforeStartsLatch;
    private final AtomicLong pendingWakeups = new AtomicLong();
    private boolean allConsumersExclusiveByDefault = false;

    /** Callback that simply schedules an asynchronous wakeup of this queue. */
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        public void run() {
            asyncWakeup();
        }
    };

    /** Task that delegates to {@link #expireMessages()}. */
    private final Runnable expireMessagesTask = new Runnable() {
        public void run() {
            expireMessages();
        }
    };

    private final Object iteratingMutex = new Object();

    /**
     * A message parked for producer flow control together with the absolute
     * time (in milliseconds) at which waiting for space should be abandoned.
     */
    class TimeoutMessage implements Delayed {

        Message message;
        ConnectionContext context;
        long trigger;

        /**
         * @param message the message waiting for space
         * @param context the connection context the message arrived on
         * @param delay   milliseconds from now until the timeout fires
         */
        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
            this.message = message;
            this.context = context;
            this.trigger = System.currentTimeMillis() + delay;
        }

        /** Returns the time left until {@code trigger}, converted to {@code unit}. */
        public long getDelay(TimeUnit unit) {
            return unit.convert(trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /** Orders timeouts by ascending trigger time. */
        public int compareTo(Delayed delayed) {
            final long otherTrigger = ((TimeoutMessage) delayed).trigger;
            if (this.trigger < otherTrigger) {
                return -1;
            }
            if (this.trigger > otherTrigger) {
                return 1;
            }
            return 0;
        }

    }

    DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();

    /**
     * Thread that drains {@link #flowControlTimeoutMessages}. For every
     * timeout whose message is still parked in
     * {@link #messagesWaitingForSpace} it removes the parked send and
     * dispatches an {@link ExceptionResponse} correlated to the original
     * command. The loop ends when the thread is interrupted.
     */
    class FlowControlTimeoutTask extends Thread {

        @Override
        public void run() {
            try {
                while (true) {
                    final TimeoutMessage expired = flowControlTimeoutMessages.take();
                    if (expired == null) {
                        continue;
                    }
                    synchronized (messagesWaitingForSpace) {
                        final Runnable parkedSend = messagesWaitingForSpace.remove(expired.message.getMessageId());
                        if (parkedSend == null) {
                            // the send already went through (or was never parked)
                            continue;
                        }
                        final String reason = "Usage Manager Memory Limit reached. Stopping producer ("
                                + expired.message.getProducerId()
                                + ") to prevent flooding "
                                + getActiveMQDestination().getQualifiedName()
                                + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info";
                        final ExceptionResponse failure = new ExceptionResponse(new ResourceAllocationException(reason));
                        failure.setCorrelationId(expired.message.getCommandId());
                        expired.context.getConnection().dispatchAsync(failure);
                    }
                }
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
                }
            }
        }
    }

    private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();

    /** Sorts subscriptions by consumer priority, highest priority first. */
    private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {

        public int compare(Subscription s1, Subscription s2) {
            // second minus first yields descending order
            final int secondPriority = s2.getConsumerInfo().getPriority();
            final int firstPriority = s1.getConsumerInfo().getPriority();
            return secondPriority - firstPriority;
        }
    };

    /**
     * Creates the queue; the remaining set-up happens in {@code initialize()}.
     *
     * @param brokerService owning broker service, passed to the superclass
     * @param destination   the destination this queue represents
     * @param store         message store, passed to the superclass
     * @param parentStats   parent statistics, passed to the superclass
     * @param taskFactory   factory later used to create this queue's task runner
     * @throws Exception if the superclass constructor fails
     */
    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.dispatchSelector = new QueueDispatchSelector(destination);
        this.taskFactory = taskFactory;
    }

    /**
     * @return a snapshot copy of the current consumer list, taken under the
     *         consumers read lock
     */
    public List<Subscription> getConsumers() {
        final Lock readLock = consumersLock.readLock();
        readLock.lock();
        try {
            return new ArrayList<Subscription>(consumers);
        } finally {
            readLock.unlock();
        }
    }

    // make the
queue easily visible in the debugger from its task runner 236 // threads 237 final class QueueThread extends Thread { 238 final Queue queue; 239 240 public QueueThread(Runnable runnable, String name, Queue queue) { 241 super(runnable, name); 242 this.queue = queue; 243 } 244 } 245 246 class BatchMessageRecoveryListener implements MessageRecoveryListener { 247 final LinkedList<Message> toExpire = new LinkedList<Message>(); 248 final double totalMessageCount; 249 int recoveredAccumulator = 0; 250 int currentBatchCount; 251 252 BatchMessageRecoveryListener(int totalMessageCount) { 253 this.totalMessageCount = totalMessageCount; 254 currentBatchCount = recoveredAccumulator; 255 } 256 257 public boolean recoverMessage(Message message) { 258 recoveredAccumulator++; 259 if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) { 260 LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered " 261 + recoveredAccumulator + " messages. " + 262 (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete"); 263 } 264 // Message could have expired while it was being 265 // loaded.. 
266 if (message.isExpired() && broker.isExpired(message)) { 267 toExpire.add(message); 268 return true; 269 } 270 if (hasSpace()) { 271 message.setRegionDestination(Queue.this); 272 messagesLock.writeLock().lock(); 273 try { 274 try { 275 messages.addMessageLast(message); 276 } catch (Exception e) { 277 LOG.error("Failed to add message to cursor", e); 278 } 279 } finally { 280 messagesLock.writeLock().unlock(); 281 } 282 destinationStatistics.getMessages().increment(); 283 return true; 284 } 285 return false; 286 } 287 288 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 289 throw new RuntimeException("Should not be called."); 290 } 291 292 public boolean hasSpace() { 293 return true; 294 } 295 296 public boolean isDuplicate(MessageId id) { 297 return false; 298 } 299 300 public void reset() { 301 currentBatchCount = recoveredAccumulator; 302 } 303 304 public void processExpired() { 305 for (Message message: toExpire) { 306 messageExpired(createConnectionContext(), createMessageReference(message)); 307 // drop message will decrement so counter 308 // balance here 309 destinationStatistics.getMessages().increment(); 310 } 311 toExpire.clear(); 312 } 313 314 public boolean done() { 315 return currentBatchCount == recoveredAccumulator; 316 } 317 } 318 319 @Override 320 public void setPrioritizedMessages(boolean prioritizedMessages) { 321 super.setPrioritizedMessages(prioritizedMessages); 322 323 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { 324 pagedInPendingDispatch = new PrioritizedPendingList(); 325 redeliveredWaitingDispatch = new PrioritizedPendingList(); 326 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { 327 pagedInPendingDispatch = new OrderedPendingList(); 328 redeliveredWaitingDispatch = new OrderedPendingList(); 329 } 330 } 331 332 @Override 333 public void initialize() throws Exception { 334 335 if (this.messages == null) { 336 if (destination.isTemporary() 
|| broker == null || store == null) { 337 this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); 338 } else { 339 this.messages = new StoreQueueCursor(broker, this); 340 } 341 } 342 343 // If a VMPendingMessageCursor don't use the default Producer System 344 // Usage 345 // since it turns into a shared blocking queue which can lead to a 346 // network deadlock. 347 // If we are cursoring to disk..it's not and issue because it does not 348 // block due 349 // to large disk sizes. 350 if (messages instanceof VMPendingMessageCursor) { 351 this.systemUsage = brokerService.getSystemUsage(); 352 memoryUsage.setParent(systemUsage.getMemoryUsage()); 353 } 354 355 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); 356 357 super.initialize(); 358 if (store != null) { 359 // Restore the persistent messages. 360 messages.setSystemUsage(systemUsage); 361 messages.setEnableAudit(isEnableAudit()); 362 messages.setMaxAuditDepth(getMaxAuditDepth()); 363 messages.setMaxProducersToAudit(getMaxProducersToAudit()); 364 messages.setUseCache(isUseCache()); 365 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 366 final int messageCount = store.getMessageCount(); 367 if (messageCount > 0 && messages.isRecoveryRequired()) { 368 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount); 369 do { 370 listener.reset(); 371 store.recoverNextMessages(getMaxPageSize(), listener); 372 listener.processExpired(); 373 } while (!listener.done()); 374 } else { 375 destinationStatistics.getMessages().setCount(messageCount); 376 } 377 } 378 } 379 380 /* 381 * Holder for subscription that needs attention on next iterate browser 382 * needs access to existing messages in the queue that have already been 383 * dispatched 384 */ 385 class BrowserDispatch { 386 QueueBrowserSubscription browser; 387 388 public BrowserDispatch(QueueBrowserSubscription browserSubscription) { 389 browser = 
browserSubscription; 390 browser.incrementQueueRef(); 391 } 392 393 void done() { 394 try { 395 browser.decrementQueueRef(); 396 } catch (Exception e) { 397 LOG.warn("decrement ref on browser: " + browser, e); 398 } 399 } 400 401 public QueueBrowserSubscription getBrowser() { 402 return browser; 403 } 404 } 405 406 ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>(); 407 408 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 409 if (LOG.isDebugEnabled()) { 410 LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: " 411 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " 412 + getDestinationStatistics().getDispatched().getCount() + ", inflight: " 413 + getDestinationStatistics().getInflight().getCount()); 414 } 415 416 super.addSubscription(context, sub); 417 // synchronize with dispatch method so that no new messages are sent 418 // while setting up a subscription. avoid out of order messages, 419 // duplicates, etc. 420 pagedInPendingDispatchLock.writeLock().lock(); 421 try { 422 423 sub.add(context, this); 424 425 // needs to be synchronized - so no contention with dispatching 426 // consumersLock. 
427 consumersLock.writeLock().lock(); 428 try { 429 430 // set a flag if this is a first consumer 431 if (consumers.size() == 0) { 432 firstConsumer = true; 433 if (consumersBeforeDispatchStarts != 0) { 434 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); 435 } 436 } else { 437 if (consumersBeforeStartsLatch != null) { 438 consumersBeforeStartsLatch.countDown(); 439 } 440 } 441 442 addToConsumerList(sub); 443 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) { 444 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 445 if (exclusiveConsumer == null) { 446 exclusiveConsumer = sub; 447 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE || 448 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { 449 exclusiveConsumer = sub; 450 } 451 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 452 } 453 }finally { 454 consumersLock.writeLock().unlock(); 455 } 456 457 if (sub instanceof QueueBrowserSubscription) { 458 // tee up for dispatch in next iterate 459 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; 460 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); 461 browserDispatches.add(browserDispatch); 462 } 463 464 if (!(this.optimizedDispatch || isSlave())) { 465 wakeup(); 466 } 467 }finally { 468 pagedInPendingDispatchLock.writeLock().unlock(); 469 } 470 if (this.optimizedDispatch || isSlave()) { 471 // Outside of dispatchLock() to maintain the lock hierarchy of 472 // iteratingMutex -> dispatchLock. 
- see 473 // https://issues.apache.org/activemq/browse/AMQ-1878 474 wakeup(); 475 } 476 } 477 478 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) 479 throws Exception { 480 super.removeSubscription(context, sub, lastDeiveredSequenceId); 481 // synchronize with dispatch method so that no new messages are sent 482 // while removing up a subscription. 483 pagedInPendingDispatchLock.writeLock().lock(); 484 try { 485 if (LOG.isDebugEnabled()) { 486 LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " 487 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " 488 + getDestinationStatistics().getDispatched().getCount() + ", inflight: " 489 + getDestinationStatistics().getInflight().getCount()); 490 } 491 consumersLock.writeLock().lock(); 492 try { 493 removeFromConsumerList(sub); 494 if (sub.getConsumerInfo().isExclusive()) { 495 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 496 if (exclusiveConsumer == sub) { 497 exclusiveConsumer = null; 498 for (Subscription s : consumers) { 499 if (s.getConsumerInfo().isExclusive() 500 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer 501 .getConsumerInfo().getPriority())) { 502 exclusiveConsumer = s; 503 504 } 505 } 506 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 507 } 508 } else if (isAllConsumersExclusiveByDefault()) { 509 Subscription exclusiveConsumer = null; 510 for (Subscription s : consumers) { 511 if (exclusiveConsumer == null 512 || s.getConsumerInfo().getPriority() > exclusiveConsumer 513 .getConsumerInfo().getPriority()) { 514 exclusiveConsumer = s; 515 } 516 } 517 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 518 } 519 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); 520 getMessageGroupOwners().removeConsumer(consumerId); 521 522 // redeliver inflight 
messages 523 524 boolean markAsRedelivered = false; 525 MessageReference lastDeliveredRef = null; 526 List<MessageReference> unAckedMessages = sub.remove(context, this); 527 528 // locate last redelivered in unconsumed list (list in delivery rather than seq order) 529 if (lastDeiveredSequenceId != 0) { 530 for (MessageReference ref : unAckedMessages) { 531 if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { 532 lastDeliveredRef = ref; 533 markAsRedelivered = true; 534 if (LOG.isDebugEnabled()) { 535 LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId()); 536 } 537 break; 538 } 539 } 540 } 541 for (MessageReference ref : unAckedMessages) { 542 QueueMessageReference qmr = (QueueMessageReference) ref; 543 if (qmr.getLockOwner() == sub) { 544 qmr.unlock(); 545 546 // have no delivery information 547 if (lastDeiveredSequenceId == 0) { 548 qmr.incrementRedeliveryCounter(); 549 } else { 550 if (markAsRedelivered) { 551 qmr.incrementRedeliveryCounter(); 552 } 553 if (ref == lastDeliveredRef) { 554 // all that follow were not redelivered 555 markAsRedelivered = false; 556 } 557 } 558 } 559 redeliveredWaitingDispatch.addMessageLast(qmr); 560 } 561 if (!redeliveredWaitingDispatch.isEmpty()) { 562 doDispatch(new OrderedPendingList()); 563 } 564 }finally { 565 consumersLock.writeLock().unlock(); 566 } 567 if (!(this.optimizedDispatch || isSlave())) { 568 wakeup(); 569 } 570 }finally { 571 pagedInPendingDispatchLock.writeLock().unlock(); 572 } 573 if (this.optimizedDispatch || isSlave()) { 574 // Outside of dispatchLock() to maintain the lock hierarchy of 575 // iteratingMutex -> dispatchLock. 
- see 576 // https://issues.apache.org/activemq/browse/AMQ-1878 577 wakeup(); 578 } 579 } 580 581 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 582 final ConnectionContext context = producerExchange.getConnectionContext(); 583 // There is delay between the client sending it and it arriving at the 584 // destination.. it may have expired. 585 message.setRegionDestination(this); 586 ProducerState state = producerExchange.getProducerState(); 587 if (state == null) { 588 LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange); 589 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state"); 590 } 591 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 592 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 593 && !context.isInRecoveryMode(); 594 if (message.isExpired()) { 595 // message not stored - or added to stats yet - so chuck here 596 broker.getRoot().messageExpired(context, message, null); 597 if (sendProducerAck) { 598 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 599 context.getConnection().dispatchAsync(ack); 600 } 601 return; 602 } 603 if (memoryUsage.isFull()) { 604 isFull(context, memoryUsage); 605 fastProducer(context, producerInfo); 606 if (isProducerFlowControl() && context.isProducerFlowControl()) { 607 if (warnOnProducerFlowControl) { 608 warnOnProducerFlowControl = false; 609 LOG 610 .info("Usage Manager Memory Limit (" 611 + memoryUsage.getLimit() 612 + ") reached on " 613 + getActiveMQDestination().getQualifiedName() 614 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." 
615 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 616 } 617 618 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 619 throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" 620 + message.getProducerId() + ") to prevent flooding " 621 + getActiveMQDestination().getQualifiedName() + "." 622 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 623 } 624 625 // We can avoid blocking due to low usage if the producer is 626 // sending 627 // a sync message or if it is using a producer window 628 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 629 // copy the exchange state since the context will be 630 // modified while we are waiting 631 // for space. 632 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); 633 synchronized (messagesWaitingForSpace) { 634 // Start flow control timeout task 635 // Prevent trying to start it multiple times 636 if (!flowControlTimeoutTask.isAlive()) { 637 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task"); 638 flowControlTimeoutTask.start(); 639 } 640 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() { 641 public void run() { 642 643 try { 644 // While waiting for space to free up... the 645 // message may have expired. 
646 if (message.isExpired()) { 647 LOG.error("expired waiting for space.."); 648 broker.messageExpired(context, message, null); 649 destinationStatistics.getExpired().increment(); 650 } else { 651 doMessageSend(producerExchangeCopy, message); 652 } 653 654 if (sendProducerAck) { 655 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 656 .getSize()); 657 context.getConnection().dispatchAsync(ack); 658 } else { 659 Response response = new Response(); 660 response.setCorrelationId(message.getCommandId()); 661 context.getConnection().dispatchAsync(response); 662 } 663 664 } catch (Exception e) { 665 if (!sendProducerAck && !context.isInRecoveryMode()) { 666 ExceptionResponse response = new ExceptionResponse(e); 667 response.setCorrelationId(message.getCommandId()); 668 context.getConnection().dispatchAsync(response); 669 } else { 670 LOG.debug("unexpected exception on deferred send of :" + message, e); 671 } 672 } 673 } 674 }); 675 676 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 677 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage 678 .getSendFailIfNoSpaceAfterTimeout())); 679 } 680 681 registerCallbackForNotFullNotification(); 682 context.setDontSendReponse(true); 683 return; 684 } 685 686 } else { 687 688 if (memoryUsage.isFull()) { 689 waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer (" 690 + message.getProducerId() + ") stopped to prevent flooding " 691 + getActiveMQDestination().getQualifiedName() + "." 692 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 693 } 694 695 // The usage manager could have delayed us by the time 696 // we unblock the message could have expired.. 
697 if (message.isExpired()) { 698 if (LOG.isDebugEnabled()) { 699 LOG.debug("Expired message: " + message); 700 } 701 broker.getRoot().messageExpired(context, message, null); 702 return; 703 } 704 } 705 } 706 } 707 doMessageSend(producerExchange, message); 708 if (sendProducerAck) { 709 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 710 context.getConnection().dispatchAsync(ack); 711 } 712 } 713 714 private void registerCallbackForNotFullNotification() { 715 // If the usage manager is not full, then the task will not 716 // get called.. 717 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 718 // so call it directly here. 719 sendMessagesWaitingForSpaceTask.run(); 720 } 721 } 722 723 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 724 Exception { 725 final ConnectionContext context = producerExchange.getConnectionContext(); 726 Future<Object> result = null; 727 728 checkUsage(context, message); 729 sendLock.lockInterruptibly(); 730 try { 731 if (store != null && message.isPersistent()) { 732 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 733 if (messages.isCacheEnabled()) { 734 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); 735 } else { 736 store.addMessage(context, message); 737 } 738 if (isReduceMemoryFootprint()) { 739 message.clearMarshalledState(); 740 } 741 } 742 if (context.isInTransaction()) { 743 // If this is a transacted message.. increase the usage now so that 744 // a big TX does not blow up 745 // our memory. This increment is decremented once the tx finishes.. 
746 message.incrementReferenceCount(); 747 748 context.getTransaction().addSynchronization(new Synchronization() { 749 @Override 750 public void afterCommit() throws Exception { 751 sendLock.lockInterruptibly(); 752 try { 753 // It could take while before we receive the commit 754 // op, by that time the message could have expired.. 755 if (broker.isExpired(message)) { 756 broker.messageExpired(context, message, null); 757 destinationStatistics.getExpired().increment(); 758 return; 759 } 760 sendMessage(message); 761 } finally { 762 sendLock.unlock(); 763 message.decrementReferenceCount(); 764 } 765 messageSent(context, message); 766 } 767 @Override 768 public void afterRollback() throws Exception { 769 message.decrementReferenceCount(); 770 } 771 }); 772 } else { 773 // Add to the pending list, this takes care of incrementing the 774 // usage manager. 775 sendMessage(message); 776 } 777 } finally { 778 sendLock.unlock(); 779 } 780 if (!context.isInTransaction()) { 781 messageSent(context, message); 782 } 783 if (result != null && !result.isCancelled()) { 784 try { 785 result.get(); 786 } catch (CancellationException e) { 787 // ignore - the task has been cancelled if the message 788 // has already been deleted 789 } 790 } 791 } 792 793 private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException { 794 if (message.isPersistent()) { 795 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 796 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 797 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" 798 + message.getProducerId() + ") to prevent flooding " 799 + getActiveMQDestination().getQualifiedName() + "." 
800 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 801 802 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 803 } 804 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) { 805 final String logMessage = "Temp Store is Full (" 806 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() 807 +"). Stopping producer (" + message.getProducerId() 808 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 809 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 810 811 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage); 812 } 813 } 814 815 private void expireMessages() { 816 if (LOG.isDebugEnabled()) { 817 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages .."); 818 } 819 820 // just track the insertion count 821 List<Message> browsedMessages = new InsertionCountList<Message>(); 822 doBrowse(browsedMessages, this.getMaxExpirePageSize()); 823 asyncWakeup(); 824 if (LOG.isDebugEnabled()) { 825 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done."); 826 } 827 } 828 829 public void gc() { 830 } 831 832 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) 833 throws IOException { 834 messageConsumed(context, node); 835 if (store != null && node.isPersistent()) { 836 // the original ack may be a ranged ack, but we are trying to delete 837 // a specific 838 // message store here so we need to convert to a non ranged ack. 839 if (ack.getMessageCount() > 0) { 840 // Dup the ack 841 MessageAck a = new MessageAck(); 842 ack.copy(a); 843 ack = a; 844 // Convert to non-ranged. 
845 ack.setFirstMessageId(node.getMessageId()); 846 ack.setLastMessageId(node.getMessageId()); 847 ack.setMessageCount(1); 848 } 849 850 store.removeAsyncMessage(context, ack); 851 } 852 } 853 854 Message loadMessage(MessageId messageId) throws IOException { 855 Message msg = null; 856 if (store != null) { // can be null for a temp q 857 msg = store.getMessage(messageId); 858 if (msg != null) { 859 msg.setRegionDestination(this); 860 } 861 } 862 return msg; 863 } 864 865 @Override 866 public String toString() { 867 int size = 0; 868 messagesLock.readLock().lock(); 869 try{ 870 size = messages.size(); 871 }finally { 872 messagesLock.readLock().unlock(); 873 } 874 return destination.getQualifiedName() + ", subscriptions=" + consumers.size() 875 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups=" 876 + messageGroupOwners; 877 } 878 879 public void start() throws Exception { 880 if (memoryUsage != null) { 881 memoryUsage.start(); 882 } 883 if (systemUsage.getStoreUsage() != null) { 884 systemUsage.getStoreUsage().start(); 885 } 886 systemUsage.getMemoryUsage().addUsageListener(this); 887 messages.start(); 888 if (getExpireMessagesPeriod() > 0) { 889 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); 890 } 891 doPageIn(false); 892 } 893 894 public void stop() throws Exception { 895 if (taskRunner != null) { 896 taskRunner.shutdown(); 897 } 898 if (this.executor != null) { 899 this.executor.shutdownNow(); 900 } 901 902 scheduler.cancel(expireMessagesTask); 903 904 if (flowControlTimeoutTask.isAlive()) { 905 flowControlTimeoutTask.interrupt(); 906 } 907 908 if (messages != null) { 909 messages.stop(); 910 } 911 912 systemUsage.getMemoryUsage().removeUsageListener(this); 913 if (memoryUsage != null) { 914 memoryUsage.stop(); 915 } 916 if (store != null) { 917 store.stop(); 918 } 919 } 920 921 // Properties 922 // ------------------------------------------------------------------------- 923 @Override 924 
public ActiveMQDestination getActiveMQDestination() { 925 return destination; 926 } 927 928 public MessageGroupMap getMessageGroupOwners() { 929 if (messageGroupOwners == null) { 930 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); 931 } 932 return messageGroupOwners; 933 } 934 935 public DispatchPolicy getDispatchPolicy() { 936 return dispatchPolicy; 937 } 938 939 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 940 this.dispatchPolicy = dispatchPolicy; 941 } 942 943 public MessageGroupMapFactory getMessageGroupMapFactory() { 944 return messageGroupMapFactory; 945 } 946 947 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 948 this.messageGroupMapFactory = messageGroupMapFactory; 949 } 950 951 public PendingMessageCursor getMessages() { 952 return this.messages; 953 } 954 955 public void setMessages(PendingMessageCursor messages) { 956 this.messages = messages; 957 } 958 959 public boolean isUseConsumerPriority() { 960 return useConsumerPriority; 961 } 962 963 public void setUseConsumerPriority(boolean useConsumerPriority) { 964 this.useConsumerPriority = useConsumerPriority; 965 } 966 967 public boolean isStrictOrderDispatch() { 968 return strictOrderDispatch; 969 } 970 971 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 972 this.strictOrderDispatch = strictOrderDispatch; 973 } 974 975 public boolean isOptimizedDispatch() { 976 return optimizedDispatch; 977 } 978 979 public void setOptimizedDispatch(boolean optimizedDispatch) { 980 this.optimizedDispatch = optimizedDispatch; 981 } 982 983 public int getTimeBeforeDispatchStarts() { 984 return timeBeforeDispatchStarts; 985 } 986 987 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 988 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 989 } 990 991 public int getConsumersBeforeDispatchStarts() { 992 return consumersBeforeDispatchStarts; 993 } 994 995 public void 
setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 996 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 997 } 998 999 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 1000 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 1001 } 1002 1003 public boolean isAllConsumersExclusiveByDefault() { 1004 return allConsumersExclusiveByDefault; 1005 } 1006 1007 1008 // Implementation methods 1009 // ------------------------------------------------------------------------- 1010 private QueueMessageReference createMessageReference(Message message) { 1011 QueueMessageReference result = new IndirectMessageReference(message); 1012 return result; 1013 } 1014 1015 public Message[] browse() { 1016 List<Message> browseList = new ArrayList<Message>(); 1017 doBrowse(browseList, getMaxBrowsePageSize()); 1018 return browseList.toArray(new Message[browseList.size()]); 1019 } 1020 1021 public void doBrowse(List<Message> browseList, int max) { 1022 final ConnectionContext connectionContext = createConnectionContext(); 1023 try { 1024 pageInMessages(false); 1025 List<MessageReference> toExpire = new ArrayList<MessageReference>(); 1026 1027 pagedInPendingDispatchLock.writeLock().lock(); 1028 try { 1029 addAll(pagedInPendingDispatch.values(), browseList, max, toExpire); 1030 for (MessageReference ref : toExpire) { 1031 pagedInPendingDispatch.remove(ref); 1032 if (broker.isExpired(ref)) { 1033 if (LOG.isDebugEnabled()) { 1034 LOG.debug("expiring from pagedInPending: " + ref); 1035 } 1036 messageExpired(connectionContext, ref); 1037 } 1038 } 1039 } finally { 1040 pagedInPendingDispatchLock.writeLock().unlock(); 1041 } 1042 toExpire.clear(); 1043 pagedInMessagesLock.readLock().lock(); 1044 try { 1045 addAll(pagedInMessages.values(), browseList, max, toExpire); 1046 } finally { 1047 pagedInMessagesLock.readLock().unlock(); 1048 } 1049 for (MessageReference ref : toExpire) { 1050 if 
(broker.isExpired(ref)) { 1051 if (LOG.isDebugEnabled()) { 1052 LOG.debug("expiring from pagedInMessages: " + ref); 1053 } 1054 messageExpired(connectionContext, ref); 1055 } else { 1056 pagedInMessagesLock.writeLock().lock(); 1057 try { 1058 pagedInMessages.remove(ref.getMessageId()); 1059 } finally { 1060 pagedInMessagesLock.writeLock().unlock(); 1061 } 1062 } 1063 } 1064 1065 if (browseList.size() < getMaxBrowsePageSize()) { 1066 messagesLock.writeLock().lock(); 1067 try { 1068 try { 1069 messages.reset(); 1070 while (messages.hasNext() && browseList.size() < max) { 1071 MessageReference node = messages.next(); 1072 if (node.isExpired()) { 1073 if (broker.isExpired(node)) { 1074 if (LOG.isDebugEnabled()) { 1075 LOG.debug("expiring from messages: " + node); 1076 } 1077 messageExpired(connectionContext, createMessageReference(node.getMessage())); 1078 } 1079 messages.remove(); 1080 } else { 1081 messages.rollback(node.getMessageId()); 1082 if (browseList.contains(node.getMessage()) == false) { 1083 browseList.add(node.getMessage()); 1084 } 1085 } 1086 node.decrementReferenceCount(); 1087 } 1088 } finally { 1089 messages.release(); 1090 } 1091 } finally { 1092 messagesLock.writeLock().unlock(); 1093 } 1094 } 1095 1096 } catch (Exception e) { 1097 LOG.error("Problem retrieving message for browse", e); 1098 } 1099 } 1100 1101 private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize, 1102 List<MessageReference> toExpire) throws Exception { 1103 for (Iterator<? 
extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { 1104 QueueMessageReference ref = (QueueMessageReference) i.next(); 1105 if (ref.isExpired()) { 1106 toExpire.add(ref); 1107 } else if (l.contains(ref.getMessage()) == false) { 1108 l.add(ref.getMessage()); 1109 } 1110 } 1111 } 1112 1113 public QueueMessageReference getMessage(String id) { 1114 MessageId msgId = new MessageId(id); 1115 pagedInMessagesLock.readLock().lock(); 1116 try{ 1117 QueueMessageReference ref = this.pagedInMessages.get(msgId); 1118 if (ref != null) { 1119 return ref; 1120 } 1121 }finally { 1122 pagedInMessagesLock.readLock().unlock(); 1123 } 1124 messagesLock.readLock().lock(); 1125 try{ 1126 try { 1127 messages.reset(); 1128 while (messages.hasNext()) { 1129 MessageReference mr = messages.next(); 1130 QueueMessageReference qmr = createMessageReference(mr.getMessage()); 1131 qmr.decrementReferenceCount(); 1132 messages.rollback(qmr.getMessageId()); 1133 if (msgId.equals(qmr.getMessageId())) { 1134 return qmr; 1135 } 1136 } 1137 } finally { 1138 messages.release(); 1139 } 1140 }finally { 1141 messagesLock.readLock().unlock(); 1142 } 1143 return null; 1144 } 1145 1146 public void purge() throws Exception { 1147 ConnectionContext c = createConnectionContext(); 1148 List<MessageReference> list = null; 1149 do { 1150 doPageIn(true); 1151 pagedInMessagesLock.readLock().lock(); 1152 try { 1153 list = new ArrayList<MessageReference>(pagedInMessages.values()); 1154 }finally { 1155 pagedInMessagesLock.readLock().unlock(); 1156 } 1157 1158 for (MessageReference ref : list) { 1159 try { 1160 QueueMessageReference r = (QueueMessageReference) ref; 1161 removeMessage(c, r); 1162 } catch (IOException e) { 1163 } 1164 } 1165 // don't spin/hang if stats are out and there is nothing left in the 1166 // store 1167 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); 1168 if (this.destinationStatistics.getMessages().getCount() > 0) { 
1169 LOG.warn(getActiveMQDestination().getQualifiedName() 1170 + " after purge complete, message count stats report: " 1171 + this.destinationStatistics.getMessages().getCount()); 1172 } 1173 gc(); 1174 this.destinationStatistics.getMessages().setCount(0); 1175 getMessages().clear(); 1176 } 1177 1178 public void clearPendingMessages() { 1179 messagesLock.writeLock().lock(); 1180 try { 1181 if (store != null) { 1182 store.resetBatching(); 1183 } 1184 messages.gc(); 1185 asyncWakeup(); 1186 } finally { 1187 messagesLock.writeLock().unlock(); 1188 } 1189 } 1190 1191 /** 1192 * Removes the message matching the given messageId 1193 */ 1194 public boolean removeMessage(String messageId) throws Exception { 1195 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; 1196 } 1197 1198 /** 1199 * Removes the messages matching the given selector 1200 * 1201 * @return the number of messages removed 1202 */ 1203 public int removeMatchingMessages(String selector) throws Exception { 1204 return removeMatchingMessages(selector, -1); 1205 } 1206 1207 /** 1208 * Removes the messages matching the given selector up to the maximum number 1209 * of matched messages 1210 * 1211 * @return the number of messages removed 1212 */ 1213 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 1214 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); 1215 } 1216 1217 /** 1218 * Removes the messages matching the given filter up to the maximum number 1219 * of matched messages 1220 * 1221 * @return the number of messages removed 1222 */ 1223 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { 1224 int movedCounter = 0; 1225 Set<MessageReference> set = new HashSet<MessageReference>(); 1226 ConnectionContext context = createConnectionContext(); 1227 do { 1228 doPageIn(true); 1229 pagedInMessagesLock.readLock().lock(); 1230 try{ 1231 set.addAll(pagedInMessages.values()); 
1232 }finally { 1233 pagedInMessagesLock.readLock().unlock(); 1234 } 1235 List<MessageReference> list = new ArrayList<MessageReference>(set); 1236 for (MessageReference ref : list) { 1237 IndirectMessageReference r = (IndirectMessageReference) ref; 1238 if (filter.evaluate(context, r)) { 1239 1240 removeMessage(context, r); 1241 set.remove(r); 1242 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1243 return movedCounter; 1244 } 1245 } 1246 } 1247 } while (set.size() < this.destinationStatistics.getMessages().getCount()); 1248 return movedCounter; 1249 } 1250 1251 /** 1252 * Copies the message matching the given messageId 1253 */ 1254 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1255 throws Exception { 1256 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0; 1257 } 1258 1259 /** 1260 * Copies the messages matching the given selector 1261 * 1262 * @return the number of messages copied 1263 */ 1264 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1265 throws Exception { 1266 return copyMatchingMessagesTo(context, selector, dest, -1); 1267 } 1268 1269 /** 1270 * Copies the messages matching the given selector up to the maximum number 1271 * of matched messages 1272 * 1273 * @return the number of messages copied 1274 */ 1275 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1276 int maximumMessages) throws Exception { 1277 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages); 1278 } 1279 1280 /** 1281 * Copies the messages matching the given filter up to the maximum number of 1282 * matched messages 1283 * 1284 * @return the number of messages copied 1285 */ 1286 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, 1287 int maximumMessages) throws Exception { 
1288 int movedCounter = 0; 1289 int count = 0; 1290 Set<MessageReference> set = new HashSet<MessageReference>(); 1291 do { 1292 int oldMaxSize = getMaxPageSize(); 1293 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); 1294 doPageIn(true); 1295 setMaxPageSize(oldMaxSize); 1296 pagedInMessagesLock.readLock().lock(); 1297 try { 1298 set.addAll(pagedInMessages.values()); 1299 }finally { 1300 pagedInMessagesLock.readLock().unlock(); 1301 } 1302 List<MessageReference> list = new ArrayList<MessageReference>(set); 1303 for (MessageReference ref : list) { 1304 IndirectMessageReference r = (IndirectMessageReference) ref; 1305 if (filter.evaluate(context, r)) { 1306 1307 r.incrementReferenceCount(); 1308 try { 1309 Message m = r.getMessage(); 1310 BrokerSupport.resend(context, m, dest); 1311 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1312 return movedCounter; 1313 } 1314 } finally { 1315 r.decrementReferenceCount(); 1316 } 1317 } 1318 count++; 1319 } 1320 } while (count < this.destinationStatistics.getMessages().getCount()); 1321 return movedCounter; 1322 } 1323 1324 /** 1325 * Move a message 1326 * 1327 * @param context 1328 * connection context 1329 * @param m 1330 * QueueMessageReference 1331 * @param dest 1332 * ActiveMQDestination 1333 * @throws Exception 1334 */ 1335 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception { 1336 BrokerSupport.resend(context, m.getMessage(), dest); 1337 removeMessage(context, m); 1338 messagesLock.writeLock().lock(); 1339 try{ 1340 messages.rollback(m.getMessageId()); 1341 }finally { 1342 messagesLock.writeLock().unlock(); 1343 } 1344 return true; 1345 } 1346 1347 /** 1348 * Moves the message matching the given messageId 1349 */ 1350 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1351 throws Exception { 1352 return moveMatchingMessagesTo(context, 
createMessageIdFilter(messageId), dest, 1) > 0; 1353 } 1354 1355 /** 1356 * Moves the messages matching the given selector 1357 * 1358 * @return the number of messages removed 1359 */ 1360 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1361 throws Exception { 1362 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE); 1363 } 1364 1365 /** 1366 * Moves the messages matching the given selector up to the maximum number 1367 * of matched messages 1368 */ 1369 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1370 int maximumMessages) throws Exception { 1371 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages); 1372 } 1373 1374 /** 1375 * Moves the messages matching the given filter up to the maximum number of 1376 * matched messages 1377 */ 1378 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, 1379 ActiveMQDestination dest, int maximumMessages) throws Exception { 1380 int movedCounter = 0; 1381 Set<QueueMessageReference> set = new HashSet<QueueMessageReference>(); 1382 do { 1383 doPageIn(true); 1384 pagedInMessagesLock.readLock().lock(); 1385 try{ 1386 set.addAll(pagedInMessages.values()); 1387 }finally { 1388 pagedInMessagesLock.readLock().unlock(); 1389 } 1390 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set); 1391 for (QueueMessageReference ref : list) { 1392 if (filter.evaluate(context, ref)) { 1393 // We should only move messages that can be locked. 
1394 moveMessageTo(context, ref, dest); 1395 set.remove(ref); 1396 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1397 return movedCounter; 1398 } 1399 } 1400 } 1401 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); 1402 return movedCounter; 1403 } 1404 1405 /** 1406 * @return true if we would like to iterate again 1407 * @see org.apache.activemq.thread.Task#iterate() 1408 */ 1409 public boolean iterate() { 1410 MDC.put("activemq.destination", getName()); 1411 boolean pageInMoreMessages = false; 1412 synchronized (iteratingMutex) { 1413 1414 // If optimize dispatch is on or this is a slave this method could be called recursively 1415 // we set this state value to short-circuit wakeup in those cases to avoid that as it 1416 // could lead to errors. 1417 iterationRunning = true; 1418 1419 // do early to allow dispatch of these waiting messages 1420 synchronized (messagesWaitingForSpace) { 1421 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator(); 1422 while (it.hasNext()) { 1423 if (!memoryUsage.isFull()) { 1424 Runnable op = it.next(); 1425 it.remove(); 1426 op.run(); 1427 } else { 1428 registerCallbackForNotFullNotification(); 1429 break; 1430 } 1431 } 1432 } 1433 1434 if (firstConsumer) { 1435 firstConsumer = false; 1436 try { 1437 if (consumersBeforeDispatchStarts > 0) { 1438 int timeout = 1000; // wait one second by default if 1439 // consumer count isn't reached 1440 if (timeBeforeDispatchStarts > 0) { 1441 timeout = timeBeforeDispatchStarts; 1442 } 1443 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { 1444 if (LOG.isDebugEnabled()) { 1445 LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch."); 1446 } 1447 } else { 1448 if (LOG.isDebugEnabled()) { 1449 LOG.debug(timeout + " ms elapsed and " + consumers.size() 1450 + " consumers subscribed. 
Starting dispatch."); 1451 } 1452 } 1453 } 1454 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { 1455 iteratingMutex.wait(timeBeforeDispatchStarts); 1456 if (LOG.isDebugEnabled()) { 1457 LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch."); 1458 } 1459 } 1460 } catch (Exception e) { 1461 LOG.error(e.toString()); 1462 } 1463 } 1464 1465 BrowserDispatch pendingBrowserDispatch = browserDispatches.poll(); 1466 1467 messagesLock.readLock().lock(); 1468 try{ 1469 pageInMoreMessages |= !messages.isEmpty(); 1470 } finally { 1471 messagesLock.readLock().unlock(); 1472 } 1473 1474 pagedInPendingDispatchLock.readLock().lock(); 1475 try { 1476 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); 1477 } finally { 1478 pagedInPendingDispatchLock.readLock().unlock(); 1479 } 1480 1481 // Perhaps we should page always into the pagedInPendingDispatch 1482 // list if 1483 // !messages.isEmpty(), and then if 1484 // !pagedInPendingDispatch.isEmpty() 1485 // then we do a dispatch. 
1486 if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) { 1487 try { 1488 pageInMessages(pendingBrowserDispatch != null); 1489 1490 } catch (Throwable e) { 1491 LOG.error("Failed to page in more queue messages ", e); 1492 } 1493 } 1494 1495 if (pendingBrowserDispatch != null) { 1496 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null; 1497 pagedInMessagesLock.readLock().lock(); 1498 try{ 1499 alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values()); 1500 }finally { 1501 pagedInMessagesLock.readLock().unlock(); 1502 } 1503 if (LOG.isDebugEnabled()) { 1504 LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser() 1505 + ", already dispatched/paged count: " + alreadyDispatchedMessages.size()); 1506 } 1507 do { 1508 try { 1509 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 1510 msgContext.setDestination(destination); 1511 1512 QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser(); 1513 for (QueueMessageReference node : alreadyDispatchedMessages) { 1514 if (!node.isAcked()) { 1515 msgContext.setMessageReference(node); 1516 if (browser.matches(node, msgContext)) { 1517 browser.add(node); 1518 } 1519 } 1520 } 1521 pendingBrowserDispatch.done(); 1522 } catch (Exception e) { 1523 LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e); 1524 } 1525 1526 } while ((pendingBrowserDispatch = browserDispatches.poll()) != null); 1527 } 1528 1529 if (pendingWakeups.get() > 0) { 1530 pendingWakeups.decrementAndGet(); 1531 } 1532 MDC.remove("activemq.destination"); 1533 iterationRunning = false; 1534 1535 return pendingWakeups.get() > 0; 1536 } 1537 } 1538 1539 protected MessageReferenceFilter createMessageIdFilter(final String messageId) { 1540 return new MessageReferenceFilter() { 1541 public boolean evaluate(ConnectionContext context, MessageReference r) { 1542 return 
messageId.equals(r.getMessageId().toString()); 1543 } 1544 1545 @Override 1546 public String toString() { 1547 return "MessageIdFilter: " + messageId; 1548 } 1549 }; 1550 } 1551 1552 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException { 1553 1554 if (selector == null || selector.isEmpty()) { 1555 return new MessageReferenceFilter() { 1556 1557 @Override 1558 public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException { 1559 return true; 1560 } 1561 }; 1562 } 1563 1564 final BooleanExpression selectorExpression = SelectorParser.parse(selector); 1565 1566 return new MessageReferenceFilter() { 1567 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException { 1568 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext(); 1569 1570 messageEvaluationContext.setMessageReference(r); 1571 if (messageEvaluationContext.getDestination() == null) { 1572 messageEvaluationContext.setDestination(getActiveMQDestination()); 1573 } 1574 1575 return selectorExpression.matches(messageEvaluationContext); 1576 } 1577 }; 1578 } 1579 1580 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { 1581 removeMessage(c, null, r); 1582 pagedInPendingDispatchLock.writeLock().lock(); 1583 try { 1584 pagedInPendingDispatch.remove(r); 1585 } finally { 1586 pagedInPendingDispatchLock.writeLock().unlock(); 1587 } 1588 1589 } 1590 1591 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { 1592 MessageAck ack = new MessageAck(); 1593 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 1594 ack.setDestination(destination); 1595 ack.setMessageID(r.getMessageId()); 1596 removeMessage(c, subs, r, ack); 1597 } 1598 1599 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, 1600 MessageAck ack) 
throws IOException { 1601 reference.setAcked(true); 1602 // This sends the ack the the journal.. 1603 if (!ack.isInTransaction()) { 1604 acknowledge(context, sub, ack, reference); 1605 getDestinationStatistics().getDequeues().increment(); 1606 dropMessage(reference); 1607 } else { 1608 try { 1609 acknowledge(context, sub, ack, reference); 1610 } finally { 1611 context.getTransaction().addSynchronization(new Synchronization() { 1612 1613 @Override 1614 public void afterCommit() throws Exception { 1615 getDestinationStatistics().getDequeues().increment(); 1616 dropMessage(reference); 1617 wakeup(); 1618 } 1619 1620 @Override 1621 public void afterRollback() throws Exception { 1622 reference.setAcked(false); 1623 } 1624 }); 1625 } 1626 } 1627 if (ack.isPoisonAck()) { 1628 // message gone to DLQ, is ok to allow redelivery 1629 messagesLock.writeLock().lock(); 1630 try{ 1631 messages.rollback(reference.getMessageId()); 1632 }finally { 1633 messagesLock.writeLock().unlock(); 1634 } 1635 } 1636 1637 } 1638 1639 private void dropMessage(QueueMessageReference reference) { 1640 reference.drop(); 1641 destinationStatistics.getMessages().decrement(); 1642 pagedInMessagesLock.writeLock().lock(); 1643 try{ 1644 pagedInMessages.remove(reference.getMessageId()); 1645 }finally { 1646 pagedInMessagesLock.writeLock().unlock(); 1647 } 1648 } 1649 1650 public void messageExpired(ConnectionContext context, MessageReference reference) { 1651 messageExpired(context, null, reference); 1652 } 1653 1654 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 1655 if (LOG.isDebugEnabled()) { 1656 LOG.debug("message expired: " + reference); 1657 } 1658 broker.messageExpired(context, reference, subs); 1659 destinationStatistics.getExpired().increment(); 1660 try { 1661 removeMessage(context, subs, (QueueMessageReference) reference); 1662 } catch (IOException e) { 1663 LOG.error("Failed to remove expired Message from the store ", e); 1664 } 1665 } 
1666 1667 final void sendMessage(final Message msg) throws Exception { 1668 messagesLock.writeLock().lock(); 1669 try{ 1670 messages.addMessageLast(msg); 1671 }finally { 1672 messagesLock.writeLock().unlock(); 1673 } 1674 } 1675 1676 final void messageSent(final ConnectionContext context, final Message msg) throws Exception { 1677 destinationStatistics.getEnqueues().increment(); 1678 destinationStatistics.getMessages().increment(); 1679 messageDelivered(context, msg); 1680 consumersLock.readLock().lock(); 1681 try { 1682 if (consumers.isEmpty()) { 1683 onMessageWithNoConsumers(context, msg); 1684 } 1685 }finally { 1686 consumersLock.readLock().unlock(); 1687 } 1688 if (LOG.isDebugEnabled()) { 1689 LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination); 1690 } 1691 wakeup(); 1692 } 1693 1694 public void wakeup() { 1695 if ((optimizedDispatch || isSlave()) && !iterationRunning) { 1696 iterate(); 1697 pendingWakeups.incrementAndGet(); 1698 } else { 1699 asyncWakeup(); 1700 } 1701 } 1702 1703 private void asyncWakeup() { 1704 try { 1705 pendingWakeups.incrementAndGet(); 1706 this.taskRunner.wakeup(); 1707 } catch (InterruptedException e) { 1708 LOG.warn("Async task tunner failed to wakeup ", e); 1709 } 1710 } 1711 1712 private boolean isSlave() { 1713 return broker.getBrokerService().isSlave(); 1714 } 1715 1716 private void doPageIn(boolean force) throws Exception { 1717 PendingList newlyPaged = doPageInForDispatch(force); 1718 pagedInPendingDispatchLock.writeLock().lock(); 1719 try { 1720 if (pagedInPendingDispatch.isEmpty()) { 1721 pagedInPendingDispatch.addAll(newlyPaged); 1722 1723 } else { 1724 for (MessageReference qmr : newlyPaged) { 1725 if (!pagedInPendingDispatch.contains(qmr)) { 1726 pagedInPendingDispatch.addMessageLast(qmr); 1727 } 1728 } 1729 } 1730 } finally { 1731 pagedInPendingDispatchLock.writeLock().unlock(); 1732 } 1733 } 1734 1735 private PendingList doPageInForDispatch(boolean force) throws Exception 
{ 1736 List<QueueMessageReference> result = null; 1737 PendingList resultList = null; 1738 1739 int toPageIn = Math.min(getMaxPageSize(), messages.size()); 1740 if (LOG.isDebugEnabled()) { 1741 LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " 1742 + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " 1743 + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount() 1744 + ", dequeueCount: " + destinationStatistics.getDequeues().getCount()); 1745 } 1746 1747 if (isLazyDispatch() && !force) { 1748 // Only page in the minimum number of messages which can be 1749 // dispatched immediately. 1750 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); 1751 } 1752 int pagedInPendingSize = 0; 1753 pagedInPendingDispatchLock.readLock().lock(); 1754 try { 1755 pagedInPendingSize = pagedInPendingDispatch.size(); 1756 } finally { 1757 pagedInPendingDispatchLock.readLock().unlock(); 1758 } 1759 if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) { 1760 int count = 0; 1761 result = new ArrayList<QueueMessageReference>(toPageIn); 1762 messagesLock.writeLock().lock(); 1763 try { 1764 try { 1765 messages.setMaxBatchSize(toPageIn); 1766 messages.reset(); 1767 while (messages.hasNext() && count < toPageIn) { 1768 MessageReference node = messages.next(); 1769 messages.remove(); 1770 1771 QueueMessageReference ref = createMessageReference(node.getMessage()); 1772 if (ref.isExpired()) { 1773 if (broker.isExpired(ref)) { 1774 messageExpired(createConnectionContext(), ref); 1775 } else { 1776 ref.decrementReferenceCount(); 1777 } 1778 } else { 1779 result.add(ref); 1780 count++; 1781 } 1782 } 1783 } finally { 1784 messages.release(); 1785 } 1786 } finally { 1787 messagesLock.writeLock().unlock(); 1788 } 1789 // Only add new messages, not already pagedIn to avoid multiple 1790 // dispatch attempts 1791 pagedInMessagesLock.writeLock().lock(); 
1792 try { 1793 if(isPrioritizedMessages()) { 1794 resultList = new PrioritizedPendingList(); 1795 } else { 1796 resultList = new OrderedPendingList(); 1797 } 1798 for (QueueMessageReference ref : result) { 1799 if (!pagedInMessages.containsKey(ref.getMessageId())) { 1800 pagedInMessages.put(ref.getMessageId(), ref); 1801 resultList.addMessageLast(ref); 1802 } else { 1803 ref.decrementReferenceCount(); 1804 } 1805 } 1806 } finally { 1807 pagedInMessagesLock.writeLock().unlock(); 1808 } 1809 } else { 1810 // Avoid return null list, if condition is not validated 1811 resultList = new OrderedPendingList(); 1812 } 1813 1814 return resultList; 1815 } 1816 1817 private void doDispatch(PendingList list) throws Exception { 1818 boolean doWakeUp = false; 1819 1820 pagedInPendingDispatchLock.writeLock().lock(); 1821 try { 1822 if (!redeliveredWaitingDispatch.isEmpty()) { 1823 // Try first to dispatch redelivered messages to keep an 1824 // proper order 1825 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); 1826 } 1827 if (!pagedInPendingDispatch.isEmpty()) { 1828 // Next dispatch anything that had not been 1829 // dispatched before. 1830 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); 1831 } 1832 // and now see if we can dispatch the new stuff.. and append to 1833 // the pending 1834 // list anything that does not actually get dispatched. 
1835 if (list != null && !list.isEmpty()) { 1836 if (pagedInPendingDispatch.isEmpty()) { 1837 pagedInPendingDispatch.addAll(doActualDispatch(list)); 1838 } else { 1839 for (MessageReference qmr : list) { 1840 if (!pagedInPendingDispatch.contains(qmr)) { 1841 pagedInPendingDispatch.addMessageLast(qmr); 1842 } 1843 } 1844 doWakeUp = true; 1845 } 1846 } 1847 } finally { 1848 pagedInPendingDispatchLock.writeLock().unlock(); 1849 } 1850 1851 if (doWakeUp) { 1852 // avoid lock order contention 1853 asyncWakeup(); 1854 } 1855 } 1856 1857 /** 1858 * @return list of messages that could get dispatched to consumers if they 1859 * were not full. 1860 */ 1861 private PendingList doActualDispatch(PendingList list) throws Exception { 1862 List<Subscription> consumers; 1863 consumersLock.writeLock().lock(); 1864 1865 try { 1866 if (this.consumers.isEmpty() || isSlave()) { 1867 // slave dispatch happens in processDispatchNotification 1868 return list; 1869 } 1870 consumers = new ArrayList<Subscription>(this.consumers); 1871 }finally { 1872 consumersLock.writeLock().unlock(); 1873 } 1874 1875 PendingList rc; 1876 if(isPrioritizedMessages()) { 1877 rc = new PrioritizedPendingList(); 1878 } else { 1879 rc = new OrderedPendingList(); 1880 } 1881 1882 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); 1883 1884 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 1885 1886 MessageReference node = (MessageReference) iterator.next(); 1887 Subscription target = null; 1888 int interestCount = 0; 1889 for (Subscription s : consumers) { 1890 if (s instanceof QueueBrowserSubscription) { 1891 interestCount++; 1892 continue; 1893 } 1894 if (!fullConsumers.contains(s) && !s.isFull()) { 1895 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) { 1896 // Dispatch it. 
1897 s.add(node); 1898 target = s; 1899 break; 1900 } 1901 } else { 1902 // no further dispatch of list to a full consumer to 1903 // avoid out of order message receipt 1904 fullConsumers.add(s); 1905 if (LOG.isTraceEnabled()) { 1906 LOG.trace("Sub full " + s); 1907 } 1908 } 1909 // make sure it gets dispatched again 1910 if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && 1911 (!node.isDropped() || s.getConsumerInfo().isBrowser())) { 1912 interestCount++; 1913 } 1914 } 1915 1916 if ((target == null && interestCount > 0) || consumers.size() == 0) { 1917 // This means all subs were full or that there are no 1918 // consumers... 1919 rc.addMessageLast((QueueMessageReference) node); 1920 } 1921 1922 // If it got dispatched, rotate the consumer list to get round robin 1923 // distribution. 1924 if (target != null && !strictOrderDispatch && consumers.size() > 1 1925 && !dispatchSelector.isExclusiveConsumer(target)) { 1926 consumersLock.writeLock().lock(); 1927 try { 1928 if (removeFromConsumerList(target)) { 1929 addToConsumerList(target); 1930 consumers = new ArrayList<Subscription>(this.consumers); 1931 } 1932 }finally { 1933 consumersLock.writeLock().unlock(); 1934 } 1935 } 1936 } 1937 1938 return rc; 1939 } 1940 1941 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { 1942 boolean result = true; 1943 // Keep message groups together. 1944 String groupId = node.getGroupID(); 1945 int sequence = node.getGroupSequence(); 1946 if (groupId != null) { 1947 //MessageGroupMap messageGroupOwners = ((Queue) node 1948 // .getRegionDestination()).getMessageGroupOwners(); 1949 1950 MessageGroupMap messageGroupOwners = getMessageGroupOwners(); 1951 // If we can own the first, then no-one else should own the 1952 // rest. 
1953 if (sequence == 1) { 1954 assignGroup(subscription, messageGroupOwners, node, groupId); 1955 } else { 1956 1957 // Make sure that the previous owner is still valid, we may 1958 // need to become the new owner. 1959 ConsumerId groupOwner; 1960 1961 groupOwner = messageGroupOwners.get(groupId); 1962 if (groupOwner == null) { 1963 assignGroup(subscription, messageGroupOwners, node, groupId); 1964 } else { 1965 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { 1966 // A group sequence < 1 is an end of group signal. 1967 if (sequence < 0) { 1968 messageGroupOwners.removeGroup(groupId); 1969 } 1970 } else { 1971 result = false; 1972 } 1973 } 1974 } 1975 } 1976 1977 return result; 1978 1979 } 1980 1981 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { 1982 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); 1983 Message message = n.getMessage(); 1984 if (message instanceof ActiveMQMessage) { 1985 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 1986 try { 1987 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); 1988 } catch (JMSException e) { 1989 LOG.warn("Failed to set boolean header: " + e, e); 1990 } 1991 } 1992 } 1993 1994 protected void pageInMessages(boolean force) throws Exception { 1995 doDispatch(doPageInForDispatch(force)); 1996 } 1997 1998 private void addToConsumerList(Subscription sub) { 1999 if (useConsumerPriority) { 2000 consumers.add(sub); 2001 Collections.sort(consumers, orderedCompare); 2002 } else { 2003 consumers.add(sub); 2004 } 2005 } 2006 2007 private boolean removeFromConsumerList(Subscription sub) { 2008 return consumers.remove(sub); 2009 } 2010 2011 private int getConsumerMessageCountBeforeFull() throws Exception { 2012 int total = 0; 2013 boolean zeroPrefetch = false; 2014 consumersLock.readLock().lock(); 2015 try{ 2016 for (Subscription s : consumers) { 2017 zeroPrefetch |= 
s.getPrefetchSize() == 0; 2018 int countBeforeFull = s.countBeforeFull(); 2019 total += countBeforeFull; 2020 } 2021 }finally { 2022 consumersLock.readLock().unlock(); 2023 } 2024 if (total == 0 && zeroPrefetch) { 2025 total = 1; 2026 } 2027 return total; 2028 } 2029 2030 /* 2031 * In slave mode, dispatch is ignored till we get this notification as the 2032 * dispatch process is non deterministic between master and slave. On a 2033 * notification, the actual dispatch to the subscription (as chosen by the 2034 * master) is completed. (non-Javadoc) 2035 * @see 2036 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification 2037 * (org.apache.activemq.command.MessageDispatchNotification) 2038 */ 2039 @Override 2040 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 2041 // do dispatch 2042 Subscription sub = getMatchingSubscription(messageDispatchNotification); 2043 if (sub != null) { 2044 MessageReference message = getMatchingMessage(messageDispatchNotification); 2045 sub.add(message); 2046 sub.processMessageDispatchNotification(messageDispatchNotification); 2047 } 2048 } 2049 2050 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) 2051 throws Exception { 2052 QueueMessageReference message = null; 2053 MessageId messageId = messageDispatchNotification.getMessageId(); 2054 2055 pagedInPendingDispatchLock.writeLock().lock(); 2056 try { 2057 for (MessageReference ref : pagedInPendingDispatch) { 2058 if (messageId.equals(ref.getMessageId())) { 2059 message = (QueueMessageReference)ref; 2060 pagedInPendingDispatch.remove(ref); 2061 break; 2062 } 2063 } 2064 } finally { 2065 pagedInPendingDispatchLock.writeLock().unlock(); 2066 } 2067 2068 if (message == null) { 2069 pagedInMessagesLock.readLock().lock(); 2070 try { 2071 message = pagedInMessages.get(messageId); 2072 } finally { 2073 pagedInMessagesLock.readLock().unlock(); 2074 } 
2075 } 2076 2077 if (message == null) { 2078 messagesLock.writeLock().lock(); 2079 try { 2080 try { 2081 messages.setMaxBatchSize(getMaxPageSize()); 2082 messages.reset(); 2083 while (messages.hasNext()) { 2084 MessageReference node = messages.next(); 2085 messages.remove(); 2086 if (messageId.equals(node.getMessageId())) { 2087 message = this.createMessageReference(node.getMessage()); 2088 break; 2089 } 2090 } 2091 } finally { 2092 messages.release(); 2093 } 2094 } finally { 2095 messagesLock.writeLock().unlock(); 2096 } 2097 } 2098 2099 if (message == null) { 2100 Message msg = loadMessage(messageId); 2101 if (msg != null) { 2102 message = this.createMessageReference(msg); 2103 } 2104 } 2105 2106 if (message == null) { 2107 throw new JMSException("Slave broker out of sync with master - Message: " 2108 + messageDispatchNotification.getMessageId() + " on " 2109 + messageDispatchNotification.getDestination() + " does not exist among pending(" 2110 + pagedInPendingDispatch.size() + ") for subscription: " 2111 + messageDispatchNotification.getConsumerId()); 2112 } 2113 return message; 2114 } 2115 2116 /** 2117 * Find a consumer that matches the id in the message dispatch notification 2118 * 2119 * @param messageDispatchNotification 2120 * @return sub or null if the subscription has been removed before dispatch 2121 * @throws JMSException 2122 */ 2123 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) 2124 throws JMSException { 2125 Subscription sub = null; 2126 consumersLock.readLock().lock(); 2127 try { 2128 for (Subscription s : consumers) { 2129 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) { 2130 sub = s; 2131 break; 2132 } 2133 } 2134 }finally { 2135 consumersLock.readLock().unlock(); 2136 } 2137 return sub; 2138 } 2139 2140 public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) { 2141 if (oldPercentUsage > 
newPercentUsage) { 2142 asyncWakeup(); 2143 } 2144 } 2145 2146 @Override 2147 protected Logger getLog() { 2148 return LOG; 2149 } 2150 2151 protected boolean isOptimizeStorage(){ 2152 boolean result = false; 2153 if (isDoOptimzeMessageStorage()){ 2154 consumersLock.readLock().lock(); 2155 try{ 2156 if (consumers.isEmpty()==false){ 2157 result = true; 2158 for (Subscription s : consumers) { 2159 if (s.getPrefetchSize()==0){ 2160 result = false; 2161 break; 2162 } 2163 if (s.isSlowConsumer()){ 2164 result = false; 2165 break; 2166 } 2167 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 2168 result = false; 2169 break; 2170 } 2171 } 2172 } 2173 }finally { 2174 consumersLock.readLock().unlock(); 2175 } 2176 } 2177 return result; 2178 } 2179}