001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.Arrays;
022    import java.util.Iterator;
023    import java.util.LinkedHashMap;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import org.apache.activemq.ActiveMQMessageAudit;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQTopic;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.MessageAck;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.SubscriptionInfo;
036    import org.apache.activemq.store.MessageRecoveryListener;
037    import org.apache.activemq.store.TopicMessageStore;
038    import org.apache.activemq.util.ByteSequence;
039    import org.apache.activemq.util.IOExceptionSupport;
040    import org.apache.activemq.wireformat.WireFormat;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
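    /**
     * JDBC-backed {@link TopicMessageStore}, built on top of {@link JDBCMessageStore}.
     * <p>
     * In addition to the inherited message storage it keeps, per durable
     * subscription, the last recovered sequence id for each priority level, and a
     * bounded cache that maps a {@link MessageId} to its stored sequence id and
     * priority.
     */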
047    public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
048    
049        private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
050        private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
051    
052        public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
053        private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
054                   PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
055        private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
056        private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
057             protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
058               return size() > SEQUENCE_ID_CACHE_SIZE;
059            }
060        };
061    
062    
063        public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
064            super(persistenceAdapter, adapter, wireFormat, topic, audit);
065        }
066    
067        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
068            if (ack != null && ack.isUnmatchedAck()) {
069                if (LOG.isTraceEnabled()) {
070                    LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
071                }
072                return;
073            }
074            TransactionContext c = persistenceAdapter.getTransactionContext(context);
075            try {
076                long[] res = getCachedStoreSequenceId(c, destination, messageId);
077                if (this.isPrioritizedMessages()) {
078                    adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
079                } else {
080                    adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
081                }
082                if (LOG.isTraceEnabled()) {
083                    LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
084                }
085            } catch (SQLException e) {
086                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
087                throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
088            } finally {
089                c.close();
090            }
091        }
092    
093        private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
094            long[] val = null;
095            sequenceIdCacheSizeLock.readLock().lock();
096            try {
097                val = sequenceIdCache.get(messageId);
098            } finally {
099                sequenceIdCacheSizeLock.readLock().unlock();
100            }
101            if (val == null) {
102                val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
103            }
104            return val;
105        }
106    
107        /**
108         * @throws Exception
109         */
110        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
111            TransactionContext c = persistenceAdapter.getTransactionContext();
112            try {
113                adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
114                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
115                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
116                        msg.getMessageId().setBrokerSequenceId(sequenceId);
117                        return listener.recoverMessage(msg);
118                    }
119    
120                    public boolean recoverMessageReference(String reference) throws Exception {
121                        return listener.recoverMessageReference(new MessageId(reference));
122                    }
123    
124                });
125            } catch (SQLException e) {
126                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127                throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
128            } finally {
129                c.close();
130            }
131        }
132    
133        private class LastRecovered implements Iterable<LastRecoveredEntry> {
134            LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
135            LastRecovered() {
136                for (int i=0; i<perPriority.length; i++) {
137                    perPriority[i] = new LastRecoveredEntry(i);
138                }
139            }
140    
141            public void updateStored(long sequence, int priority) {
142                perPriority[priority].stored = sequence;
143            }
144    
145            public LastRecoveredEntry defaultPriority() {
146                return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
147            }
148    
149            public String toString() {
150                return Arrays.deepToString(perPriority);
151            }
152    
153            public Iterator<LastRecoveredEntry> iterator() {
154                return new PriorityIterator();
155            }
156    
157            class PriorityIterator implements Iterator<LastRecoveredEntry> {
158                int current = 9;
159                public boolean hasNext() {
160                    for (int i=current; i>=0; i--) {
161                        if (perPriority[i].hasMessages()) {
162                            current = i;
163                            return true;
164                        }
165                    }
166                    return false;
167                }
168    
169                public LastRecoveredEntry next() {
170                    return perPriority[current];
171                }
172    
173                public void remove() {
174                    throw new RuntimeException("not implemented");
175                }
176            }
177        }
178    
179        private class LastRecoveredEntry {
180            final int priority;
181            long recovered = 0;
182            long stored = Integer.MAX_VALUE;
183    
184            public LastRecoveredEntry(int priority) {
185                this.priority = priority;
186            }
187    
188            public String toString() {
189                return priority + "-" + stored + ":" + recovered;
190            }
191    
192            public void exhausted() {
193                stored = recovered;
194            }
195    
196            public boolean hasMessages() {
197                return stored > recovered;
198            }
199        }
200    
201        class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
202            final MessageRecoveryListener delegate;
203            final int maxMessages;
204            LastRecoveredEntry lastRecovered;
205            int recoveredCount;
206            int recoveredMarker;
207    
208            public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
209                this.delegate = delegate;
210                this.maxMessages = maxMessages;
211            }
212    
213            public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
214                if (delegate.hasSpace() && recoveredCount < maxMessages) {
215                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
216                    msg.getMessageId().setBrokerSequenceId(sequenceId);
217                    lastRecovered.recovered = sequenceId;
218                    if (delegate.recoverMessage(msg)) {
219                        recoveredCount++;
220                        return true;
221                    }
222                }
223                return false;
224            }
225    
226            public boolean recoverMessageReference(String reference) throws Exception {
227                return delegate.recoverMessageReference(new MessageId(reference));
228            }
229    
230            public void setLastRecovered(LastRecoveredEntry lastRecovered) {
231                this.lastRecovered = lastRecovered;
232                recoveredMarker = recoveredCount;
233            }
234    
235            public boolean complete() {
236                return  !delegate.hasSpace() || recoveredCount == maxMessages;
237            }
238    
239            public boolean stalled() {
240                return recoveredMarker == recoveredCount;
241            }
242        }
243    
244        public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
245                throws Exception {
246            //Duration duration = new Duration("recoverNextMessages");
247            TransactionContext c = persistenceAdapter.getTransactionContext();
248    
249            String key = getSubscriptionKey(clientId, subscriptionName);
250            if (!subscriberLastRecoveredMap.containsKey(key)) {
251               subscriberLastRecoveredMap.put(key, new LastRecovered());
252            }
253            final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
254            LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
255            try {
256                if (LOG.isTraceEnabled()) {
257                    LOG.trace(key + " existing last recovered: " + lastRecovered);
258                }
259                if (isPrioritizedMessages()) {
260                    Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
261                    for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
262                        LastRecoveredEntry entry = it.next();
263                        recoveredAwareListener.setLastRecovered(entry);
264                        //Duration microDuration = new Duration("recoverNextMessages:loop");
265                        adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
266                            entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
267                        //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
268                        if (recoveredAwareListener.stalled()) {
269                            if (recoveredAwareListener.complete()) {
270                                break;
271                            } else {
272                                entry.exhausted();
273                            }
274                        }
275                    }
276                } else {
277                    LastRecoveredEntry last = lastRecovered.defaultPriority();
278                    recoveredAwareListener.setLastRecovered(last);
279                    adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
280                            last.recovered, 0, maxReturned, recoveredAwareListener);
281                }
282                if (LOG.isTraceEnabled()) {
283                    LOG.trace(key + " last recovered: " + lastRecovered);
284                }
285                //duration.end();
286            } catch (SQLException e) {
287                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288            } finally {
289                c.close();
290            }
291        }
292    
293        public void resetBatching(String clientId, String subscriptionName) {
294            subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
295        }
296    
297        protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
298            // update last recovered state
299            for (LastRecovered last : subscriberLastRecoveredMap.values()) {
300                last.updateStored(sequenceId, priority);
301            }
302            sequenceIdCacheSizeLock.writeLock().lock();
303            try {
304                sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
305            } finally {
306                sequenceIdCacheSizeLock.writeLock().unlock();
307            }
308        }
309    
310    
311        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
312            TransactionContext c = persistenceAdapter.getTransactionContext();
313            try {
314                c = persistenceAdapter.getTransactionContext();
315                adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
316            } catch (SQLException e) {
317                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318                throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
319            } finally {
320                c.close();
321            }
322        }
323    
324        /**
325         * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
326         *      String)
327         */
328        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
329            TransactionContext c = persistenceAdapter.getTransactionContext();
330            try {
331                return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
332            } catch (SQLException e) {
333                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
334                throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
335            } finally {
336                c.close();
337            }
338        }
339    
340        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
341            TransactionContext c = persistenceAdapter.getTransactionContext();
342            try {
343                adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
344            } catch (SQLException e) {
345                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
346                throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
347            } finally {
348                c.close();
349                resetBatching(clientId, subscriptionName);
350            }
351        }
352    
353        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
354            TransactionContext c = persistenceAdapter.getTransactionContext();
355            try {
356                return adapter.doGetAllSubscriptions(c, destination);
357            } catch (SQLException e) {
358                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
359                throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
360            } finally {
361                c.close();
362            }
363        }
364    
365        public int getMessageCount(String clientId, String subscriberName) throws IOException {
366            //Duration duration = new Duration("getMessageCount");
367            int result = 0;
368            TransactionContext c = persistenceAdapter.getTransactionContext();
369            try {
370                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
371            } catch (SQLException e) {
372                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
373                throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
374            } finally {
375                c.close();
376            }
377            if (LOG.isTraceEnabled()) {
378                LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
379            }
380            //duration.end();
381            return result;
382        }
383    
384        protected String getSubscriptionKey(String clientId, String subscriberName) {
385            String result = clientId + ":";
386            result += subscriberName != null ? subscriberName : "NOT_SET";
387            return result;
388        }
389    
390    }