001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import org.apache.activemq.ActiveMQMessageAudit;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.store.MessageRecoveryListener;
037import org.apache.activemq.store.TopicMessageStore;
038import org.apache.activemq.util.ByteSequence;
039import org.apache.activemq.util.IOExceptionSupport;
040import org.apache.activemq.wireformat.WireFormat;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * 
046 */
047public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
048
049    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
050    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
051
052    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
053    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
054               PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
055    private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
056    private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
057         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
058           return size() > SEQUENCE_ID_CACHE_SIZE;
059        }
060    };
061
062
063    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
064        super(persistenceAdapter, adapter, wireFormat, topic, audit);
065    }
066
067    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
068        if (ack != null && ack.isUnmatchedAck()) {
069            if (LOG.isTraceEnabled()) {
070                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
071            }
072            return;
073        }
074        TransactionContext c = persistenceAdapter.getTransactionContext(context);
075        try {
076            long[] res = getCachedStoreSequenceId(c, destination, messageId);
077            if (this.isPrioritizedMessages()) {
078                adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
079            } else {
080                adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
081            }
082            if (LOG.isTraceEnabled()) {
083                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
084            }
085        } catch (SQLException e) {
086            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
087            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
088        } finally {
089            c.close();
090        }
091    }
092
093    private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
094        long[] val = null;
095        sequenceIdCacheSizeLock.readLock().lock();
096        try {
097            val = sequenceIdCache.get(messageId);
098        } finally {
099            sequenceIdCacheSizeLock.readLock().unlock();
100        }
101        if (val == null) {
102            val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
103        }
104        return val;
105    }
106
107    /**
108     * @throws Exception
109     */
110    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
111        TransactionContext c = persistenceAdapter.getTransactionContext();
112        try {
113            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
114                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
115                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
116                    msg.getMessageId().setBrokerSequenceId(sequenceId);
117                    return listener.recoverMessage(msg);
118                }
119
120                public boolean recoverMessageReference(String reference) throws Exception {
121                    return listener.recoverMessageReference(new MessageId(reference));
122                }
123
124            });
125        } catch (SQLException e) {
126            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
128        } finally {
129            c.close();
130        }
131    }
132
133    private class LastRecovered implements Iterable<LastRecoveredEntry> {
134        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
135        LastRecovered() {
136            for (int i=0; i<perPriority.length; i++) {
137                perPriority[i] = new LastRecoveredEntry(i);
138            }
139        }
140
141        public void updateStored(long sequence, int priority) {
142            perPriority[priority].stored = sequence;
143        }
144
145        public LastRecoveredEntry defaultPriority() {
146            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
147        }
148
149        public String toString() {
150            return Arrays.deepToString(perPriority);
151        }
152
153        public Iterator<LastRecoveredEntry> iterator() {
154            return new PriorityIterator();
155        }
156
157        class PriorityIterator implements Iterator<LastRecoveredEntry> {
158            int current = 9;
159            public boolean hasNext() {
160                for (int i=current; i>=0; i--) {
161                    if (perPriority[i].hasMessages()) {
162                        current = i;
163                        return true;
164                    }
165                }
166                return false;
167            }
168
169            public LastRecoveredEntry next() {
170                return perPriority[current];
171            }
172
173            public void remove() {
174                throw new RuntimeException("not implemented");
175            }
176        }
177    }
178
179    private class LastRecoveredEntry {
180        final int priority;
181        long recovered = 0;
182        long stored = Integer.MAX_VALUE;
183
184        public LastRecoveredEntry(int priority) {
185            this.priority = priority;
186        }
187
188        public String toString() {
189            return priority + "-" + stored + ":" + recovered;
190        }
191
192        public void exhausted() {
193            stored = recovered;
194        }
195
196        public boolean hasMessages() {
197            return stored > recovered;
198        }
199    }
200
201    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
202        final MessageRecoveryListener delegate;
203        final int maxMessages;
204        LastRecoveredEntry lastRecovered;
205        int recoveredCount;
206        int recoveredMarker;
207
208        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
209            this.delegate = delegate;
210            this.maxMessages = maxMessages;
211        }
212
213        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
214            if (delegate.hasSpace() && recoveredCount < maxMessages) {
215                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
216                msg.getMessageId().setBrokerSequenceId(sequenceId);
217                lastRecovered.recovered = sequenceId;
218                if (delegate.recoverMessage(msg)) {
219                    recoveredCount++;
220                    return true;
221                }
222            }
223            return false;
224        }
225
226        public boolean recoverMessageReference(String reference) throws Exception {
227            return delegate.recoverMessageReference(new MessageId(reference));
228        }
229
230        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
231            this.lastRecovered = lastRecovered;
232            recoveredMarker = recoveredCount;
233        }
234
235        public boolean complete() {
236            return  !delegate.hasSpace() || recoveredCount == maxMessages;
237        }
238
239        public boolean stalled() {
240            return recoveredMarker == recoveredCount;
241        }
242    }
243
244    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
245            throws Exception {
246        //Duration duration = new Duration("recoverNextMessages");
247        TransactionContext c = persistenceAdapter.getTransactionContext();
248
249        String key = getSubscriptionKey(clientId, subscriptionName);
250        if (!subscriberLastRecoveredMap.containsKey(key)) {
251           subscriberLastRecoveredMap.put(key, new LastRecovered());
252        }
253        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
254        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
255        try {
256            if (LOG.isTraceEnabled()) {
257                LOG.trace(key + " existing last recovered: " + lastRecovered);
258            }
259            if (isPrioritizedMessages()) {
260                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
261                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
262                    LastRecoveredEntry entry = it.next();
263                    recoveredAwareListener.setLastRecovered(entry);
264                    //Duration microDuration = new Duration("recoverNextMessages:loop");
265                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
266                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
267                    //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
268                    if (recoveredAwareListener.stalled()) {
269                        if (recoveredAwareListener.complete()) {
270                            break;
271                        } else {
272                            entry.exhausted();
273                        }
274                    }
275                }
276            } else {
277                LastRecoveredEntry last = lastRecovered.defaultPriority();
278                recoveredAwareListener.setLastRecovered(last);
279                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
280                        last.recovered, 0, maxReturned, recoveredAwareListener);
281            }
282            if (LOG.isTraceEnabled()) {
283                LOG.trace(key + " last recovered: " + lastRecovered);
284            }
285            //duration.end();
286        } catch (SQLException e) {
287            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288        } finally {
289            c.close();
290        }
291    }
292
293    public void resetBatching(String clientId, String subscriptionName) {
294        subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
295    }
296
297    protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
298        // update last recovered state
299        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
300            last.updateStored(sequenceId, priority);
301        }
302        sequenceIdCacheSizeLock.writeLock().lock();
303        try {
304            sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
305        } finally {
306            sequenceIdCacheSizeLock.writeLock().unlock();
307        }
308    }
309
310
311    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
312        TransactionContext c = persistenceAdapter.getTransactionContext();
313        try {
314            c = persistenceAdapter.getTransactionContext();
315            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
316        } catch (SQLException e) {
317            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
319        } finally {
320            c.close();
321        }
322    }
323
324    /**
325     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
326     *      String)
327     */
328    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
329        TransactionContext c = persistenceAdapter.getTransactionContext();
330        try {
331            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
332        } catch (SQLException e) {
333            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
334            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
335        } finally {
336            c.close();
337        }
338    }
339
340    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
341        TransactionContext c = persistenceAdapter.getTransactionContext();
342        try {
343            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
344        } catch (SQLException e) {
345            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
346            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
347        } finally {
348            c.close();
349            resetBatching(clientId, subscriptionName);
350        }
351    }
352
353    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
354        TransactionContext c = persistenceAdapter.getTransactionContext();
355        try {
356            return adapter.doGetAllSubscriptions(c, destination);
357        } catch (SQLException e) {
358            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
359            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
360        } finally {
361            c.close();
362        }
363    }
364
365    public int getMessageCount(String clientId, String subscriberName) throws IOException {
366        //Duration duration = new Duration("getMessageCount");
367        int result = 0;
368        TransactionContext c = persistenceAdapter.getTransactionContext();
369        try {
370            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
371        } catch (SQLException e) {
372            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
373            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
374        } finally {
375            c.close();
376        }
377        if (LOG.isTraceEnabled()) {
378            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
379        }
380        //duration.end();
381        return result;
382    }
383
384    protected String getSubscriptionKey(String clientId, String subscriberName) {
385        String result = clientId + ":";
386        result += subscriberName != null ? subscriberName : "NOT_SET";
387        return result;
388    }
389
390}