001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.amq;
018
019import java.io.IOException;
020
021import org.apache.activemq.broker.ConnectionContext;
022import org.apache.activemq.command.ActiveMQTopic;
023import org.apache.activemq.command.JournalTopicAck;
024import org.apache.activemq.command.Message;
025import org.apache.activemq.command.MessageAck;
026import org.apache.activemq.command.MessageId;
027import org.apache.activemq.command.SubscriptionInfo;
028import org.apache.activemq.filter.BooleanExpression;
029import org.apache.activemq.filter.MessageEvaluationContext;
030import org.apache.activemq.kaha.impl.async.Location;
031import org.apache.activemq.selector.SelectorParser;
032import org.apache.activemq.store.MessageRecoveryListener;
033import org.apache.activemq.store.TopicMessageStore;
034import org.apache.activemq.store.TopicReferenceStore;
035import org.apache.activemq.transaction.Synchronization;
036import org.apache.activemq.util.IOExceptionSupport;
037import org.apache.activemq.util.SubscriptionKey;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A MessageStore that uses a Journal to store it's messages.
043 * 
044 * 
045 */
046public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
047
048    private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class);
049    private TopicReferenceStore topicReferenceStore;
050    public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
051        super(adapter, topicReferenceStore, destinationName);
052        this.topicReferenceStore = topicReferenceStore;
053    }
054
055    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
056        flush();
057        topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
058    }
059
060    public void recoverNextMessages(String clientId, String subscriptionName,
061            int maxReturned, final MessageRecoveryListener listener)
062            throws Exception {
063        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
064            topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
065            if (recoveryListener.size() == 0) {
066                flush();
067                topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
068            }
069    }
070
071    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
072        return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
073    }
074
075    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
076        peristenceAdapter.writeCommand(subscriptionInfo, false);
077        topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
078    }
079
080    /**
081     */
082    public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName,
083                            final MessageId messageId, final MessageAck originalAck) throws IOException {
084        final boolean debug = LOG.isDebugEnabled();
085        JournalTopicAck ack = new JournalTopicAck();
086        ack.setDestination(destination);
087        ack.setMessageId(messageId);
088        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
089        ack.setSubscritionName(subscriptionName);
090        ack.setClientId(clientId);
091        ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
092        final Location location = peristenceAdapter.writeCommand(ack, false);
093        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
094        if (!context.isInTransaction()) {
095            if (debug) {
096                LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
097            }
098            acknowledge(context,messageId, location, clientId,subscriptionName);
099        } else {
100            if (debug) {
101                LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
102            }
103            lock.lock();
104            try {
105                inFlightTxLocations.add(location);
106            }finally {
107                lock.unlock();
108            }
109            transactionStore.acknowledge(this, ack, location);
110            context.getTransaction().addSynchronization(new Synchronization() {
111
112                public void afterCommit() throws Exception {
113                    if (debug) {
114                        LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
115                    }
116                    lock.lock();
117                    try {
118                        inFlightTxLocations.remove(location);
119                        acknowledge(context,messageId, location, clientId,subscriptionName);
120                    }finally {
121                        lock.unlock();
122                    }
123                }
124
125                public void afterRollback() throws Exception {
126                    if (debug) {
127                        LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
128                    }
129                    lock.lock();
130                    try{
131                        inFlightTxLocations.remove(location);
132                    }finally {
133                        lock.unlock();
134                    }
135                }
136            });
137        }
138    }
139
140    public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
141        try {
142            SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
143            if (sub != null) {
144                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null);
145                return true;
146            }
147        } catch (Throwable e) {
148            LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
149        }
150        return false;
151    }
152
153    /**
154     * @param messageId
155     * @param location
156     * @param key
157     * @throws IOException 
158     */
159    protected void acknowledge(final ConnectionContext context, MessageId messageId,
160            Location location, String clientId, String subscriptionName)
161            throws IOException {
162        MessageAck ack = null;
163        lock.lock();
164        try {
165            lastLocation = location;
166        }finally {
167            lock.unlock();
168        }
169        
170            if (topicReferenceStore.acknowledgeReference(context, clientId,
171                    subscriptionName, messageId)) {
172                ack = new MessageAck();
173                ack.setLastMessageId(messageId);
174               
175            }
176        
177        if (ack != null) {
178            removeMessage(context, ack);
179        }
180    }
181
182    /**
183     * @return Returns the longTermStore.
184     */
185    public TopicReferenceStore getTopicReferenceStore() {
186        return topicReferenceStore;
187    }
188
189    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
190        topicReferenceStore.deleteSubscription(clientId, subscriptionName);
191    }
192
193    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
194        return topicReferenceStore.getAllSubscriptions();
195    }
196
197    public int getMessageCount(String clientId, String subscriberName) throws IOException {
198        flush();
199        SubscriptionInfo info = lookupSubscription(clientId, subscriberName);
200        try {
201            MessageCounter counter = new MessageCounter(info, this);
202            topicReferenceStore.recoverSubscription(clientId, subscriberName, counter);
203            return counter.count;
204        } catch (Exception e) {
205            throw IOExceptionSupport.create(e);
206        }
207    }
208    
209    private class MessageCounter implements MessageRecoveryListener {
210        
211        int count = 0;
212        SubscriptionInfo info;
213        BooleanExpression selectorExpression;
214        TopicMessageStore store;
215        
216        public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception {
217            this.info = info;
218            if (info != null) {
219                String selector = info.getSelector();
220                if (selector != null) {
221                    this.selectorExpression = SelectorParser.parse(selector);
222                }
223            }
224            this.store = store;
225        }
226
227        public boolean recoverMessageReference(MessageId ref) throws Exception {
228            if (selectorExpression != null) {
229                MessageEvaluationContext ctx = new MessageEvaluationContext();
230                ctx.setMessageReference(store.getMessage(ref));
231                if (selectorExpression.matches(ctx)) {
232                    count++;
233                }
234            } else {
235                count ++;
236            }
237            return true;
238        }
239
240        public boolean recoverMessage(Message message) throws Exception {
241            if (selectorExpression != null) {
242                MessageEvaluationContext ctx = new MessageEvaluationContext();
243                ctx.setMessageReference(store.getMessage(message.getMessageId()));
244                if (selectorExpression.matches(ctx)) {
245                    count++;
246                }
247            } else {
248                count++;
249            }
250            return true;
251        }
252
253        public boolean isDuplicate(MessageId ref) {
254            return false;
255        }
256
257        public boolean hasSpace() {
258            return true;
259        }
260    }
261
262    public void resetBatching(String clientId, String subscriptionName) {
263        topicReferenceStore.resetBatching(clientId, subscriptionName);
264    }
265}