001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.LinkedList;
021import java.util.concurrent.atomic.AtomicLong;
022import javax.jms.JMSException;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031import org.apache.activemq.command.ConsumerControl;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.MessagePull;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.usage.SystemUsage;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public class TopicSubscription extends AbstractSubscription {
045
046    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
047    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
048    
049    protected PendingMessageCursor matched;
050    protected final SystemUsage usageManager;
051    protected AtomicLong dispatchedCounter = new AtomicLong();
052       
053    boolean singleDestination = true;
054    Destination destination;
055
056    private int maximumPendingMessages = -1;
057    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
058    private int discarded;
059    private final Object matchedListMutex = new Object();
060    private final AtomicLong enqueueCounter = new AtomicLong(0);
061    private final AtomicLong dequeueCounter = new AtomicLong(0);
062    private int memoryUsageHighWaterMark = 95;
063    // allow duplicate suppression in a ring network of brokers
064    protected int maxProducersToAudit = 1024;
065    protected int maxAuditDepth = 1000;
066    protected boolean enableAudit = false;
067    protected ActiveMQMessageAudit audit;
068    protected boolean active = false;
069
070    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071        super(broker, context, info);
072        this.usageManager = usageManager;
073        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074        if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075            this.matched = new VMPendingMessageCursor(false);
076        } else {
077            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078        }
079    }
080
081    public void init() throws Exception {
082        this.matched.setSystemUsage(usageManager);
083        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084        this.matched.start();
085        if (enableAudit) {
086            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087        }
088        this.active=true;
089    }
090
091    public void add(MessageReference node) throws Exception {
092        if (isDuplicate(node)) {
093            return;
094        }
095        enqueueCounter.incrementAndGet();
096        if (!isFull() && matched.isEmpty()  && !isSlave()) {
097            // if maximumPendingMessages is set we will only discard messages which
098            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099            dispatch(node);
100            setSlowConsumer(false);
101        } else {
102            if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
103                //we are slow
104                if(!isSlowConsumer()) {
105                    LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
106                    setSlowConsumer(true);
107                    for (Destination dest: destinations) {
108                        dest.slowConsumer(getContext(), this);
109                    }
110                }
111            }
112            if (maximumPendingMessages != 0) {
113                boolean warnedAboutWait = false;
114                while (active) {
115                    synchronized (matchedListMutex) {
116                        while (matched.isFull()) {
117                            if (getContext().getStopping().get()) {
118                                LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
119                                        + node.getMessageId());
120                                enqueueCounter.decrementAndGet();
121                                return;
122                            }
123                            if (!warnedAboutWait) {
124                                LOG.info(toString() + ": Pending message cursor [" + matched
125                                        + "] is full, temp usage ("
126                                        + +matched.getSystemUsage().getTempUsage().getPercentUsage()
127                                        + "%) or memory usage ("
128                                        + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
129                                        + "%) limit reached, blocking message add() pending the release of resources.");
130                                warnedAboutWait = true;
131                            }
132                            matchedListMutex.wait(20);
133                        }
134                        //Temporary storage could be full - so just try to add the message
135                        //see https://issues.apache.org/activemq/browse/AMQ-2475
136                        if (matched.tryAddMessageLast(node, 10)) {
137                            break;
138                        }
139                    }
140                }
141                synchronized (matchedListMutex) {
142                    
143                    // NOTE - be careful about the slaveBroker!
144                    if (maximumPendingMessages > 0) {
145                        // calculate the high water mark from which point we
146                        // will eagerly evict expired messages
147                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
148                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
149                            max = maximumPendingMessages;
150                        }
151                        if (!matched.isEmpty() && matched.size() > max) {
152                            removeExpiredMessages();
153                        }
154                        // lets discard old messages as we are a slow consumer
155                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
156                            int pageInSize = matched.size() - maximumPendingMessages;
157                            // only page in a 1000 at a time - else we could
158                            // blow da memory
159                            pageInSize = Math.max(1000, pageInSize);
160                            LinkedList<MessageReference> list = null;
161                            MessageReference[] oldMessages=null;
162                            synchronized(matched){
163                                list = matched.pageInList(pageInSize);
164                                oldMessages = messageEvictionStrategy.evictMessages(list);
165                                for (MessageReference ref : list) {
166                                    ref.decrementReferenceCount();
167                                }
168                            }
169                            int messagesToEvict = 0;
170                            if (oldMessages != null){
171                                    messagesToEvict = oldMessages.length;
172                                    for (int i = 0; i < messagesToEvict; i++) {
173                                        MessageReference oldMessage = oldMessages[i];
174                                        discard(oldMessage);
175                                    }
176                            }
177                            // lets avoid an infinite loop if we are given a bad
178                            // eviction strategy
179                            // for a bad strategy lets just not evict
180                            if (messagesToEvict == 0) {
181                                LOG.warn("No messages to evict returned for "  + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
182                                break;
183                            }
184                        }
185                    }
186                }
187                dispatchMatched();
188            }
189        }
190    }
191
192    private boolean isDuplicate(MessageReference node) {
193        boolean duplicate = false;
194        if (enableAudit && audit != null) {
195            duplicate = audit.isDuplicate(node);
196            if (LOG.isDebugEnabled()) {
197                if (duplicate) {
198                    LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
199                }
200            }
201        }
202        return duplicate;
203    }
204
205    /**
206     * Discard any expired messages from the matched list. Called from a
207     * synchronized block.
208     * 
209     * @throws IOException
210     */
211    protected void removeExpiredMessages() throws IOException {
212        try {
213            matched.reset();
214            while (matched.hasNext()) {
215                MessageReference node = matched.next();
216                node.decrementReferenceCount();
217                if (broker.isExpired(node)) {
218                    matched.remove();
219                    dispatchedCounter.incrementAndGet();
220                    node.decrementReferenceCount();
221                    node.getRegionDestination().getDestinationStatistics().getExpired().increment();
222                    broker.messageExpired(getContext(), node, this);
223                    break;
224                }
225            }
226        } finally {
227            matched.release();
228        }
229    }
230
231    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
232        synchronized (matchedListMutex) {
233            try {
234                matched.reset();
235                while (matched.hasNext()) {
236                    MessageReference node = matched.next();
237                    node.decrementReferenceCount();
238                    if (node.getMessageId().equals(mdn.getMessageId())) {
239                        matched.remove();
240                        dispatchedCounter.incrementAndGet();
241                        node.decrementReferenceCount();
242                        break;
243                    }
244                }
245            } finally {
246                matched.release();
247            }
248        }
249    }
250
251    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
252        // Handle the standard acknowledgment case.
253        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
254            if (context.isInTransaction()) {
255                context.getTransaction().addSynchronization(new Synchronization() {
256
257                    @Override
258                    public void afterCommit() throws Exception {
259                       synchronized (TopicSubscription.this) {
260                            if (singleDestination && destination != null) {
261                                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
262                            }
263                        }
264                        dequeueCounter.addAndGet(ack.getMessageCount());
265                        dispatchMatched();
266                    }
267                });
268            } else {
269                if (singleDestination && destination != null) {
270                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
271                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
272                }
273                dequeueCounter.addAndGet(ack.getMessageCount());
274            }
275            dispatchMatched();
276            return;
277        } else if (ack.isDeliveredAck()) {
278            // Message was delivered but not acknowledged: update pre-fetch
279            // counters.
280            // also. get these for a consumer expired message.
281            if (destination != null && !ack.isInTransaction()) {
282                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
283                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
284            }
285            dequeueCounter.addAndGet(ack.getMessageCount());
286            dispatchMatched();
287            return;
288        } else if (ack.isRedeliveredAck()) {
289            // nothing to do atm
290            return;
291        }
292        throw new JMSException("Invalid acknowledgment: " + ack);
293    }
294
295    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
296        // not supported for topics
297        return null;
298    }
299
300    public int getPendingQueueSize() {
301        return matched();
302    }
303
304    public int getDispatchedQueueSize() {
305        return (int)(dispatchedCounter.get() - dequeueCounter.get());
306    }
307
308    public int getMaximumPendingMessages() {
309        return maximumPendingMessages;
310    }
311
312    public long getDispatchedCounter() {
313        return dispatchedCounter.get();
314    }
315
316    public long getEnqueueCounter() {
317        return enqueueCounter.get();
318    }
319
320    public long getDequeueCounter() {
321        return dequeueCounter.get();
322    }
323
324    /**
325     * @return the number of messages discarded due to being a slow consumer
326     */
327    public int discarded() {
328        synchronized (matchedListMutex) {
329            return discarded;
330        }
331    }
332
333    /**
334     * @return the number of matched messages (messages targeted for the
335     *         subscription but not yet able to be dispatched due to the
336     *         prefetch buffer being full).
337     */
338    public int matched() {
339        synchronized (matchedListMutex) {
340            return matched.size();
341        }
342    }
343
344    /**
345     * Sets the maximum number of pending messages that can be matched against
346     * this consumer before old messages are discarded.
347     */
348    public void setMaximumPendingMessages(int maximumPendingMessages) {
349        this.maximumPendingMessages = maximumPendingMessages;
350    }
351
352    public MessageEvictionStrategy getMessageEvictionStrategy() {
353        return messageEvictionStrategy;
354    }
355
356    /**
357     * Sets the eviction strategy used to decide which message to evict when the
358     * slow consumer needs to discard messages
359     */
360    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
361        this.messageEvictionStrategy = messageEvictionStrategy;
362    }
363
364    public int getMaxProducersToAudit() {
365        return maxProducersToAudit;
366    }
367
368    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
369        this.maxProducersToAudit = maxProducersToAudit;
370        if (audit != null) {
371            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
372        }
373    }
374
375    public int getMaxAuditDepth() {
376        return maxAuditDepth;
377    }
378    
379    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
380        this.maxAuditDepth = maxAuditDepth;
381        if (audit != null) {
382            audit.setAuditDepth(maxAuditDepth);
383        }
384    }
385    
386    public boolean isEnableAudit() {
387        return enableAudit;
388    }
389
390    public synchronized void setEnableAudit(boolean enableAudit) {
391        this.enableAudit = enableAudit;
392        if (enableAudit && audit==null) {
393            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
394        }
395    }
396    
397    // Implementation methods
398    // -------------------------------------------------------------------------
399    public boolean isFull() {
400        return getDispatchedQueueSize()  >= info.getPrefetchSize();
401    }
402    
403    public int getInFlightSize() {
404        return getDispatchedQueueSize();
405    }
406    
407    
408    /**
409     * @return true when 60% or more room is left for dispatching messages
410     */
411    public boolean isLowWaterMark() {
412        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
413    }
414
415    /**
416     * @return true when 10% or less room is left for dispatching messages
417     */
418    public boolean isHighWaterMark() {
419        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
420    }
421
422    /**
423     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
424     */
425    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
426        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
427    }
428
429    /**
430     * @return the memoryUsageHighWaterMark
431     */
432    public int getMemoryUsageHighWaterMark() {
433        return this.memoryUsageHighWaterMark;
434    }
435
436    /**
437     * @return the usageManager
438     */
439    public SystemUsage getUsageManager() {
440        return this.usageManager;
441    }
442
443    /**
444     * @return the matched
445     */
446    public PendingMessageCursor getMatched() {
447        return this.matched;
448    }
449
450    /**
451     * @param matched the matched to set
452     */
453    public void setMatched(PendingMessageCursor matched) {
454        this.matched = matched;
455    }
456
457    /**
458     * inform the MessageConsumer on the client to change it's prefetch
459     * 
460     * @param newPrefetch
461     */
462    public void updateConsumerPrefetch(int newPrefetch) {
463        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
464            ConsumerControl cc = new ConsumerControl();
465            cc.setConsumerId(info.getConsumerId());
466            cc.setPrefetch(newPrefetch);
467            context.getConnection().dispatchAsync(cc);
468        }
469    }
470
471    private void dispatchMatched() throws IOException {       
472        synchronized (matchedListMutex) {
473            if (!matched.isEmpty() && !isFull()) {
474                try {
475                    matched.reset();
476                   
477                    while (matched.hasNext() && !isFull()) {
478                        MessageReference message = matched.next();
479                        message.decrementReferenceCount();
480                        matched.remove();
481                        // Message may have been sitting in the matched list a
482                        // while
483                        // waiting for the consumer to ak the message.
484                        if (message.isExpired()) {
485                            discard(message);
486                            continue; // just drop it.
487                        }
488                        dispatch(message);
489                    }
490                } finally {
491                    matched.release();
492                }
493            }
494        }
495    }
496
497    private void dispatch(final MessageReference node) throws IOException {
498        Message message = (Message)node;
499        node.incrementReferenceCount();
500        // Make sure we can dispatch a message.
501        MessageDispatch md = new MessageDispatch();
502        md.setMessage(message);
503        md.setConsumerId(info.getConsumerId());
504        md.setDestination(node.getRegionDestination().getActiveMQDestination());
505        dispatchedCounter.incrementAndGet();
506        // Keep track if this subscription is receiving messages from a single
507        // destination.
508        if (singleDestination) {
509            if (destination == null) {
510                destination = node.getRegionDestination();
511            } else {
512                if (destination != node.getRegionDestination()) {
513                    singleDestination = false;
514                }
515            }
516        }
517        if (info.isDispatchAsync()) {
518            md.setTransmitCallback(new Runnable() {
519
520                public void run() {
521                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
522                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();
523                    node.decrementReferenceCount();
524                }
525            });
526            context.getConnection().dispatchAsync(md);
527        } else {
528            context.getConnection().dispatchSync(md);
529            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
530            node.getRegionDestination().getDestinationStatistics().getInflight().increment();
531            node.decrementReferenceCount();
532        }
533    }
534
535    private void discard(MessageReference message) {
536        message.decrementReferenceCount();
537        matched.remove(message);
538        discarded++;
539        if(destination != null) {
540            destination.getDestinationStatistics().getDequeues().increment();
541        }
542        if (LOG.isDebugEnabled()) {
543            LOG.debug(this + ", discarding message " + message);
544        }
545        Destination dest = message.getRegionDestination();
546        if (dest != null) {
547            dest.messageDiscarded(getContext(), this, message);
548        }
549        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
550    }
551
552    @Override
553    public String toString() {
554        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
555               + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
556    }
557
558    public void destroy() {
559        this.active=false;
560        synchronized (matchedListMutex) {
561            try {
562                matched.destroy();
563            } catch (Exception e) {
564                LOG.warn("Failed to destroy cursor", e);
565            }
566        }
567        setSlowConsumer(false);
568    }
569
570    @Override
571    public int getPrefetchSize() {
572        return info.getPrefetchSize();
573    }
574
575}