001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.*;
020import java.util.concurrent.ConcurrentHashMap;
021import javax.jms.InvalidDestinationException;
022import javax.jms.JMSException;
023import org.apache.activemq.advisory.AdvisorySupport;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.policy.PolicyEntry;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConnectionId;
028import org.apache.activemq.command.ConsumerId;
029import org.apache.activemq.command.ConsumerInfo;
030import org.apache.activemq.command.RemoveSubscriptionInfo;
031import org.apache.activemq.command.SessionId;
032import org.apache.activemq.command.SubscriptionInfo;
033import org.apache.activemq.store.TopicMessageStore;
034import org.apache.activemq.thread.TaskRunnerFactory;
035import org.apache.activemq.usage.SystemUsage;
036import org.apache.activemq.util.LongSequenceGenerator;
037import org.apache.activemq.util.SubscriptionKey;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
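/**
 * Region implementation for topics. In addition to the behaviour inherited
 * from {@link AbstractRegion}, it keeps track of durable topic subscriptions,
 * keyed by client id and subscription name, and can optionally run a timer
 * that removes durable subscriptions which have been offline for too long.
 */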
044public class TopicRegion extends AbstractRegion {
045    private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
046    protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
047    private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
048    private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
049    private boolean keepDurableSubsActive;
050
051    private Timer cleanupTimer;
052    private TimerTask cleanupTask;
053
054    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
055                       DestinationFactory destinationFactory) {
056        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
057        if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
058            this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
059            this.cleanupTask = new TimerTask() {
060                public void run() {
061                    doCleanup();
062                }
063            };
064            this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
065        }
066    }
067
068    @Override
069    public void stop() throws Exception {
070        super.stop();
071        if (cleanupTimer != null) {
072            cleanupTimer.cancel();
073        }
074    }
075
076    public void doCleanup() {
077        long now = System.currentTimeMillis();
078        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
079            DurableTopicSubscription sub = entry.getValue();
080            if (!sub.isActive()) {
081               long offline = sub.getOfflineTimestamp();
082                if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
083                    LOG.info("Destroying durable subscriber due to inactivity: " + sub);
084                    try {
085                        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
086                        info.setClientId(entry.getKey().getClientId());
087                        info.setSubscriptionName(entry.getKey().getSubscriptionName());
088                        ConnectionContext context = new ConnectionContext();
089                        context.setBroker(broker);
090                        context.setClientId(entry.getKey().getClientId());
091                        removeSubscription(context, info);
092                    } catch (Exception e) {
093                        LOG.error("Failed to remove inactive durable subscriber", e);
094                    }
095                }
096            }
097        }
098    }
099
100    @Override
101    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
102        if (info.isDurable()) {
103            ActiveMQDestination destination = info.getDestination();
104            if (!destination.isPattern()) {
105                // Make sure the destination is created.
106                lookup(context, destination,true);
107            }
108            String clientId = context.getClientId();
109            String subscriptionName = info.getSubscriptionName();
110            SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
111            DurableTopicSubscription sub = durableSubscriptions.get(key);
112            if (sub != null) {
113                if (sub.isActive()) {
114                    throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
115                }
116                // Has the selector changed??
117                if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
118                    // Remove the consumer first then add it.
119                    durableSubscriptions.remove(key);
120                    destinationsLock.readLock().lock();
121                    try {
122                        for (Destination dest : destinations.values()) {
123                            //Account for virtual destinations
124                            if (dest instanceof Topic){
125                                Topic topic = (Topic)dest;
126                                topic.deleteSubscription(context, key);
127                            }
128                        }
129                    } finally {
130                        destinationsLock.readLock().unlock();
131                    }
132                    super.removeConsumer(context, sub.getConsumerInfo());
133                    super.addConsumer(context, info);
134                    sub = durableSubscriptions.get(key);
135                } else {
136                    // Change the consumer id key of the durable sub.
137                    if (sub.getConsumerInfo().getConsumerId() != null) {
138                        subscriptions.remove(sub.getConsumerInfo().getConsumerId());
139                    }
140                    subscriptions.put(info.getConsumerId(), sub);
141                }
142            } else {
143                super.addConsumer(context, info);
144                sub = durableSubscriptions.get(key);
145                if (sub == null) {
146                    throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
147                                           + " subscriberName: " + key.getSubscriptionName());
148                }
149            }
150            sub.activate(usageManager, context, info);
151            return sub;
152        } else {
153            return super.addConsumer(context, info);
154        }
155    }
156
157    @Override
158    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
159        if (info.isDurable()) {
160
161            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
162            DurableTopicSubscription sub = durableSubscriptions.get(key);
163            if (sub != null) {
164                sub.deactivate(keepDurableSubsActive);
165            }
166
167        } else {
168            super.removeConsumer(context, info);
169        }
170    }
171
172    @Override
173    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
174        SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
175        DurableTopicSubscription sub = durableSubscriptions.get(key);
176        if (sub == null) {
177            throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
178        }
179        if (sub.isActive()) {
180            throw new JMSException("Durable consumer is in use");
181        } else {
182            durableSubscriptions.remove(key);
183        }
184
185        destinationsLock.readLock().lock();
186        try {
187            for (Destination dest : destinations.values()) {
188                //Account for virtual destinations
189                if (dest instanceof Topic){
190                    Topic topic = (Topic)dest;
191                    topic.deleteSubscription(context, key);
192                }
193            }
194        } finally {
195            destinationsLock.readLock().unlock();
196        }
197
198        if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) {
199            super.removeConsumer(context, sub.getConsumerInfo());
200        } else {
201            // try destroying inactive subscriptions
202            destroySubscription(sub);
203        }
204    }
205
206    @Override
207    public String toString() {
208        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
209    }
210
211    @Override
212    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
213        List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
214        Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
215
216        TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
217        // Eagerly recover the durable subscriptions
218        if (store != null) {
219            SubscriptionInfo[] infos = store.getAllSubscriptions();
220            for (int i = 0; i < infos.length; i++) {
221
222                SubscriptionInfo info = infos[i];
223                if (LOG.isDebugEnabled()) {
224                    LOG.debug("Restoring durable subscription: " + info);
225                }
226                SubscriptionKey key = new SubscriptionKey(info);
227
228                // A single durable sub may be subscribing to multiple topics.
229                // so it might exist already.
230                DurableTopicSubscription sub = durableSubscriptions.get(key);
231                ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
232                if (sub == null) {
233                    ConnectionContext c = new ConnectionContext();
234                    c.setBroker(context.getBroker());
235                    c.setClientId(key.getClientId());
236                    c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
237                    sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
238                }
239
240                if (dupChecker.contains(sub)) {
241                    continue;
242                }
243
244                dupChecker.add(sub);
245                rc.add(sub);
246                dest.addSubscription(context, sub);
247            }
248
249            // Now perhaps there other durable subscriptions (via wild card)
250            // that would match this destination..
251            durableSubscriptions.values();
252            for (DurableTopicSubscription sub : durableSubscriptions.values()) {
253                // Skip over subscriptions that we allready added..
254                if (dupChecker.contains(sub)) {
255                    continue;
256                }
257
258                if (sub.matches(dest.getActiveMQDestination())) {
259                    rc.add(sub);
260                    dest.addSubscription(context, sub);
261                }
262            }
263        }
264        return rc;
265    }
266
267    public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
268        ConsumerInfo rc = new ConsumerInfo();
269        rc.setSelector(info.getSelector());
270        rc.setSubscriptionName(info.getSubscriptionName());
271        rc.setDestination(info.getSubscribedDestination());
272        rc.setConsumerId(createConsumerId());
273        return rc;
274    }
275
276    private ConsumerId createConsumerId() {
277        return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
278    }
279
280    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
281        if (broker.getDestinationPolicy() != null) {
282            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
283            if (entry != null) {
284                entry.configure(broker,topic);
285            }
286        }
287    }
288
289    @Override
290    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
291        ActiveMQDestination destination = info.getDestination();
292
293        if (info.isDurable()) {
294            if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
295                throw new JMSException("Cannot create a durable subscription for an advisory Topic");
296            }
297            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
298            DurableTopicSubscription sub = durableSubscriptions.get(key);
299
300            if (sub == null) {
301
302                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
303
304                if (destination != null && broker.getDestinationPolicy() != null) {
305                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
306                    if (entry != null) {
307                        entry.configure(broker, usageManager, sub);
308                    }
309                }
310                durableSubscriptions.put(key, sub);
311            } else {
312                throw new JMSException("That durable subscription is already active.");
313            }
314            return sub;
315        }
316        try {
317            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
318            // lets configure the subscription depending on the destination
319            if (destination != null && broker.getDestinationPolicy() != null) {
320                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
321                if (entry != null) {
322                    entry.configure(broker, usageManager, answer);
323                }
324            }
325            answer.init();
326            return answer;
327        } catch (Exception e) {
328            LOG.error("Failed to create TopicSubscription ", e);
329            JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
330            jmsEx.setLinkedException(e);
331            throw jmsEx;
332        }
333    }
334
335    /**
336     */
337    private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
338        if (info1.getSelector() != null ^ info2.getSelector() != null) {
339            return true;
340        }
341        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
342            return true;
343        }
344        return !info1.getDestination().equals(info2.getDestination());
345    }
346
347    @Override
348    protected Set<ActiveMQDestination> getInactiveDestinations() {
349        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
350        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
351            ActiveMQDestination dest = iter.next();
352            if (!dest.isTopic()) {
353                iter.remove();
354            }
355        }
356        return inactiveDestinations;
357    }
358
359    public boolean isKeepDurableSubsActive() {
360        return keepDurableSubsActive;
361    }
362
363    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
364        this.keepDurableSubsActive = keepDurableSubsActive;
365    }
366
367    public boolean durableSubscriptionExists(SubscriptionKey key) {
368        return this.durableSubscriptions.containsKey(key);
369    }
370
371}