001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.LinkedList;
021    import java.util.concurrent.atomic.AtomicLong;
022    import javax.jms.JMSException;
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029    import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030    import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031    import org.apache.activemq.command.ConsumerControl;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.MessageDispatchNotification;
037    import org.apache.activemq.command.MessagePull;
038    import org.apache.activemq.command.Response;
039    import org.apache.activemq.transaction.Synchronization;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    public class TopicSubscription extends AbstractSubscription {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
047        private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
048        
049        protected PendingMessageCursor matched;
050        protected final SystemUsage usageManager;
051        protected AtomicLong dispatchedCounter = new AtomicLong();
052           
053        boolean singleDestination = true;
054        Destination destination;
055    
056        private int maximumPendingMessages = -1;
057        private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
058        private int discarded;
059        private final Object matchedListMutex = new Object();
060        private final AtomicLong enqueueCounter = new AtomicLong(0);
061        private final AtomicLong dequeueCounter = new AtomicLong(0);
062        private int memoryUsageHighWaterMark = 95;
063        // allow duplicate suppression in a ring network of brokers
064        protected int maxProducersToAudit = 1024;
065        protected int maxAuditDepth = 1000;
066        protected boolean enableAudit = false;
067        protected ActiveMQMessageAudit audit;
068        protected boolean active = false;
069    
070        public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071            super(broker, context, info);
072            this.usageManager = usageManager;
073            String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074            if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075                this.matched = new VMPendingMessageCursor(false);
076            } else {
077                this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078            }
079        }
080    
081        public void init() throws Exception {
082            this.matched.setSystemUsage(usageManager);
083            this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084            this.matched.start();
085            if (enableAudit) {
086                audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087            }
088            this.active=true;
089        }
090    
091        public void add(MessageReference node) throws Exception {
092            if (isDuplicate(node)) {
093                return;
094            }
095            enqueueCounter.incrementAndGet();
096            if (!isFull() && matched.isEmpty()  && !isSlave()) {
097                // if maximumPendingMessages is set we will only discard messages which
098                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099                dispatch(node);
100                setSlowConsumer(false);
101            } else {
102                if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
103                    //we are slow
104                    if(!isSlowConsumer()) {
105                        LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
106                        setSlowConsumer(true);
107                        for (Destination dest: destinations) {
108                            dest.slowConsumer(getContext(), this);
109                        }
110                    }
111                }
112                if (maximumPendingMessages != 0) {
113                    boolean warnedAboutWait = false;
114                    while (active) {
115                        synchronized (matchedListMutex) {
116                            while (matched.isFull()) {
117                                if (getContext().getStopping().get()) {
118                                    LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
119                                            + node.getMessageId());
120                                    enqueueCounter.decrementAndGet();
121                                    return;
122                                }
123                                if (!warnedAboutWait) {
124                                    LOG.info(toString() + ": Pending message cursor [" + matched
125                                            + "] is full, temp usage ("
126                                            + +matched.getSystemUsage().getTempUsage().getPercentUsage()
127                                            + "%) or memory usage ("
128                                            + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
129                                            + "%) limit reached, blocking message add() pending the release of resources.");
130                                    warnedAboutWait = true;
131                                }
132                                matchedListMutex.wait(20);
133                            }
134                            //Temporary storage could be full - so just try to add the message
135                            //see https://issues.apache.org/activemq/browse/AMQ-2475
136                            if (matched.tryAddMessageLast(node, 10)) {
137                                break;
138                            }
139                        }
140                    }
141                    synchronized (matchedListMutex) {
142                        
143                        // NOTE - be careful about the slaveBroker!
144                        if (maximumPendingMessages > 0) {
145                            // calculate the high water mark from which point we
146                            // will eagerly evict expired messages
147                            int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
148                            if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
149                                max = maximumPendingMessages;
150                            }
151                            if (!matched.isEmpty() && matched.size() > max) {
152                                removeExpiredMessages();
153                            }
154                            // lets discard old messages as we are a slow consumer
155                            while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
156                                int pageInSize = matched.size() - maximumPendingMessages;
157                                // only page in a 1000 at a time - else we could
158                                // blow da memory
159                                pageInSize = Math.max(1000, pageInSize);
160                                LinkedList<MessageReference> list = null;
161                                MessageReference[] oldMessages=null;
162                                synchronized(matched){
163                                    list = matched.pageInList(pageInSize);
164                                    oldMessages = messageEvictionStrategy.evictMessages(list);
165                                    for (MessageReference ref : list) {
166                                        ref.decrementReferenceCount();
167                                    }
168                                }
169                                int messagesToEvict = 0;
170                                if (oldMessages != null){
171                                        messagesToEvict = oldMessages.length;
172                                        for (int i = 0; i < messagesToEvict; i++) {
173                                            MessageReference oldMessage = oldMessages[i];
174                                            discard(oldMessage);
175                                        }
176                                }
177                                // lets avoid an infinite loop if we are given a bad
178                                // eviction strategy
179                                // for a bad strategy lets just not evict
180                                if (messagesToEvict == 0) {
181                                    LOG.warn("No messages to evict returned for "  + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
182                                    break;
183                                }
184                            }
185                        }
186                    }
187                    dispatchMatched();
188                }
189            }
190        }
191    
192        private boolean isDuplicate(MessageReference node) {
193            boolean duplicate = false;
194            if (enableAudit && audit != null) {
195                duplicate = audit.isDuplicate(node);
196                if (LOG.isDebugEnabled()) {
197                    if (duplicate) {
198                        LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
199                    }
200                }
201            }
202            return duplicate;
203        }
204    
205        /**
206         * Discard any expired messages from the matched list. Called from a
207         * synchronized block.
208         * 
209         * @throws IOException
210         */
211        protected void removeExpiredMessages() throws IOException {
212            try {
213                matched.reset();
214                while (matched.hasNext()) {
215                    MessageReference node = matched.next();
216                    node.decrementReferenceCount();
217                    if (broker.isExpired(node)) {
218                        matched.remove();
219                        dispatchedCounter.incrementAndGet();
220                        node.decrementReferenceCount();
221                        node.getRegionDestination().getDestinationStatistics().getExpired().increment();
222                        broker.messageExpired(getContext(), node, this);
223                        break;
224                    }
225                }
226            } finally {
227                matched.release();
228            }
229        }
230    
231        public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
232            synchronized (matchedListMutex) {
233                try {
234                    matched.reset();
235                    while (matched.hasNext()) {
236                        MessageReference node = matched.next();
237                        node.decrementReferenceCount();
238                        if (node.getMessageId().equals(mdn.getMessageId())) {
239                            matched.remove();
240                            dispatchedCounter.incrementAndGet();
241                            node.decrementReferenceCount();
242                            break;
243                        }
244                    }
245                } finally {
246                    matched.release();
247                }
248            }
249        }
250    
251        public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
252            // Handle the standard acknowledgment case.
253            if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
254                if (context.isInTransaction()) {
255                    context.getTransaction().addSynchronization(new Synchronization() {
256    
257                        @Override
258                        public void afterCommit() throws Exception {
259                           synchronized (TopicSubscription.this) {
260                                if (singleDestination && destination != null) {
261                                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
262                                }
263                            }
264                            dequeueCounter.addAndGet(ack.getMessageCount());
265                            dispatchMatched();
266                        }
267                    });
268                } else {
269                    if (singleDestination && destination != null) {
270                        destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
271                        destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
272                    }
273                    dequeueCounter.addAndGet(ack.getMessageCount());
274                }
275                dispatchMatched();
276                return;
277            } else if (ack.isDeliveredAck()) {
278                // Message was delivered but not acknowledged: update pre-fetch
279                // counters.
280                // also. get these for a consumer expired message.
281                if (destination != null && !ack.isInTransaction()) {
282                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
283                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
284                }
285                dequeueCounter.addAndGet(ack.getMessageCount());
286                dispatchMatched();
287                return;
288            } else if (ack.isRedeliveredAck()) {
289                // nothing to do atm
290                return;
291            }
292            throw new JMSException("Invalid acknowledgment: " + ack);
293        }
294    
295        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
296            // not supported for topics
297            return null;
298        }
299    
300        public int getPendingQueueSize() {
301            return matched();
302        }
303    
304        public int getDispatchedQueueSize() {
305            return (int)(dispatchedCounter.get() - dequeueCounter.get());
306        }
307    
308        public int getMaximumPendingMessages() {
309            return maximumPendingMessages;
310        }
311    
312        public long getDispatchedCounter() {
313            return dispatchedCounter.get();
314        }
315    
316        public long getEnqueueCounter() {
317            return enqueueCounter.get();
318        }
319    
320        public long getDequeueCounter() {
321            return dequeueCounter.get();
322        }
323    
324        /**
325         * @return the number of messages discarded due to being a slow consumer
326         */
327        public int discarded() {
328            synchronized (matchedListMutex) {
329                return discarded;
330            }
331        }
332    
333        /**
334         * @return the number of matched messages (messages targeted for the
335         *         subscription but not yet able to be dispatched due to the
336         *         prefetch buffer being full).
337         */
338        public int matched() {
339            synchronized (matchedListMutex) {
340                return matched.size();
341            }
342        }
343    
344        /**
345         * Sets the maximum number of pending messages that can be matched against
346         * this consumer before old messages are discarded.
347         */
348        public void setMaximumPendingMessages(int maximumPendingMessages) {
349            this.maximumPendingMessages = maximumPendingMessages;
350        }
351    
352        public MessageEvictionStrategy getMessageEvictionStrategy() {
353            return messageEvictionStrategy;
354        }
355    
356        /**
357         * Sets the eviction strategy used to decide which message to evict when the
358         * slow consumer needs to discard messages
359         */
360        public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
361            this.messageEvictionStrategy = messageEvictionStrategy;
362        }
363    
364        public int getMaxProducersToAudit() {
365            return maxProducersToAudit;
366        }
367    
368        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
369            this.maxProducersToAudit = maxProducersToAudit;
370            if (audit != null) {
371                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
372            }
373        }
374    
375        public int getMaxAuditDepth() {
376            return maxAuditDepth;
377        }
378        
379        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
380            this.maxAuditDepth = maxAuditDepth;
381            if (audit != null) {
382                audit.setAuditDepth(maxAuditDepth);
383            }
384        }
385        
386        public boolean isEnableAudit() {
387            return enableAudit;
388        }
389    
390        public synchronized void setEnableAudit(boolean enableAudit) {
391            this.enableAudit = enableAudit;
392            if (enableAudit && audit==null) {
393                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
394            }
395        }
396        
397        // Implementation methods
398        // -------------------------------------------------------------------------
399        public boolean isFull() {
400            return getDispatchedQueueSize()  >= info.getPrefetchSize();
401        }
402        
403        public int getInFlightSize() {
404            return getDispatchedQueueSize();
405        }
406        
407        
408        /**
409         * @return true when 60% or more room is left for dispatching messages
410         */
411        public boolean isLowWaterMark() {
412            return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
413        }
414    
415        /**
416         * @return true when 10% or less room is left for dispatching messages
417         */
418        public boolean isHighWaterMark() {
419            return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
420        }
421    
422        /**
423         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
424         */
425        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
426            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
427        }
428    
429        /**
430         * @return the memoryUsageHighWaterMark
431         */
432        public int getMemoryUsageHighWaterMark() {
433            return this.memoryUsageHighWaterMark;
434        }
435    
436        /**
437         * @return the usageManager
438         */
439        public SystemUsage getUsageManager() {
440            return this.usageManager;
441        }
442    
443        /**
444         * @return the matched
445         */
446        public PendingMessageCursor getMatched() {
447            return this.matched;
448        }
449    
450        /**
451         * @param matched the matched to set
452         */
453        public void setMatched(PendingMessageCursor matched) {
454            this.matched = matched;
455        }
456    
457        /**
458         * inform the MessageConsumer on the client to change it's prefetch
459         * 
460         * @param newPrefetch
461         */
462        public void updateConsumerPrefetch(int newPrefetch) {
463            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
464                ConsumerControl cc = new ConsumerControl();
465                cc.setConsumerId(info.getConsumerId());
466                cc.setPrefetch(newPrefetch);
467                context.getConnection().dispatchAsync(cc);
468            }
469        }
470    
471        private void dispatchMatched() throws IOException {       
472            synchronized (matchedListMutex) {
473                if (!matched.isEmpty() && !isFull()) {
474                    try {
475                        matched.reset();
476                       
477                        while (matched.hasNext() && !isFull()) {
478                            MessageReference message = matched.next();
479                            message.decrementReferenceCount();
480                            matched.remove();
481                            // Message may have been sitting in the matched list a
482                            // while
483                            // waiting for the consumer to ak the message.
484                            if (message.isExpired()) {
485                                discard(message);
486                                continue; // just drop it.
487                            }
488                            dispatch(message);
489                        }
490                    } finally {
491                        matched.release();
492                    }
493                }
494            }
495        }
496    
497        private void dispatch(final MessageReference node) throws IOException {
498            Message message = (Message)node;
499            node.incrementReferenceCount();
500            // Make sure we can dispatch a message.
501            MessageDispatch md = new MessageDispatch();
502            md.setMessage(message);
503            md.setConsumerId(info.getConsumerId());
504            md.setDestination(node.getRegionDestination().getActiveMQDestination());
505            dispatchedCounter.incrementAndGet();
506            // Keep track if this subscription is receiving messages from a single
507            // destination.
508            if (singleDestination) {
509                if (destination == null) {
510                    destination = node.getRegionDestination();
511                } else {
512                    if (destination != node.getRegionDestination()) {
513                        singleDestination = false;
514                    }
515                }
516            }
517            if (info.isDispatchAsync()) {
518                md.setTransmitCallback(new Runnable() {
519    
520                    public void run() {
521                        node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
522                        node.getRegionDestination().getDestinationStatistics().getInflight().increment();
523                        node.decrementReferenceCount();
524                    }
525                });
526                context.getConnection().dispatchAsync(md);
527            } else {
528                context.getConnection().dispatchSync(md);
529                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
530                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
531                node.decrementReferenceCount();
532            }
533        }
534    
535        private void discard(MessageReference message) {
536            message.decrementReferenceCount();
537            matched.remove(message);
538            discarded++;
539            if(destination != null) {
540                destination.getDestinationStatistics().getDequeues().increment();
541            }
542            if (LOG.isDebugEnabled()) {
543                LOG.debug(this + ", discarding message " + message);
544            }
545            Destination dest = message.getRegionDestination();
546            if (dest != null) {
547                dest.messageDiscarded(getContext(), this, message);
548            }
549            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
550        }
551    
552        @Override
553        public String toString() {
554            return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
555                   + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
556        }
557    
558        public void destroy() {
559            this.active=false;
560            synchronized (matchedListMutex) {
561                try {
562                    matched.destroy();
563                } catch (Exception e) {
564                    LOG.warn("Failed to destroy cursor", e);
565                }
566            }
567            setSlowConsumer(false);
568        }
569    
570        @Override
571        public int getPrefetchSize() {
572            return info.getPrefetchSize();
573        }
574    
575    }