001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.LinkedList; 021 import java.util.concurrent.atomic.AtomicLong; 022 import javax.jms.JMSException; 023 import org.apache.activemq.ActiveMQMessageAudit; 024 import org.apache.activemq.broker.Broker; 025 import org.apache.activemq.broker.ConnectionContext; 026 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 027 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 028 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 029 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 030 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 031 import org.apache.activemq.command.ConsumerControl; 032 import org.apache.activemq.command.ConsumerInfo; 033 import org.apache.activemq.command.Message; 034 import org.apache.activemq.command.MessageAck; 035 import org.apache.activemq.command.MessageDispatch; 036 import org.apache.activemq.command.MessageDispatchNotification; 037 import org.apache.activemq.command.MessagePull; 038 import org.apache.activemq.command.Response; 039 import org.apache.activemq.transaction.Synchronization; 040 import org.apache.activemq.usage.SystemUsage; 041 import org.slf4j.Logger; 042 import org.slf4j.LoggerFactory; 043 044 public class TopicSubscription extends AbstractSubscription { 045 046 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 047 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 048 049 protected PendingMessageCursor matched; 050 protected final SystemUsage usageManager; 051 protected AtomicLong dispatchedCounter = new AtomicLong(); 052 053 boolean singleDestination = true; 054 Destination destination; 055 056 private int maximumPendingMessages = -1; 057 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 058 private int discarded; 059 private final Object matchedListMutex = new Object(); 060 private final AtomicLong enqueueCounter = new AtomicLong(0); 061 private final AtomicLong dequeueCounter = new AtomicLong(0); 062 private int memoryUsageHighWaterMark = 95; 063 // allow duplicate suppression in a ring network of brokers 064 protected int maxProducersToAudit = 1024; 065 protected int maxAuditDepth = 1000; 066 protected boolean enableAudit = false; 067 protected ActiveMQMessageAudit audit; 068 protected boolean active = false; 069 070 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 071 super(broker, context, info); 072 this.usageManager = usageManager; 073 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 074 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { 075 this.matched = new VMPendingMessageCursor(false); 076 } else { 077 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 078 } 079 } 080 081 public void init() throws Exception { 082 this.matched.setSystemUsage(usageManager); 083 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 084 this.matched.start(); 085 if (enableAudit) { 086 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 087 } 088 this.active=true; 089 } 090 091 public void add(MessageReference node) throws Exception { 092 if (isDuplicate(node)) { 093 return; 094 } 095 enqueueCounter.incrementAndGet(); 096 if (!isFull() && matched.isEmpty() && !isSlave()) { 097 // if maximumPendingMessages is set we will only discard messages which 098 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 099 dispatch(node); 100 setSlowConsumer(false); 101 } else { 102 if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) { 103 //we are slow 104 if(!isSlowConsumer()) { 105 LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow"); 106 setSlowConsumer(true); 107 for (Destination dest: destinations) { 108 dest.slowConsumer(getContext(), this); 109 } 110 } 111 } 112 if (maximumPendingMessages != 0) { 113 boolean warnedAboutWait = false; 114 while (active) { 115 synchronized (matchedListMutex) { 116 while (matched.isFull()) { 117 if (getContext().getStopping().get()) { 118 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " 119 + node.getMessageId()); 120 enqueueCounter.decrementAndGet(); 121 return; 122 } 123 if (!warnedAboutWait) { 124 LOG.info(toString() + ": Pending message cursor [" + matched 125 + "] is full, temp usage (" 126 + +matched.getSystemUsage().getTempUsage().getPercentUsage() 127 + "%) or memory usage (" 128 + matched.getSystemUsage().getMemoryUsage().getPercentUsage() 129 + "%) limit reached, blocking message add() pending the release of resources."); 130 warnedAboutWait = true; 131 } 132 matchedListMutex.wait(20); 133 } 134 //Temporary storage could be full - so just try to add the message 135 //see https://issues.apache.org/activemq/browse/AMQ-2475 136 if (matched.tryAddMessageLast(node, 10)) { 137 break; 138 } 139 } 140 } 141 synchronized (matchedListMutex) { 142 143 // NOTE - be careful about the slaveBroker! 144 if (maximumPendingMessages > 0) { 145 // calculate the high water mark from which point we 146 // will eagerly evict expired messages 147 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 148 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 149 max = maximumPendingMessages; 150 } 151 if (!matched.isEmpty() && matched.size() > max) { 152 removeExpiredMessages(); 153 } 154 // lets discard old messages as we are a slow consumer 155 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 156 int pageInSize = matched.size() - maximumPendingMessages; 157 // only page in a 1000 at a time - else we could 158 // blow da memory 159 pageInSize = Math.max(1000, pageInSize); 160 LinkedList<MessageReference> list = null; 161 MessageReference[] oldMessages=null; 162 synchronized(matched){ 163 list = matched.pageInList(pageInSize); 164 oldMessages = messageEvictionStrategy.evictMessages(list); 165 for (MessageReference ref : list) { 166 ref.decrementReferenceCount(); 167 } 168 } 169 int messagesToEvict = 0; 170 if (oldMessages != null){ 171 messagesToEvict = oldMessages.length; 172 for (int i = 0; i < messagesToEvict; i++) { 173 MessageReference oldMessage = oldMessages[i]; 174 discard(oldMessage); 175 } 176 } 177 // lets avoid an infinite loop if we are given a bad 178 // eviction strategy 179 // for a bad strategy lets just not evict 180 if (messagesToEvict == 0) { 181 LOG.warn("No messages to evict returned for " + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates"); 182 break; 183 } 184 } 185 } 186 } 187 dispatchMatched(); 188 } 189 } 190 } 191 192 private boolean isDuplicate(MessageReference node) { 193 boolean duplicate = false; 194 if (enableAudit && audit != null) { 195 duplicate = audit.isDuplicate(node); 196 if (LOG.isDebugEnabled()) { 197 if (duplicate) { 198 LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId()); 199 } 200 } 201 } 202 return duplicate; 203 } 204 205 /** 206 * Discard any expired messages from the matched list. Called from a 207 * synchronized block. 208 * 209 * @throws IOException 210 */ 211 protected void removeExpiredMessages() throws IOException { 212 try { 213 matched.reset(); 214 while (matched.hasNext()) { 215 MessageReference node = matched.next(); 216 node.decrementReferenceCount(); 217 if (broker.isExpired(node)) { 218 matched.remove(); 219 dispatchedCounter.incrementAndGet(); 220 node.decrementReferenceCount(); 221 node.getRegionDestination().getDestinationStatistics().getExpired().increment(); 222 broker.messageExpired(getContext(), node, this); 223 break; 224 } 225 } 226 } finally { 227 matched.release(); 228 } 229 } 230 231 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 232 synchronized (matchedListMutex) { 233 try { 234 matched.reset(); 235 while (matched.hasNext()) { 236 MessageReference node = matched.next(); 237 node.decrementReferenceCount(); 238 if (node.getMessageId().equals(mdn.getMessageId())) { 239 matched.remove(); 240 dispatchedCounter.incrementAndGet(); 241 node.decrementReferenceCount(); 242 break; 243 } 244 } 245 } finally { 246 matched.release(); 247 } 248 } 249 } 250 251 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 252 // Handle the standard acknowledgment case. 253 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 254 if (context.isInTransaction()) { 255 context.getTransaction().addSynchronization(new Synchronization() { 256 257 @Override 258 public void afterCommit() throws Exception { 259 synchronized (TopicSubscription.this) { 260 if (singleDestination && destination != null) { 261 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 262 } 263 } 264 dequeueCounter.addAndGet(ack.getMessageCount()); 265 dispatchMatched(); 266 } 267 }); 268 } else { 269 if (singleDestination && destination != null) { 270 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 271 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 272 } 273 dequeueCounter.addAndGet(ack.getMessageCount()); 274 } 275 dispatchMatched(); 276 return; 277 } else if (ack.isDeliveredAck()) { 278 // Message was delivered but not acknowledged: update pre-fetch 279 // counters. 280 // also. get these for a consumer expired message. 281 if (destination != null && !ack.isInTransaction()) { 282 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 283 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 284 } 285 dequeueCounter.addAndGet(ack.getMessageCount()); 286 dispatchMatched(); 287 return; 288 } else if (ack.isRedeliveredAck()) { 289 // nothing to do atm 290 return; 291 } 292 throw new JMSException("Invalid acknowledgment: " + ack); 293 } 294 295 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { 296 // not supported for topics 297 return null; 298 } 299 300 public int getPendingQueueSize() { 301 return matched(); 302 } 303 304 public int getDispatchedQueueSize() { 305 return (int)(dispatchedCounter.get() - dequeueCounter.get()); 306 } 307 308 public int getMaximumPendingMessages() { 309 return maximumPendingMessages; 310 } 311 312 public long getDispatchedCounter() { 313 return dispatchedCounter.get(); 314 } 315 316 public long getEnqueueCounter() { 317 return enqueueCounter.get(); 318 } 319 320 public long getDequeueCounter() { 321 return dequeueCounter.get(); 322 } 323 324 /** 325 * @return the number of messages discarded due to being a slow consumer 326 */ 327 public int discarded() { 328 synchronized (matchedListMutex) { 329 return discarded; 330 } 331 } 332 333 /** 334 * @return the number of matched messages (messages targeted for the 335 * subscription but not yet able to be dispatched due to the 336 * prefetch buffer being full). 337 */ 338 public int matched() { 339 synchronized (matchedListMutex) { 340 return matched.size(); 341 } 342 } 343 344 /** 345 * Sets the maximum number of pending messages that can be matched against 346 * this consumer before old messages are discarded. 347 */ 348 public void setMaximumPendingMessages(int maximumPendingMessages) { 349 this.maximumPendingMessages = maximumPendingMessages; 350 } 351 352 public MessageEvictionStrategy getMessageEvictionStrategy() { 353 return messageEvictionStrategy; 354 } 355 356 /** 357 * Sets the eviction strategy used to decide which message to evict when the 358 * slow consumer needs to discard messages 359 */ 360 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 361 this.messageEvictionStrategy = messageEvictionStrategy; 362 } 363 364 public int getMaxProducersToAudit() { 365 return maxProducersToAudit; 366 } 367 368 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 369 this.maxProducersToAudit = maxProducersToAudit; 370 if (audit != null) { 371 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 372 } 373 } 374 375 public int getMaxAuditDepth() { 376 return maxAuditDepth; 377 } 378 379 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 380 this.maxAuditDepth = maxAuditDepth; 381 if (audit != null) { 382 audit.setAuditDepth(maxAuditDepth); 383 } 384 } 385 386 public boolean isEnableAudit() { 387 return enableAudit; 388 } 389 390 public synchronized void setEnableAudit(boolean enableAudit) { 391 this.enableAudit = enableAudit; 392 if (enableAudit && audit==null) { 393 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 394 } 395 } 396 397 // Implementation methods 398 // ------------------------------------------------------------------------- 399 public boolean isFull() { 400 return getDispatchedQueueSize() >= info.getPrefetchSize(); 401 } 402 403 public int getInFlightSize() { 404 return getDispatchedQueueSize(); 405 } 406 407 408 /** 409 * @return true when 60% or more room is left for dispatching messages 410 */ 411 public boolean isLowWaterMark() { 412 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 413 } 414 415 /** 416 * @return true when 10% or less room is left for dispatching messages 417 */ 418 public boolean isHighWaterMark() { 419 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 420 } 421 422 /** 423 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 424 */ 425 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 426 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 427 } 428 429 /** 430 * @return the memoryUsageHighWaterMark 431 */ 432 public int getMemoryUsageHighWaterMark() { 433 return this.memoryUsageHighWaterMark; 434 } 435 436 /** 437 * @return the usageManager 438 */ 439 public SystemUsage getUsageManager() { 440 return this.usageManager; 441 } 442 443 /** 444 * @return the matched 445 */ 446 public PendingMessageCursor getMatched() { 447 return this.matched; 448 } 449 450 /** 451 * @param matched the matched to set 452 */ 453 public void setMatched(PendingMessageCursor matched) { 454 this.matched = matched; 455 } 456 457 /** 458 * inform the MessageConsumer on the client to change it's prefetch 459 * 460 * @param newPrefetch 461 */ 462 public void updateConsumerPrefetch(int newPrefetch) { 463 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 464 ConsumerControl cc = new ConsumerControl(); 465 cc.setConsumerId(info.getConsumerId()); 466 cc.setPrefetch(newPrefetch); 467 context.getConnection().dispatchAsync(cc); 468 } 469 } 470 471 private void dispatchMatched() throws IOException { 472 synchronized (matchedListMutex) { 473 if (!matched.isEmpty() && !isFull()) { 474 try { 475 matched.reset(); 476 477 while (matched.hasNext() && !isFull()) { 478 MessageReference message = matched.next(); 479 message.decrementReferenceCount(); 480 matched.remove(); 481 // Message may have been sitting in the matched list a 482 // while 483 // waiting for the consumer to ak the message. 484 if (message.isExpired()) { 485 discard(message); 486 continue; // just drop it. 487 } 488 dispatch(message); 489 } 490 } finally { 491 matched.release(); 492 } 493 } 494 } 495 } 496 497 private void dispatch(final MessageReference node) throws IOException { 498 Message message = (Message)node; 499 node.incrementReferenceCount(); 500 // Make sure we can dispatch a message. 501 MessageDispatch md = new MessageDispatch(); 502 md.setMessage(message); 503 md.setConsumerId(info.getConsumerId()); 504 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 505 dispatchedCounter.incrementAndGet(); 506 // Keep track if this subscription is receiving messages from a single 507 // destination. 508 if (singleDestination) { 509 if (destination == null) { 510 destination = node.getRegionDestination(); 511 } else { 512 if (destination != node.getRegionDestination()) { 513 singleDestination = false; 514 } 515 } 516 } 517 if (info.isDispatchAsync()) { 518 md.setTransmitCallback(new Runnable() { 519 520 public void run() { 521 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 522 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 523 node.decrementReferenceCount(); 524 } 525 }); 526 context.getConnection().dispatchAsync(md); 527 } else { 528 context.getConnection().dispatchSync(md); 529 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 530 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 531 node.decrementReferenceCount(); 532 } 533 } 534 535 private void discard(MessageReference message) { 536 message.decrementReferenceCount(); 537 matched.remove(message); 538 discarded++; 539 if(destination != null) { 540 destination.getDestinationStatistics().getDequeues().increment(); 541 } 542 if (LOG.isDebugEnabled()) { 543 LOG.debug(this + ", discarding message " + message); 544 } 545 Destination dest = message.getRegionDestination(); 546 if (dest != null) { 547 dest.messageDiscarded(getContext(), this, message); 548 } 549 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this); 550 } 551 552 @Override 553 public String toString() { 554 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 555 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); 556 } 557 558 public void destroy() { 559 this.active=false; 560 synchronized (matchedListMutex) { 561 try { 562 matched.destroy(); 563 } catch (Exception e) { 564 LOG.warn("Failed to destroy cursor", e); 565 } 566 } 567 setSlowConsumer(false); 568 } 569 570 @Override 571 public int getPrefetchSize() { 572 return info.getPrefetchSize(); 573 } 574 575 }