001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import java.util.concurrent.*;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    import java.util.concurrent.atomic.AtomicInteger;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.region.Destination;
036    import org.apache.activemq.broker.region.RegionBroker;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTempQueue;
040    import org.apache.activemq.command.ActiveMQTempTopic;
041    import org.apache.activemq.command.ActiveMQTopic;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageId;
045    import org.apache.activemq.command.ProducerId;
046    import org.apache.activemq.command.SubscriptionInfo;
047    import org.apache.activemq.command.TransactionId;
048    import org.apache.activemq.openwire.OpenWireFormat;
049    import org.apache.activemq.protobuf.Buffer;
050    import org.apache.activemq.store.AbstractMessageStore;
051    import org.apache.activemq.store.MessageRecoveryListener;
052    import org.apache.activemq.store.MessageStore;
053    import org.apache.activemq.store.PersistenceAdapter;
054    import org.apache.activemq.store.TopicMessageStore;
055    import org.apache.activemq.store.TransactionStore;
056    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057    import org.apache.activemq.store.kahadb.data.KahaDestination;
058    import org.apache.activemq.store.kahadb.data.KahaLocation;
059    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064    import org.apache.activemq.usage.MemoryUsage;
065    import org.apache.activemq.usage.SystemUsage;
066    import org.apache.activemq.util.ServiceStopper;
067    import org.apache.activemq.wireformat.WireFormat;
068    import org.apache.kahadb.util.ByteSequence;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    import org.apache.kahadb.journal.Location;
072    import org.apache.kahadb.page.Transaction;
073    
074    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
075        static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
076        private static final int MAX_ASYNC_JOBS = 10000;
077    
078        public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
079        public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
080                PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
081        public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
082        private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
083                PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
084    
085        protected ExecutorService queueExecutor;
086        protected ExecutorService topicExecutor;
087        protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
088        protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
089        final WireFormat wireFormat = new OpenWireFormat();
090        private SystemUsage usageManager;
091        private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
092        private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
093        Semaphore globalQueueSemaphore;
094        Semaphore globalTopicSemaphore;
095        private boolean concurrentStoreAndDispatchQueues = true;
096        // when true, message order may be compromised when cache is exhausted if store is out
097        // or order w.r.t cache
098        private boolean concurrentStoreAndDispatchTopics = false;
099        private boolean concurrentStoreAndDispatchTransactions = false;
100        private int maxAsyncJobs = MAX_ASYNC_JOBS;
101        private final KahaDBTransactionStore transactionStore;
102        private TransactionIdTransformer transactionIdTransformer;
103    
104        public KahaDBStore() {
105            this.transactionStore = new KahaDBTransactionStore(this);
106            this.transactionIdTransformer = new TransactionIdTransformer() {
107                @Override
108                public KahaTransactionInfo transform(TransactionId txid) {
109                    return TransactionIdConversion.convert(txid);
110                }
111            };
112        }
113    
114        @Override
115        public String toString() {
116            return "KahaDB:[" + directory.getAbsolutePath() + "]";
117        }
118    
119        public void setBrokerName(String brokerName) {
120        }
121    
122        public void setUsageManager(SystemUsage usageManager) {
123            this.usageManager = usageManager;
124        }
125    
126        public SystemUsage getUsageManager() {
127            return this.usageManager;
128        }
129    
130        /**
131         * @return the concurrentStoreAndDispatch
132         */
133        public boolean isConcurrentStoreAndDispatchQueues() {
134            return this.concurrentStoreAndDispatchQueues;
135        }
136    
137        /**
138         * @param concurrentStoreAndDispatch
139         *            the concurrentStoreAndDispatch to set
140         */
141        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142            this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143        }
144    
145        /**
146         * @return the concurrentStoreAndDispatch
147         */
148        public boolean isConcurrentStoreAndDispatchTopics() {
149            return this.concurrentStoreAndDispatchTopics;
150        }
151    
152        /**
153         * @param concurrentStoreAndDispatch
154         *            the concurrentStoreAndDispatch to set
155         */
156        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157            this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158        }
159    
160        public boolean isConcurrentStoreAndDispatchTransactions() {
161            return this.concurrentStoreAndDispatchTransactions;
162        }
163    
164        /**
165         * @return the maxAsyncJobs
166         */
167        public int getMaxAsyncJobs() {
168            return this.maxAsyncJobs;
169        }
170        /**
171         * @param maxAsyncJobs
172         *            the maxAsyncJobs to set
173         */
174        public void setMaxAsyncJobs(int maxAsyncJobs) {
175            this.maxAsyncJobs = maxAsyncJobs;
176        }
177    
178        @Override
179        public void doStart() throws Exception {
180            super.doStart();
181            this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182            this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183            this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184            this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185            this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                    asyncQueueJobQueue, new ThreadFactory() {
187                        public Thread newThread(Runnable runnable) {
188                            Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                            thread.setDaemon(true);
190                            return thread;
191                        }
192                    });
193            this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                    asyncTopicJobQueue, new ThreadFactory() {
195                        public Thread newThread(Runnable runnable) {
196                            Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                            thread.setDaemon(true);
198                            return thread;
199                        }
200                    });
201        }
202    
203        @Override
204        public void doStop(ServiceStopper stopper) throws Exception {
205            // drain down async jobs
206            LOG.info("Stopping async queue tasks");
207            if (this.globalQueueSemaphore != null) {
208                this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209            }
210            synchronized (this.asyncQueueMaps) {
211                for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                    synchronized (m) {
213                        for (StoreTask task : m.values()) {
214                            task.cancel();
215                        }
216                    }
217                }
218                this.asyncQueueMaps.clear();
219            }
220            LOG.info("Stopping async topic tasks");
221            if (this.globalTopicSemaphore != null) {
222                this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223            }
224            synchronized (this.asyncTopicMaps) {
225                for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                    synchronized (m) {
227                        for (StoreTask task : m.values()) {
228                            task.cancel();
229                        }
230                    }
231                }
232                this.asyncTopicMaps.clear();
233            }
234            if (this.globalQueueSemaphore != null) {
235                this.globalQueueSemaphore.drainPermits();
236            }
237            if (this.globalTopicSemaphore != null) {
238                this.globalTopicSemaphore.drainPermits();
239            }
240            if (this.queueExecutor != null) {
241                this.queueExecutor.shutdownNow();
242            }
243            if (this.topicExecutor != null) {
244                this.topicExecutor.shutdownNow();
245            }
246            LOG.info("Stopped KahaDB");
247            super.doStop(stopper);
248        }
249    
250        void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
251            Location location;
252            this.indexLock.writeLock().lock();
253            try {
254                  location = findMessageLocation(key, destination);
255            } finally {
256                this.indexLock.writeLock().unlock();
257            }
258    
259            if (location != null) {
260                KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
261                Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
262    
263                message.incrementRedeliveryCounter();
264                if (LOG.isTraceEnabled()) {
265                    LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
266                }
267                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
268                addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
269    
270                final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
271    
272                this.indexLock.writeLock().lock();
273                try {
274                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
275                        public void execute(Transaction tx) throws IOException {
276                            StoredDestination sd = getStoredDestination(destination, tx);
277                            Long sequence = sd.messageIdIndex.get(tx, key);
278                            MessageKeys keys = sd.orderIndex.get(tx, sequence);
279                            sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
280                        }
281                    });
282                } finally {
283                    this.indexLock.writeLock().unlock();
284                }
285            }
286        }
287    
288        @Override
289        void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
290            if (brokerService != null) {
291                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
292                if (regionBroker != null) {
293                    Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
294                    for (Destination destination : destinationSet) {
295                        destination.getDestinationStatistics().getMessages().decrement();
296                        destination.getDestinationStatistics().getEnqueues().decrement();
297                    }
298                }
299            }
300        }
301    
302        private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
303            return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
304                public Location execute(Transaction tx) throws IOException {
305                    StoredDestination sd = getStoredDestination(destination, tx);
306                    Long sequence = sd.messageIdIndex.get(tx, key);
307                    if (sequence == null) {
308                        return null;
309                    }
310                    return sd.orderIndex.get(tx, sequence).location;
311                }
312            });
313        }
314    
315        protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
316            StoreQueueTask task = null;
317            synchronized (store.asyncTaskMap) {
318                task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
319            }
320            return task;
321        }
322    
323        protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
324            synchronized (store.asyncTaskMap) {
325                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
326            }
327            this.queueExecutor.execute(task);
328        }
329    
330        protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
331            StoreTopicTask task = null;
332            synchronized (store.asyncTaskMap) {
333                task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
334            }
335            return task;
336        }
337    
338        protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
339            synchronized (store.asyncTaskMap) {
340                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
341            }
342            this.topicExecutor.execute(task);
343        }
344    
345        public TransactionStore createTransactionStore() throws IOException {
346            return this.transactionStore;
347        }
348    
349        public boolean getForceRecoverIndex() {
350            return this.forceRecoverIndex;
351        }
352    
353        public void setForceRecoverIndex(boolean forceRecoverIndex) {
354            this.forceRecoverIndex = forceRecoverIndex;
355        }
356    
357        public class KahaDBMessageStore extends AbstractMessageStore {
358            protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
359            protected KahaDestination dest;
360            private final int maxAsyncJobs;
361            private final Semaphore localDestinationSemaphore;
362    
363            double doneTasks, canceledTasks = 0;
364    
365            public KahaDBMessageStore(ActiveMQDestination destination) {
366                super(destination);
367                this.dest = convert(destination);
368                this.maxAsyncJobs = getMaxAsyncJobs();
369                this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
370            }
371    
372            @Override
373            public ActiveMQDestination getDestination() {
374                return destination;
375            }
376    
377            @Override
378            public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
379                    throws IOException {
380                if (isConcurrentStoreAndDispatchQueues()) {
381                    StoreQueueTask result = new StoreQueueTask(this, context, message);
382                    result.aquireLocks();
383                    addQueueTask(this, result);
384                    return result.getFuture();
385                } else {
386                    return super.asyncAddQueueMessage(context, message);
387                }
388            }
389    
390            @Override
391            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
392                if (isConcurrentStoreAndDispatchQueues()) {
393                    AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
394                    StoreQueueTask task = null;
395                    synchronized (asyncTaskMap) {
396                        task = (StoreQueueTask) asyncTaskMap.get(key);
397                    }
398                    if (task != null) {
399                        if (!task.cancel()) {
400                            try {
401    
402                                task.future.get();
403                            } catch (InterruptedException e) {
404                                throw new InterruptedIOException(e.toString());
405                            } catch (Exception ignored) {
406                                LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
407                            }
408                            removeMessage(context, ack);
409                        } else {
410                            synchronized (asyncTaskMap) {
411                                asyncTaskMap.remove(key);
412                            }
413                        }
414                    } else {
415                        removeMessage(context, ack);
416                    }
417                } else {
418                    removeMessage(context, ack);
419                }
420            }
421    
422            public void addMessage(ConnectionContext context, Message message) throws IOException {
423                KahaAddMessageCommand command = new KahaAddMessageCommand();
424                command.setDestination(dest);
425                command.setMessageId(message.getMessageId().toString());
426                command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
427                command.setPriority(message.getPriority());
428                command.setPrioritySupported(isPrioritizedMessages());
429                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
430                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
431                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
432    
433            }
434    
435            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
436                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
437                command.setDestination(dest);
438                command.setMessageId(ack.getLastMessageId().toString());
439                command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
440    
441                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
442                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
443                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
444            }
445    
446            public void removeAllMessages(ConnectionContext context) throws IOException {
447                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
448                command.setDestination(dest);
449                store(command, true, null, null);
450            }
451    
452            public Message getMessage(MessageId identity) throws IOException {
453                final String key = identity.toString();
454    
455                // Hopefully one day the page file supports concurrent read
456                // operations... but for now we must
457                // externally synchronize...
458                Location location;
459                indexLock.writeLock().lock();
460                try {
461                    location = findMessageLocation(key, dest);
462                }finally {
463                    indexLock.writeLock().unlock();
464                }
465                if (location == null) {
466                    return null;
467                }
468    
469                return loadMessage(location);
470            }
471    
472            public int getMessageCount() throws IOException {
473                try {
474                    lockAsyncJobQueue();
475                    indexLock.writeLock().lock();
476                    try {
477                        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
478                            public Integer execute(Transaction tx) throws IOException {
479                                // Iterate through all index entries to get a count
480                                // of
481                                // messages in the destination.
482                                StoredDestination sd = getStoredDestination(dest, tx);
483                                int rc = 0;
484                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
485                                        .hasNext();) {
486                                    iterator.next();
487                                    rc++;
488                                }
489                                return rc;
490                            }
491                        });
492                    }finally {
493                        indexLock.writeLock().unlock();
494                    }
495                } finally {
496                    unlockAsyncJobQueue();
497                }
498            }
499    
500            @Override
501            public boolean isEmpty() throws IOException {
502                indexLock.writeLock().lock();
503                try {
504                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
505                        public Boolean execute(Transaction tx) throws IOException {
506                            // Iterate through all index entries to get a count of
507                            // messages in the destination.
508                            StoredDestination sd = getStoredDestination(dest, tx);
509                            return sd.locationIndex.isEmpty(tx);
510                        }
511                    });
512                }finally {
513                    indexLock.writeLock().unlock();
514                }
515            }
516    
517            public void recover(final MessageRecoveryListener listener) throws Exception {
518                // recovery may involve expiry which will modify
519                indexLock.writeLock().lock();
520                try {
521                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
522                        public void execute(Transaction tx) throws Exception {
523                            StoredDestination sd = getStoredDestination(dest, tx);
524                            sd.orderIndex.resetCursorPosition();
525                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
526                                    .hasNext(); ) {
527                                Entry<Long, MessageKeys> entry = iterator.next();
528                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
529                                    continue;
530                                }
531                                Message msg = loadMessage(entry.getValue().location);
532                                listener.recoverMessage(msg);
533                            }
534                        }
535                    });
536                }finally {
537                    indexLock.writeLock().unlock();
538                }
539            }
540    
541    
542            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
543                indexLock.writeLock().lock();
544                try {
545                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
546                        public void execute(Transaction tx) throws Exception {
547                            StoredDestination sd = getStoredDestination(dest, tx);
548                            Entry<Long, MessageKeys> entry = null;
549                            int counter = 0;
550                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
551                                 listener.hasSpace() && iterator.hasNext(); ) {
552                                entry = iterator.next();
553                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
554                                    continue;
555                                }
556                                Message msg = loadMessage(entry.getValue().location);
557                                listener.recoverMessage(msg);
558                                counter++;
559                                if (counter >= maxReturned) {
560                                    break;
561                                }
562                            }
563                            sd.orderIndex.stoppedIterating();
564                        }
565                    });
566                }finally {
567                    indexLock.writeLock().unlock();
568                }
569            }
570    
571            public void resetBatching() {
572                if (pageFile.isLoaded()) {
573                    indexLock.writeLock().lock();
574                    try {
575                        pageFile.tx().execute(new Transaction.Closure<Exception>() {
576                            public void execute(Transaction tx) throws Exception {
577                                StoredDestination sd = getExistingStoredDestination(dest, tx);
578                                if (sd != null) {
579                                    sd.orderIndex.resetCursorPosition();}
580                                }
581                            });
582                    } catch (Exception e) {
583                        LOG.error("Failed to reset batching",e);
584                    }finally {
585                        indexLock.writeLock().unlock();
586                    }
587                }
588            }
589    
590            @Override
591            public void setBatch(MessageId identity) throws IOException {
592                try {
593                    final String key = identity.toString();
594                    lockAsyncJobQueue();
595    
596                    // Hopefully one day the page file supports concurrent read
597                    // operations... but for now we must
598                    // externally synchronize...
599    
600                    indexLock.writeLock().lock();
601                    try {
602                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
603                            public void execute(Transaction tx) throws IOException {
604                                StoredDestination sd = getStoredDestination(dest, tx);
605                                Long location = sd.messageIdIndex.get(tx, key);
606                                if (location != null) {
607                                    sd.orderIndex.setBatch(tx, location);
608                                }
609                            }
610                        });
611                    } finally {
612                        indexLock.writeLock().unlock();
613                    }
614                } finally {
615                    unlockAsyncJobQueue();
616                }
617            }
618    
619            @Override
620            public void setMemoryUsage(MemoryUsage memoeyUSage) {
621            }
622            @Override
623            public void start() throws Exception {
624                super.start();
625            }
626            @Override
627            public void stop() throws Exception {
628                super.stop();
629            }
630    
631            protected void lockAsyncJobQueue() {
632                try {
633                    this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
634                } catch (Exception e) {
635                    LOG.error("Failed to lock async jobs for " + this.destination, e);
636                }
637            }
638    
639            protected void unlockAsyncJobQueue() {
640                this.localDestinationSemaphore.release(this.maxAsyncJobs);
641            }
642    
643            protected void acquireLocalAsyncLock() {
644                try {
645                    this.localDestinationSemaphore.acquire();
646                } catch (InterruptedException e) {
647                    LOG.error("Failed to aquire async lock for " + this.destination, e);
648                }
649            }
650    
651            protected void releaseLocalAsyncLock() {
652                this.localDestinationSemaphore.release();
653            }
654    
655        }
656    
657        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
658            private final AtomicInteger subscriptionCount = new AtomicInteger();
659            public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
660                super(destination);
661                this.subscriptionCount.set(getAllSubscriptions().length);
662                asyncTopicMaps.add(asyncTaskMap);
663            }
664    
665            @Override
666            public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
667                    throws IOException {
668                if (isConcurrentStoreAndDispatchTopics()) {
669                    StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
670                    result.aquireLocks();
671                    addTopicTask(this, result);
672                    return result.getFuture();
673                } else {
674                    return super.asyncAddTopicMessage(context, message);
675                }
676            }
677    
678            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
679                                    MessageId messageId, MessageAck ack)
680                    throws IOException {
681                String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
682                if (isConcurrentStoreAndDispatchTopics()) {
683                    AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
684                    StoreTopicTask task = null;
685                    synchronized (asyncTaskMap) {
686                        task = (StoreTopicTask) asyncTaskMap.get(key);
687                    }
688                    if (task != null) {
689                        if (task.addSubscriptionKey(subscriptionKey)) {
690                            removeTopicTask(this, messageId);
691                            if (task.cancel()) {
692                                synchronized (asyncTaskMap) {
693                                    asyncTaskMap.remove(key);
694                                }
695                            }
696                        }
697                    } else {
698                        doAcknowledge(context, subscriptionKey, messageId, ack);
699                    }
700                } else {
701                    doAcknowledge(context, subscriptionKey, messageId, ack);
702                }
703            }
704    
705            protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
706                    throws IOException {
707                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
708                command.setDestination(dest);
709                command.setSubscriptionKey(subscriptionKey);
710                command.setMessageId(messageId.toString());
711                command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
712                if (ack != null && ack.isUnmatchedAck()) {
713                    command.setAck(UNMATCHED);
714                }
715                store(command, false, null, null);
716            }
717    
718            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
719                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
720                        .getSubscriptionName());
721                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
722                command.setDestination(dest);
723                command.setSubscriptionKey(subscriptionKey.toString());
724                command.setRetroactive(retroactive);
725                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
726                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
727                store(command, isEnableJournalDiskSyncs() && true, null, null);
728                this.subscriptionCount.incrementAndGet();
729            }
730    
731            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
732                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
733                command.setDestination(dest);
734                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
735                store(command, isEnableJournalDiskSyncs() && true, null, null);
736                this.subscriptionCount.decrementAndGet();
737            }
738    
739            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
740    
741                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
742                indexLock.writeLock().lock();
743                try {
744                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
745                        public void execute(Transaction tx) throws IOException {
746                            StoredDestination sd = getStoredDestination(dest, tx);
747                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
748                                    .hasNext();) {
749                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
750                                SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
751                                        .getValue().getSubscriptionInfo().newInput()));
752                                subscriptions.add(info);
753    
754                            }
755                        }
756                    });
757                }finally {
758                    indexLock.writeLock().unlock();
759                }
760    
761                SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
762                subscriptions.toArray(rc);
763                return rc;
764            }
765    
766            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
767                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
768                indexLock.writeLock().lock();
769                try {
770                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
771                        public SubscriptionInfo execute(Transaction tx) throws IOException {
772                            StoredDestination sd = getStoredDestination(dest, tx);
773                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
774                            if (command == null) {
775                                return null;
776                            }
777                            return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
778                                    .getSubscriptionInfo().newInput()));
779                        }
780                    });
781                }finally {
782                    indexLock.writeLock().unlock();
783                }
784            }
785    
786            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
787                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
788                indexLock.writeLock().lock();
789                try {
790                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
791                        public Integer execute(Transaction tx) throws IOException {
792                            StoredDestination sd = getStoredDestination(dest, tx);
793                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
794                            if (cursorPos == null) {
795                                // The subscription might not exist.
796                                return 0;
797                            }
798    
799                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
800                        }
801                    });
802                }finally {
803                    indexLock.writeLock().unlock();
804                }
805            }
806    
807            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
808                    throws Exception {
809                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
810                @SuppressWarnings("unused")
811                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
812                indexLock.writeLock().lock();
813                try {
814                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
815                        public void execute(Transaction tx) throws Exception {
816                            StoredDestination sd = getStoredDestination(dest, tx);
817                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
818                            sd.orderIndex.setBatch(tx, cursorPos);
819                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
820                                    .hasNext();) {
821                                Entry<Long, MessageKeys> entry = iterator.next();
822                                listener.recoverMessage(loadMessage(entry.getValue().location));
823                            }
824                            sd.orderIndex.resetCursorPosition();
825                        }
826                    });
827                }finally {
828                    indexLock.writeLock().unlock();
829                }
830            }
831    
832            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
833                    final MessageRecoveryListener listener) throws Exception {
834                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
835                @SuppressWarnings("unused")
836                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
837                indexLock.writeLock().lock();
838                try {
839                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
840                        public void execute(Transaction tx) throws Exception {
841                            StoredDestination sd = getStoredDestination(dest, tx);
842                            sd.orderIndex.resetCursorPosition();
843                            MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
844                            if (moc == null) {
845                                LastAck pos = getLastAck(tx, sd, subscriptionKey);
846                                if (pos == null) {
847                                    // sub deleted
848                                    return;
849                                }
850                                sd.orderIndex.setBatch(tx, pos);
851                                moc = sd.orderIndex.cursor;
852                            } else {
853                                sd.orderIndex.cursor.sync(moc);
854                            }
855    
856                            Entry<Long, MessageKeys> entry = null;
857                            int counter = 0;
858                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
859                                    .hasNext();) {
860                                entry = iterator.next();
861                                if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
862                                    counter++;
863                                }
864                                if (counter >= maxReturned || listener.hasSpace() == false) {
865                                    break;
866                                }
867                            }
868                            sd.orderIndex.stoppedIterating();
869                            if (entry != null) {
870                                MessageOrderCursor copy = sd.orderIndex.cursor.copy();
871                                sd.subscriptionCursors.put(subscriptionKey, copy);
872                            }
873                        }
874                    });
875                }finally {
876                    indexLock.writeLock().unlock();
877                }
878            }
879    
880            public void resetBatching(String clientId, String subscriptionName) {
881                try {
882                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
883                    indexLock.writeLock().lock();
884                    try {
885                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
886                            public void execute(Transaction tx) throws IOException {
887                                StoredDestination sd = getStoredDestination(dest, tx);
888                                sd.subscriptionCursors.remove(subscriptionKey);
889                            }
890                        });
891                    }finally {
892                        indexLock.writeLock().unlock();
893                    }
894                } catch (IOException e) {
895                    throw new RuntimeException(e);
896                }
897            }
898        }
899    
900        String subscriptionKey(String clientId, String subscriptionName) {
901            return clientId + ":" + subscriptionName;
902        }
903    
904        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
905            return this.transactionStore.proxy(new KahaDBMessageStore(destination));
906        }
907    
908        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
909            return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
910        }
911    
912        /**
913         * Cleanup method to remove any state associated with the given destination.
914         * This method does not stop the message store (it might not be cached).
915         *
916         * @param destination
917         *            Destination to forget
918         */
919        public void removeQueueMessageStore(ActiveMQQueue destination) {
920        }
921    
922        /**
923         * Cleanup method to remove any state associated with the given destination
924         * This method does not stop the message store (it might not be cached).
925         *
926         * @param destination
927         *            Destination to forget
928         */
929        public void removeTopicMessageStore(ActiveMQTopic destination) {
930        }
931    
932        public void deleteAllMessages() throws IOException {
933            deleteAllMessages = true;
934        }
935    
936        public Set<ActiveMQDestination> getDestinations() {
937            try {
938                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
939                indexLock.writeLock().lock();
940                try {
941                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
942                        public void execute(Transaction tx) throws IOException {
943                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
944                                    .hasNext();) {
945                                Entry<String, StoredDestination> entry = iterator.next();
946                                if (!isEmptyTopic(entry, tx)) {
947                                    rc.add(convert(entry.getKey()));
948                                }
949                            }
950                        }
951    
952                        private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
953                                throws IOException {
954                            boolean isEmptyTopic = false;
955                            ActiveMQDestination dest = convert(entry.getKey());
956                            if (dest.isTopic()) {
957                                StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
958                                if (loadedStore.subscriptionAcks.isEmpty(tx)) {
959                                    isEmptyTopic = true;
960                                }
961                            }
962                            return isEmptyTopic;
963                        }
964                    });
965                }finally {
966                    indexLock.writeLock().unlock();
967                }
968                return rc;
969            } catch (IOException e) {
970                throw new RuntimeException(e);
971            }
972        }
973    
974        public long getLastMessageBrokerSequenceId() throws IOException {
975            return 0;
976        }
977    
978        public long getLastProducerSequenceId(ProducerId id) {
979            indexLock.readLock().lock();
980            try {
981                return metadata.producerSequenceIdTracker.getLastSeqId(id);
982            } finally {
983                indexLock.readLock().unlock();
984            }
985        }
986    
987        public long size() {
988            return storeSize.get();
989        }
990    
991        public void beginTransaction(ConnectionContext context) throws IOException {
992            throw new IOException("Not yet implemented.");
993        }
994        public void commitTransaction(ConnectionContext context) throws IOException {
995            throw new IOException("Not yet implemented.");
996        }
997        public void rollbackTransaction(ConnectionContext context) throws IOException {
998            throw new IOException("Not yet implemented.");
999        }
1000    
1001        public void checkpoint(boolean sync) throws IOException {
1002            super.checkpointCleanup(sync);
1003        }
1004    
1005        // /////////////////////////////////////////////////////////////////
1006        // Internal helper methods.
1007        // /////////////////////////////////////////////////////////////////
1008    
1009        /**
1010         * @param location
1011         * @return
1012         * @throws IOException
1013         */
1014        Message loadMessage(Location location) throws IOException {
1015            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1016            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1017            return msg;
1018        }
1019    
1020        // /////////////////////////////////////////////////////////////////
1021        // Internal conversion methods.
1022        // /////////////////////////////////////////////////////////////////
1023    
1024        KahaLocation convert(Location location) {
1025            KahaLocation rc = new KahaLocation();
1026            rc.setLogId(location.getDataFileId());
1027            rc.setOffset(location.getOffset());
1028            return rc;
1029        }
1030    
1031        KahaDestination convert(ActiveMQDestination dest) {
1032            KahaDestination rc = new KahaDestination();
1033            rc.setName(dest.getPhysicalName());
1034            switch (dest.getDestinationType()) {
1035            case ActiveMQDestination.QUEUE_TYPE:
1036                rc.setType(DestinationType.QUEUE);
1037                return rc;
1038            case ActiveMQDestination.TOPIC_TYPE:
1039                rc.setType(DestinationType.TOPIC);
1040                return rc;
1041            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1042                rc.setType(DestinationType.TEMP_QUEUE);
1043                return rc;
1044            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1045                rc.setType(DestinationType.TEMP_TOPIC);
1046                return rc;
1047            default:
1048                return null;
1049            }
1050        }
1051    
1052        ActiveMQDestination convert(String dest) {
1053            int p = dest.indexOf(":");
1054            if (p < 0) {
1055                throw new IllegalArgumentException("Not in the valid destination format");
1056            }
1057            int type = Integer.parseInt(dest.substring(0, p));
1058            String name = dest.substring(p + 1);
1059            return convert(type, name);
1060        }
1061    
1062        private ActiveMQDestination convert(KahaDestination commandDestination) {
1063            return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1064        }
1065    
1066        private ActiveMQDestination convert(int type, String name) {
1067            switch (KahaDestination.DestinationType.valueOf(type)) {
1068            case QUEUE:
1069                return new ActiveMQQueue(name);
1070            case TOPIC:
1071                return new ActiveMQTopic(name);
1072            case TEMP_QUEUE:
1073                return new ActiveMQTempQueue(name);
1074            case TEMP_TOPIC:
1075                return new ActiveMQTempTopic(name);
1076            default:
1077                throw new IllegalArgumentException("Not in the valid destination format");
1078            }
1079        }
1080    
1081        public TransactionIdTransformer getTransactionIdTransformer() {
1082            return transactionIdTransformer;
1083        }
1084    
1085        public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1086            this.transactionIdTransformer = transactionIdTransformer;
1087        }
1088    
1089        static class AsyncJobKey {
1090            MessageId id;
1091            ActiveMQDestination destination;
1092    
1093            AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1094                this.id = id;
1095                this.destination = destination;
1096            }
1097    
1098            @Override
1099            public boolean equals(Object obj) {
1100                if (obj == this) {
1101                    return true;
1102                }
1103                return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1104                        && destination.equals(((AsyncJobKey) obj).destination);
1105            }
1106    
1107            @Override
1108            public int hashCode() {
1109                return id.hashCode() + destination.hashCode();
1110            }
1111    
1112            @Override
1113            public String toString() {
1114                return destination.getPhysicalName() + "-" + id;
1115            }
1116        }
1117    
1118        public interface StoreTask {
1119            public boolean cancel();
1120    
1121            public void aquireLocks();
1122    
1123            public void releaseLocks();
1124        }
1125    
1126        class StoreQueueTask implements Runnable, StoreTask {
1127            protected final Message message;
1128            protected final ConnectionContext context;
1129            protected final KahaDBMessageStore store;
1130            protected final InnerFutureTask future;
1131            protected final AtomicBoolean done = new AtomicBoolean();
1132            protected final AtomicBoolean locked = new AtomicBoolean();
1133    
1134            public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1135                this.store = store;
1136                this.context = context;
1137                this.message = message;
1138                this.future = new InnerFutureTask(this);
1139            }
1140    
1141            public Future<Object> getFuture() {
1142                return this.future;
1143            }
1144    
1145            public boolean cancel() {
1146                if (this.done.compareAndSet(false, true)) {
1147                    return this.future.cancel(false);
1148                }
1149                return false;
1150            }
1151    
1152            public void aquireLocks() {
1153                if (this.locked.compareAndSet(false, true)) {
1154                    try {
1155                        globalQueueSemaphore.acquire();
1156                        store.acquireLocalAsyncLock();
1157                        message.incrementReferenceCount();
1158                    } catch (InterruptedException e) {
1159                        LOG.warn("Failed to aquire lock", e);
1160                    }
1161                }
1162    
1163            }
1164    
1165            public void releaseLocks() {
1166                if (this.locked.compareAndSet(true, false)) {
1167                    store.releaseLocalAsyncLock();
1168                    globalQueueSemaphore.release();
1169                    message.decrementReferenceCount();
1170                }
1171            }
1172    
1173            public void run() {
1174                this.store.doneTasks++;
1175                try {
1176                    if (this.done.compareAndSet(false, true)) {
1177                        this.store.addMessage(context, message);
1178                        removeQueueTask(this.store, this.message.getMessageId());
1179                        this.future.complete();
1180                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1181                        System.err.println(this.store.dest.getName() + " cancelled: "
1182                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1183                        this.store.canceledTasks = this.store.doneTasks = 0;
1184                    }
1185                } catch (Exception e) {
1186                    this.future.setException(e);
1187                }
1188            }
1189    
1190            protected Message getMessage() {
1191                return this.message;
1192            }
1193    
1194            private class InnerFutureTask extends FutureTask<Object> {
1195    
1196                public InnerFutureTask(Runnable runnable) {
1197                    super(runnable, null);
1198    
1199                }
1200    
1201                public void setException(final Exception e) {
1202                    super.setException(e);
1203                }
1204    
1205                public void complete() {
1206                    super.set(null);
1207                }
1208            }
1209        }
1210    
1211        class StoreTopicTask extends StoreQueueTask {
1212            private final int subscriptionCount;
1213            private final List<String> subscriptionKeys = new ArrayList<String>(1);
1214            private final KahaDBTopicMessageStore topicStore;
1215            public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1216                    int subscriptionCount) {
1217                super(store, context, message);
1218                this.topicStore = store;
1219                this.subscriptionCount = subscriptionCount;
1220    
1221            }
1222    
1223            @Override
1224            public void aquireLocks() {
1225                if (this.locked.compareAndSet(false, true)) {
1226                    try {
1227                        globalTopicSemaphore.acquire();
1228                        store.acquireLocalAsyncLock();
1229                        message.incrementReferenceCount();
1230                    } catch (InterruptedException e) {
1231                        LOG.warn("Failed to aquire lock", e);
1232                    }
1233                }
1234    
1235            }
1236    
1237            @Override
1238            public void releaseLocks() {
1239                if (this.locked.compareAndSet(true, false)) {
1240                    message.decrementReferenceCount();
1241                    store.releaseLocalAsyncLock();
1242                    globalTopicSemaphore.release();
1243                }
1244            }
1245    
1246            /**
1247             * add a key
1248             *
1249             * @param key
1250             * @return true if all acknowledgements received
1251             */
1252            public boolean addSubscriptionKey(String key) {
1253                synchronized (this.subscriptionKeys) {
1254                    this.subscriptionKeys.add(key);
1255                }
1256                return this.subscriptionKeys.size() >= this.subscriptionCount;
1257            }
1258    
1259            @Override
1260            public void run() {
1261                this.store.doneTasks++;
1262                try {
1263                    if (this.done.compareAndSet(false, true)) {
1264                        this.topicStore.addMessage(context, message);
1265                        // apply any acks we have
1266                        synchronized (this.subscriptionKeys) {
1267                            for (String key : this.subscriptionKeys) {
1268                                this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1269    
1270                            }
1271                        }
1272                        removeTopicTask(this.topicStore, this.message.getMessageId());
1273                        this.future.complete();
1274                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1275                        System.err.println(this.store.dest.getName() + " cancelled: "
1276                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1277                        this.store.canceledTasks = this.store.doneTasks = 0;
1278                    }
1279                } catch (Exception e) {
1280                    this.future.setException(e);
1281                }
1282            }
1283        }
1284    
1285        public class StoreTaskExecutor extends ThreadPoolExecutor {
1286    
1287            public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1288                super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1289            }
1290    
1291            protected void afterExecute(Runnable runnable, Throwable throwable) {
1292                super.afterExecute(runnable, throwable);
1293    
1294                if (runnable instanceof StoreTask) {
1295                   ((StoreTask)runnable).releaseLocks();
1296                }
1297    
1298            }
1299        }
1300    }