001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.*;
020    import java.util.concurrent.ConcurrentHashMap;
021    import javax.jms.InvalidDestinationException;
022    import javax.jms.JMSException;
023    import org.apache.activemq.advisory.AdvisorySupport;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.region.policy.PolicyEntry;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.ConnectionId;
028    import org.apache.activemq.command.ConsumerId;
029    import org.apache.activemq.command.ConsumerInfo;
030    import org.apache.activemq.command.RemoveSubscriptionInfo;
031    import org.apache.activemq.command.SessionId;
032    import org.apache.activemq.command.SubscriptionInfo;
033    import org.apache.activemq.store.TopicMessageStore;
034    import org.apache.activemq.thread.TaskRunnerFactory;
035    import org.apache.activemq.usage.SystemUsage;
036    import org.apache.activemq.util.LongSequenceGenerator;
037    import org.apache.activemq.util.SubscriptionKey;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     *
043     */
044    public class TopicRegion extends AbstractRegion {
045        private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
046        protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
047        private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
048        private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
049        private boolean keepDurableSubsActive;
050    
051        private Timer cleanupTimer;
052        private TimerTask cleanupTask;
053    
054        public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
055                           DestinationFactory destinationFactory) {
056            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
057            if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
058                this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
059                this.cleanupTask = new TimerTask() {
060                    public void run() {
061                        doCleanup();
062                    }
063                };
064                this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
065            }
066        }
067    
068        @Override
069        public void stop() throws Exception {
070            super.stop();
071            if (cleanupTimer != null) {
072                cleanupTimer.cancel();
073            }
074        }
075    
076        public void doCleanup() {
077            long now = System.currentTimeMillis();
078            for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
079                DurableTopicSubscription sub = entry.getValue();
080                if (!sub.isActive()) {
081                   long offline = sub.getOfflineTimestamp();
082                    if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
083                        LOG.info("Destroying durable subscriber due to inactivity: " + sub);
084                        try {
085                            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
086                            info.setClientId(entry.getKey().getClientId());
087                            info.setSubscriptionName(entry.getKey().getSubscriptionName());
088                            ConnectionContext context = new ConnectionContext();
089                            context.setBroker(broker);
090                            context.setClientId(entry.getKey().getClientId());
091                            removeSubscription(context, info);
092                        } catch (Exception e) {
093                            LOG.error("Failed to remove inactive durable subscriber", e);
094                        }
095                    }
096                }
097            }
098        }
099    
100        @Override
101        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
102            if (info.isDurable()) {
103                ActiveMQDestination destination = info.getDestination();
104                if (!destination.isPattern()) {
105                    // Make sure the destination is created.
106                    lookup(context, destination,true);
107                }
108                String clientId = context.getClientId();
109                String subscriptionName = info.getSubscriptionName();
110                SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
111                DurableTopicSubscription sub = durableSubscriptions.get(key);
112                if (sub != null) {
113                    if (sub.isActive()) {
114                        throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
115                    }
116                    // Has the selector changed??
117                    if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
118                        // Remove the consumer first then add it.
119                        durableSubscriptions.remove(key);
120                        destinationsLock.readLock().lock();
121                        try {
122                            for (Destination dest : destinations.values()) {
123                                //Account for virtual destinations
124                                if (dest instanceof Topic){
125                                    Topic topic = (Topic)dest;
126                                    topic.deleteSubscription(context, key);
127                                }
128                            }
129                        } finally {
130                            destinationsLock.readLock().unlock();
131                        }
132                        super.removeConsumer(context, sub.getConsumerInfo());
133                        super.addConsumer(context, info);
134                        sub = durableSubscriptions.get(key);
135                    } else {
136                        // Change the consumer id key of the durable sub.
137                        if (sub.getConsumerInfo().getConsumerId() != null) {
138                            subscriptions.remove(sub.getConsumerInfo().getConsumerId());
139                        }
140                        subscriptions.put(info.getConsumerId(), sub);
141                    }
142                } else {
143                    super.addConsumer(context, info);
144                    sub = durableSubscriptions.get(key);
145                    if (sub == null) {
146                        throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
147                                               + " subscriberName: " + key.getSubscriptionName());
148                    }
149                }
150                sub.activate(usageManager, context, info);
151                return sub;
152            } else {
153                return super.addConsumer(context, info);
154            }
155        }
156    
157        @Override
158        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
159            if (info.isDurable()) {
160    
161                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
162                DurableTopicSubscription sub = durableSubscriptions.get(key);
163                if (sub != null) {
164                    sub.deactivate(keepDurableSubsActive);
165                }
166    
167            } else {
168                super.removeConsumer(context, info);
169            }
170        }
171    
172        @Override
173        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
174            SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
175            DurableTopicSubscription sub = durableSubscriptions.get(key);
176            if (sub == null) {
177                throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
178            }
179            if (sub.isActive()) {
180                throw new JMSException("Durable consumer is in use");
181            } else {
182                durableSubscriptions.remove(key);
183            }
184    
185            destinationsLock.readLock().lock();
186            try {
187                for (Destination dest : destinations.values()) {
188                    //Account for virtual destinations
189                    if (dest instanceof Topic){
190                        Topic topic = (Topic)dest;
191                        topic.deleteSubscription(context, key);
192                    }
193                }
194            } finally {
195                destinationsLock.readLock().unlock();
196            }
197    
198            if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) {
199                super.removeConsumer(context, sub.getConsumerInfo());
200            } else {
201                // try destroying inactive subscriptions
202                destroySubscription(sub);
203            }
204        }
205    
206        @Override
207        public String toString() {
208            return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
209        }
210    
211        @Override
212        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
213            List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
214            Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
215    
216            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
217            // Eagerly recover the durable subscriptions
218            if (store != null) {
219                SubscriptionInfo[] infos = store.getAllSubscriptions();
220                for (int i = 0; i < infos.length; i++) {
221    
222                    SubscriptionInfo info = infos[i];
223                    if (LOG.isDebugEnabled()) {
224                        LOG.debug("Restoring durable subscription: " + info);
225                    }
226                    SubscriptionKey key = new SubscriptionKey(info);
227    
228                    // A single durable sub may be subscribing to multiple topics.
229                    // so it might exist already.
230                    DurableTopicSubscription sub = durableSubscriptions.get(key);
231                    ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
232                    if (sub == null) {
233                        ConnectionContext c = new ConnectionContext();
234                        c.setBroker(context.getBroker());
235                        c.setClientId(key.getClientId());
236                        c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
237                        sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
238                    }
239    
240                    if (dupChecker.contains(sub)) {
241                        continue;
242                    }
243    
244                    dupChecker.add(sub);
245                    rc.add(sub);
246                    dest.addSubscription(context, sub);
247                }
248    
249                // Now perhaps there other durable subscriptions (via wild card)
250                // that would match this destination..
251                durableSubscriptions.values();
252                for (DurableTopicSubscription sub : durableSubscriptions.values()) {
253                    // Skip over subscriptions that we allready added..
254                    if (dupChecker.contains(sub)) {
255                        continue;
256                    }
257    
258                    if (sub.matches(dest.getActiveMQDestination())) {
259                        rc.add(sub);
260                        dest.addSubscription(context, sub);
261                    }
262                }
263            }
264            return rc;
265        }
266    
267        public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
268            ConsumerInfo rc = new ConsumerInfo();
269            rc.setSelector(info.getSelector());
270            rc.setSubscriptionName(info.getSubscriptionName());
271            rc.setDestination(info.getSubscribedDestination());
272            rc.setConsumerId(createConsumerId());
273            return rc;
274        }
275    
276        private ConsumerId createConsumerId() {
277            return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
278        }
279    
280        protected void configureTopic(Topic topic, ActiveMQDestination destination) {
281            if (broker.getDestinationPolicy() != null) {
282                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
283                if (entry != null) {
284                    entry.configure(broker,topic);
285                }
286            }
287        }
288    
289        @Override
290        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
291            ActiveMQDestination destination = info.getDestination();
292    
293            if (info.isDurable()) {
294                if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
295                    throw new JMSException("Cannot create a durable subscription for an advisory Topic");
296                }
297                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
298                DurableTopicSubscription sub = durableSubscriptions.get(key);
299    
300                if (sub == null) {
301    
302                    sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
303    
304                    if (destination != null && broker.getDestinationPolicy() != null) {
305                        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
306                        if (entry != null) {
307                            entry.configure(broker, usageManager, sub);
308                        }
309                    }
310                    durableSubscriptions.put(key, sub);
311                } else {
312                    throw new JMSException("That durable subscription is already active.");
313                }
314                return sub;
315            }
316            try {
317                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
318                // lets configure the subscription depending on the destination
319                if (destination != null && broker.getDestinationPolicy() != null) {
320                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
321                    if (entry != null) {
322                        entry.configure(broker, usageManager, answer);
323                    }
324                }
325                answer.init();
326                return answer;
327            } catch (Exception e) {
328                LOG.error("Failed to create TopicSubscription ", e);
329                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
330                jmsEx.setLinkedException(e);
331                throw jmsEx;
332            }
333        }
334    
335        /**
336         */
337        private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
338            if (info1.getSelector() != null ^ info2.getSelector() != null) {
339                return true;
340            }
341            if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
342                return true;
343            }
344            return !info1.getDestination().equals(info2.getDestination());
345        }
346    
347        @Override
348        protected Set<ActiveMQDestination> getInactiveDestinations() {
349            Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
350            for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
351                ActiveMQDestination dest = iter.next();
352                if (!dest.isTopic()) {
353                    iter.remove();
354                }
355            }
356            return inactiveDestinations;
357        }
358    
359        public boolean isKeepDurableSubsActive() {
360            return keepDurableSubsActive;
361        }
362    
363        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
364            this.keepDurableSubsActive = keepDurableSubsActive;
365        }
366    
367        public boolean durableSubscriptionExists(SubscriptionKey key) {
368            return this.durableSubscriptions.containsKey(key);
369        }
370    
371    }