001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.concurrent.ConcurrentHashMap;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import javax.jms.InvalidSelectorException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.ConsumerInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.MessageId;
038    import org.apache.activemq.store.TopicMessageStore;
039    import org.apache.activemq.usage.SystemUsage;
040    import org.apache.activemq.usage.Usage;
041    import org.apache.activemq.usage.UsageListener;
042    import org.apache.activemq.util.SubscriptionKey;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047    
048        private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050        private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051        private final SubscriptionKey subscriptionKey;
052        private final boolean keepDurableSubsActive;
053        private AtomicBoolean active = new AtomicBoolean();
054        private AtomicLong offlineTimestamp = new AtomicLong(-1);
055    
056        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
057            throws JMSException {
058            super(broker,usageManager, context, info);
059            this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
060            this.pending.setSystemUsage(usageManager);
061            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
062            this.keepDurableSubsActive = keepDurableSubsActive;
063            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
064    
065        }
066    
067        public final boolean isActive() {
068            return active.get();
069        }
070    
071        public final long getOfflineTimestamp() {
072            return offlineTimestamp.get();
073        }
074    
075        public boolean isFull() {
076            return !active.get() || super.isFull();
077        }
078    
079        public void gc() {
080        }
081    
082        /**
083         * store will have a pending ack for all durables, irrespective of the selector
084         * so we need to ack if node is un-matched
085         */
086        public void unmatched(MessageReference node) throws IOException {
087            MessageAck ack = new MessageAck();
088            ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
089            ack.setMessageID(node.getMessageId());
090            node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
091        }
092    
093        @Override
094        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
095            // statically configured via maxPageSize
096        }
097    
098        public void add(ConnectionContext context, Destination destination) throws Exception {
099            if (!destinations.contains(destination)) {
100                super.add(context, destination);
101            }
102            // do it just once per destination
103            if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
104                 return;
105            }
106            durableDestinations.put(destination.getActiveMQDestination(), destination);
107    
108            if (active.get() || keepDurableSubsActive) {
109                Topic topic = (Topic)destination;
110                topic.activate(context, this);
111                if (pending.isEmpty(topic)) {
112                    topic.recoverRetroactiveMessages(context, this);
113                }
114                this.enqueueCounter+=pending.size();
115            } else if (destination.getMessageStore() != null) {
116                TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
117                try {
118                    this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
119                } catch (IOException e) {
120                    JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e);
121                    jmsEx.setLinkedException(e);
122                    throw jmsEx;
123                }
124            }
125            dispatchPending();
126        }
127    
128        public void activate(SystemUsage memoryManager, ConnectionContext context,
129                ConsumerInfo info) throws Exception {
130            if (!active.get()) {
131                this.context = context;
132                this.info = info;
133                LOG.debug("Activating " + this);
134                if (!keepDurableSubsActive) {
135                    for (Iterator<Destination> iter = durableDestinations.values()
136                            .iterator(); iter.hasNext();) {
137                        Topic topic = (Topic) iter.next();
138                        add(context, topic);
139                        topic.activate(context, this);
140                    }
141                }
142                synchronized (pendingLock) {
143                    pending.setSystemUsage(memoryManager);
144                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
145                    pending.setMaxAuditDepth(getMaxAuditDepth());
146                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
147                    pending.start();
148                    // If nothing was in the persistent store, then try to use the
149                    // recovery policy.
150                    if (pending.isEmpty()) {
151                        for (Iterator<Destination> iter = durableDestinations.values()
152                                .iterator(); iter.hasNext();) {
153                            Topic topic = (Topic) iter.next();
154                            topic.recoverRetroactiveMessages(context, this);
155                        }
156                    }
157                }
158                this.active.set(true);
159                this.offlineTimestamp.set(-1);
160                dispatchPending();
161                this.usageManager.getMemoryUsage().addUsageListener(this);
162            }
163        }
164    
165        public void deactivate(boolean keepDurableSubsActive) throws Exception {
166            LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
167            active.set(false);
168            offlineTimestamp.set(System.currentTimeMillis());
169            this.usageManager.getMemoryUsage().removeUsageListener(this);
170            synchronized (pendingLock) {
171                pending.stop();
172    
173                synchronized (dispatchLock) {
174                    for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
175                        Topic topic = (Topic)iter.next();
176                        if (!keepDurableSubsActive) {
177                            topic.deactivate(context, this);
178                        } else {
179                            topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
180                        }
181                    }
182    
183                    for (final MessageReference node : dispatched) {
184                        // Mark the dispatched messages as redelivered for next time.
185                        Integer count = redeliveredMessages.get(node.getMessageId());
186                        if (count != null) {
187                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
188                        } else {
189                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
190                        }
191                        if (keepDurableSubsActive && pending.isTransient()) {
192                            pending.addMessageFirst(node);
193                            pending.rollback(node.getMessageId());
194                        } else {
195                            node.decrementReferenceCount();
196                        }
197                    }
198                    dispatched.clear();
199                }
200                if (!keepDurableSubsActive && pending.isTransient()) {
201                    try {
202                        pending.reset();
203                        while (pending.hasNext()) {
204                            MessageReference node = pending.next();
205                            node.decrementReferenceCount();
206                            pending.remove();
207                        }
208                    } finally {
209                        pending.release();
210                    }
211                }
212            }
213            prefetchExtension.set(0);
214        }
215    
216    
217        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
218            MessageDispatch md = super.createMessageDispatch(node, message);
219            if (node != QueueMessageReference.NULL_MESSAGE) {
220                Integer count = redeliveredMessages.get(node.getMessageId());
221                if (count != null) {
222                    md.setRedeliveryCounter(count.intValue());
223                }
224            }
225            return md;
226        }
227    
228        public void add(MessageReference node) throws Exception {
229            if (!active.get() && !keepDurableSubsActive) {
230                return;
231            }
232            super.add(node);
233        }
234    
235        protected void dispatchPending() throws IOException {
236            if (isActive()) {
237                super.dispatchPending();
238            }
239        }
240    
241        public void removePending(MessageReference node) throws IOException {
242            pending.remove(node);
243        }
244    
245        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
246            synchronized(pending) {
247                pending.addRecoveredMessage(message);
248            }
249        }
250    
251        public int getPendingQueueSize() {
252            if (active.get() || keepDurableSubsActive) {
253                return super.getPendingQueueSize();
254            }
255            // TODO: need to get from store
256            return 0;
257        }
258    
259        public void setSelector(String selector) throws InvalidSelectorException {
260            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
261        }
262    
263        protected boolean canDispatch(MessageReference node) {
264            return isActive();
265        }
266    
267        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
268            node.getRegionDestination().acknowledge(context, this, ack, node);
269            redeliveredMessages.remove(node.getMessageId());
270            node.decrementReferenceCount();
271        }
272    
273        public synchronized String toString() {
274            return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
275                   + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
276        }
277    
278        public SubscriptionKey getSubscriptionKey() {
279            return subscriptionKey;
280        }
281    
282        /**
283         * Release any references that we are holding.
284         */
285        public void destroy() {
286            synchronized (pendingLock) {
287                try {
288    
289                    pending.reset();
290                    while (pending.hasNext()) {
291                        MessageReference node = pending.next();
292                        node.decrementReferenceCount();
293                    }
294    
295                } finally {
296                    pending.release();
297                    pending.clear();
298                }
299            }
300            synchronized  (dispatchLock) {
301                for (MessageReference node : dispatched) {
302                    node.decrementReferenceCount();
303                }
304                dispatched.clear();
305            }
306            setSlowConsumer(false);
307        }
308    
309        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
310            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
311                try {
312                    dispatchPending();
313                } catch (IOException e) {
314                    LOG.warn("problem calling dispatchMatched", e);
315                }
316            }
317        }
318    
319        protected boolean isDropped(MessageReference node) {
320           return false;
321        }
322    
323        public boolean isKeepDurableSubsActive() {
324            return keepDurableSubsActive;
325        }
326    }