001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.amq;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.nio.channels.FileLock;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.activeio.journal.Journal;
035import org.apache.activemq.broker.BrokerService;
036import org.apache.activemq.broker.BrokerServiceAware;
037import org.apache.activemq.broker.ConnectionContext;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQQueue;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.DataStructure;
042import org.apache.activemq.command.JournalQueueAck;
043import org.apache.activemq.command.JournalTopicAck;
044import org.apache.activemq.command.JournalTrace;
045import org.apache.activemq.command.JournalTransaction;
046import org.apache.activemq.command.Message;
047import org.apache.activemq.command.ProducerId;
048import org.apache.activemq.command.SubscriptionInfo;
049import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051import org.apache.activemq.kaha.impl.async.Location;
052import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053import org.apache.activemq.openwire.OpenWireFormat;
054import org.apache.activemq.store.MessageStore;
055import org.apache.activemq.store.PersistenceAdapter;
056import org.apache.activemq.store.ReferenceStore;
057import org.apache.activemq.store.ReferenceStoreAdapter;
058import org.apache.activemq.store.TopicMessageStore;
059import org.apache.activemq.store.TopicReferenceStore;
060import org.apache.activemq.store.TransactionStore;
061import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062import org.apache.activemq.thread.Scheduler;
063import org.apache.activemq.thread.Task;
064import org.apache.activemq.thread.TaskRunner;
065import org.apache.activemq.thread.TaskRunnerFactory;
066import org.apache.activemq.usage.SystemUsage;
067import org.apache.activemq.usage.Usage;
068import org.apache.activemq.usage.UsageListener;
069import org.apache.activemq.util.ByteSequence;
070import org.apache.activemq.util.IOExceptionSupport;
071import org.apache.activemq.util.IOHelper;
072import org.apache.activemq.wireformat.WireFormat;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076
077/**
078 * An implementation of {@link PersistenceAdapter} designed for use with a
079 * {@link Journal} and then check pointing asynchronously on a timeout with some
080 * other long term persistent storage.
081 * 
082 * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083 * 
084 */
085public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086
    private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
    // Runs the periodic checkpoint and cleanup tasks registered in start().
    private Scheduler scheduler;
    // Message stores handed out so far, keyed by destination.
    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
    private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
    // Both flags are assigned outside this part of the file (presumably in a
    // static initialiser) -- TODO confirm where they are set.
    private static final boolean BROKEN_FILE_LOCK;
    private static final boolean DISABLE_LOCKING;
    // Milliseconds slept between attempts to lock the journal in start().
    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
    // The journal that all commands are written to and read from.
    private AsyncDataManager asyncDataManager;
    // Long term store that the journalled data is checkpointed into.
    private ReferenceStoreAdapter referenceStoreAdapter;
    private TaskRunnerFactory taskRunnerFactory;
    // Used to marshal/unmarshal the commands stored in the journal.
    private WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;
    // Period handed to the scheduler for the checkpoint task (presumably milliseconds).
    private long checkpointInterval = 1000 * 20;
    private int maxCheckpointMessageAddSize = 1024 * 4;
    private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
    // Runs doCheckpoint() each time it is woken up by checkpoint(boolean).
    private TaskRunner checkpointTask;
    // Counted down when the checkpoint currently pending has finished;
    // replaced by a fresh latch at the start of every doCheckpoint().
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Runnable periodicCheckpointTask;
    private Runnable periodicCleanupTask;
    // When set, start() wipes the journal and the reference store.
    private boolean deleteAllMessages;
    // Journal writes are only synced on request when this is enabled (see writeCommand).
    private boolean syncOnWrite;
    private String brokerName = "";
    private File directory;
    private File directoryArchive;
    private BrokerService brokerService;
    // Shared with the journal and the reference store adapter created by this class.
    private final AtomicLong storeSize = new AtomicLong();
    private boolean persistentIndex=true;
    private boolean useNio = true;
    private boolean archiveDataLogs=false;
    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
    private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
    // Per store, the journal data file ids that cleanup() must not consolidate.
    private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
    // Handle on the "lock" file opened in the store directory by start().
    private RandomAccessFile lockFile;
    private FileLock lock;
    private boolean disableLocking = DISABLE_LOCKING;
    // When true start() makes a single attempt to lock the journal instead of retrying.
        private boolean failIfJournalIsLocked;
    private boolean lockLogged;
    private boolean lockAquired;
    private boolean recoverReferenceStore=true;
    private boolean forceRecoverReferenceStore=false;
    private boolean useDedicatedTaskRunner=false;
    private int journalThreadPriority = Thread.MAX_PRIORITY;
137
138    public String getBrokerName() {
139        return this.brokerName;
140    }
141
142    public void setBrokerName(String brokerName) {
143        this.brokerName = brokerName;
144        if (this.referenceStoreAdapter != null) {
145            this.referenceStoreAdapter.setBrokerName(brokerName);
146        }
147    }
148
149    public BrokerService getBrokerService() {
150        return brokerService;
151    }
152
153    public void setBrokerService(BrokerService brokerService) {
154        this.brokerService = brokerService;
155    }
156
157    public synchronized void start() throws Exception {
158        if (!started.compareAndSet(false, true)) {
159            return;
160        }
161        if (this.directory == null) {
162            if (brokerService != null) {
163                this.directory = brokerService.getBrokerDataDirectory();
164               
165            } else {
166                this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
167                this.directory = new File(directory, "amqstore");
168                directory.getAbsolutePath();
169            }
170        }
171        if (this.directoryArchive == null) {
172            this.directoryArchive = new File(this.directory,"archive");
173        }
174        if (this.brokerService != null) {
175            this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
176            this.scheduler = this.brokerService.getScheduler();
177        } else {
178            this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
179                true, 1000, isUseDedicatedTaskRunner());
180            this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
181        }
182
183        IOHelper.mkdirs(this.directory);
184        lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
185        lock();
186        LOG.info("AMQStore starting using directory: " + directory); 
187        if (archiveDataLogs) {
188            IOHelper.mkdirs(this.directoryArchive);
189        }
190
191        if (this.usageManager != null) {
192            this.usageManager.getMemoryUsage().addUsageListener(this);
193        }
194        if (asyncDataManager == null) {
195            asyncDataManager = createAsyncDataManager();
196        }
197        if (referenceStoreAdapter == null) {
198            referenceStoreAdapter = createReferenceStoreAdapter();
199        }
200        referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
201        referenceStoreAdapter.setBrokerName(getBrokerName());
202        referenceStoreAdapter.setUsageManager(usageManager);
203        referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
204        
205        if (failIfJournalIsLocked) {
206            asyncDataManager.lock();
207        } else {
208            while (true) {
209                try {
210                    asyncDataManager.lock();
211                    break;
212                } catch (IOException e) {
213                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
214                    try {
215                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
216                    } catch (InterruptedException e1) {
217                    }
218                }
219            }
220        }
221        
222        asyncDataManager.start();
223        if (deleteAllMessages) {
224            asyncDataManager.delete();
225            try {
226                JournalTrace trace = new JournalTrace();
227                trace.setMessage("DELETED " + new Date());
228                Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
229                asyncDataManager.setMark(location, true);
230                LOG.info("Journal deleted: ");
231                deleteAllMessages = false;
232            } catch (IOException e) {
233                throw e;
234            } catch (Throwable e) {
235                throw IOExceptionSupport.create(e);
236            }
237            referenceStoreAdapter.deleteAllMessages();
238        }
239        referenceStoreAdapter.start();
240        Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
241        LOG.info("Active data files: " + files);
242        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243
244            public boolean iterate() {
245                doCheckpoint();
246                return false;
247            }
248        }, "ActiveMQ Journal Checkpoint Worker");
249        createTransactionStore();
250
251        //
252        // The following was attempting to reduce startup times by avoiding the
253        // log
254        // file scanning that recovery performs. The problem with it is that XA
255        // transactions
256        // only live in transaction log and are not stored in the reference
257        // store, but they still
258        // need to be recovered when the broker starts up.
259
260        if (isForceRecoverReferenceStore()
261                || (isRecoverReferenceStore() && !referenceStoreAdapter
262                        .isStoreValid())) {
263            LOG.warn("The ReferenceStore is not valid - recovering ...");
264            recover();
265            LOG.info("Finished recovering the ReferenceStore");
266        } else {
267            Location location = writeTraceMessage("RECOVERED " + new Date(),
268                    true);
269            asyncDataManager.setMark(location, true);
270            // recover transactions
271            getTransactionStore().setPreparedTransactions(
272                    referenceStoreAdapter.retrievePreparedState());
273        }
274
275        // Do a checkpoint periodically.
276        periodicCheckpointTask = new Runnable() {
277
278            public void run() {
279                checkpoint(false);
280            }
281        };
282        scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
283        periodicCleanupTask = new Runnable() {
284
285            public void run() {
286                cleanup();
287            }
288        };
289        scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
290        
291        if (lockAquired && lockLogged) {
292            LOG.info("Aquired lock for AMQ Store" + getDirectory());
293            if (brokerService != null) {
294                brokerService.getBroker().nowMasterBroker();
295            }
296        }
297
298    }
299
300    public void stop() throws Exception {
301
302        if (!started.compareAndSet(true, false)) {
303            return;
304        }
305        unlock();
306        if (lockFile != null) {
307            lockFile.close();
308            lockFile = null;
309        }
310        this.usageManager.getMemoryUsage().removeUsageListener(this);
311        synchronized (this) {
312            scheduler.cancel(periodicCheckpointTask);
313            scheduler.cancel(periodicCleanupTask);
314        }
315        Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
316        while (queueIterator.hasNext()) {
317            AMQMessageStore ms = queueIterator.next();
318            ms.stop();
319        }
320        Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
321        while (topicIterator.hasNext()) {
322            final AMQTopicMessageStore ms = topicIterator.next();
323            ms.stop();
324        }
325        // Take one final checkpoint and stop checkpoint processing.
326        checkpoint(true);
327        synchronized (this) {
328            checkpointTask.shutdown();
329        }
330        referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
331        queues.clear();
332        topics.clear();
333        IOException firstException = null;
334        referenceStoreAdapter.stop();
335        referenceStoreAdapter = null;
336
337        if (this.brokerService == null) {
338            this.taskRunnerFactory.shutdown();
339            this.scheduler.stop();
340        }
341        try {
342            LOG.debug("Journal close");
343            asyncDataManager.close();
344        } catch (Exception e) {
345            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
346        }
347        if (firstException != null) {
348            throw firstException;
349        }
350    }
351
352    /**
353     * When we checkpoint we move all the journalled data to long term storage.
354     * 
355     * @param sync
356     */
357    public void checkpoint(boolean sync) {
358        try {
359            if (asyncDataManager == null) {
360                throw new IllegalStateException("Journal is closed.");
361            }
362            CountDownLatch latch = null;
363            synchronized (this) {
364                latch = nextCheckpointCountDownLatch;
365                checkpointTask.wakeup();
366            }
367            if (sync) {
368                if (LOG.isDebugEnabled()) {
369                    LOG.debug("Waitng for checkpoint to complete.");
370                }
371                latch.await();
372            }
373            referenceStoreAdapter.checkpoint(sync);
374        } catch (InterruptedException e) {
375            Thread.currentThread().interrupt();
376            LOG.warn("Request to start checkpoint failed: " + e, e);
377        } catch (IOException e) {
378            LOG.error("checkpoint failed: " + e, e);
379        }
380    }
381
382    /**
383     * This does the actual checkpoint.
384     * 
385     * @return true if successful
386     */
387    public boolean doCheckpoint() {
388        CountDownLatch latch = null;
389        synchronized (this) {
390            latch = nextCheckpointCountDownLatch;
391            nextCheckpointCountDownLatch = new CountDownLatch(1);
392        }
393        try {
394            if (LOG.isDebugEnabled()) {
395                LOG.debug("Checkpoint started.");
396            }
397
398            Location currentMark = asyncDataManager.getMark();
399            Location newMark = currentMark;
400            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
401            while (queueIterator.hasNext()) {
402                final AMQMessageStore ms = queueIterator.next();
403                Location mark = ms.getMark();
404                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
405                    newMark = mark;
406                }
407            }
408            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
409            while (topicIterator.hasNext()) {
410                final AMQTopicMessageStore ms = topicIterator.next();
411                Location mark = ms.getMark();
412                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
413                    newMark = mark;
414                }
415            }
416            try {
417                if (newMark != currentMark) {
418                    if (LOG.isDebugEnabled()) {
419                        LOG.debug("Marking journal at: " + newMark);
420                    }
421                    asyncDataManager.setMark(newMark, false);
422                    writeTraceMessage("CHECKPOINT " + new Date(), true);
423                }
424            } catch (Exception e) {
425                LOG.error("Failed to mark the Journal: " + e, e);
426            }
427            if (LOG.isDebugEnabled()) {
428                LOG.debug("Checkpoint done.");
429            }
430        } finally {
431            latch.countDown();
432        }
433        return true;
434    }
435
436    /**
437     * Cleans up the data files
438     * @throws IOException
439     */
440    public void cleanup() {
441        try {
442            Set<Integer>inProgress = new HashSet<Integer>();
443            if (LOG.isDebugEnabled()) {
444                LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
445            }      
446            for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
447                inProgress.addAll(set.keySet());
448            }
449            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
450            inProgress.add(lastDataFile);
451            lastDataFile = asyncDataManager.getMark().getDataFileId();
452            inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
453            Location lastActiveTx = transactionStore.checkpoint();
454            if (lastActiveTx != null) {
455                lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
456            }
457            LOG.debug("lastDataFile: " + lastDataFile);
458            asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
459        } catch (IOException e) {
460            LOG.error("Could not cleanup data files: " + e, e);
461        }
462    }
463
464    public Set<ActiveMQDestination> getDestinations() {
465        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
466        destinations.addAll(queues.keySet());
467        destinations.addAll(topics.keySet());
468        return destinations;
469    }
470
471    MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
472        if (destination.isQueue()) {
473            return createQueueMessageStore((ActiveMQQueue)destination);
474        } else {
475            return createTopicMessageStore((ActiveMQTopic)destination);
476        }
477    }
478
479    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
480        AMQMessageStore store = queues.get(destination);
481        if (store == null) {
482            ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
483            store = new AMQMessageStore(this, checkpointStore, destination);
484            try {
485                store.start();
486            } catch (Exception e) {
487                throw IOExceptionSupport.create(e);
488            }
489            queues.put(destination, store);
490        }
491        return store;
492    }
493
494    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
495        AMQTopicMessageStore store = topics.get(destinationName);
496        if (store == null) {
497            TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
498            store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
499            try {
500                store.start();
501            } catch (Exception e) {
502                throw IOExceptionSupport.create(e);
503            }
504            topics.put(destinationName, store);
505        }
506        return store;
507    }
508
509    /**
510     * Cleanup method to remove any state associated with the given destination
511     *
512     * @param destination
513     */
514    public void removeQueueMessageStore(ActiveMQQueue destination) {
515        AMQMessageStore store= queues.remove(destination);
516        referenceStoreAdapter.removeQueueMessageStore(destination);
517    }
518
519    /**
520     * Cleanup method to remove any state associated with the given destination
521     *
522     * @param destination
523     */
524    public void removeTopicMessageStore(ActiveMQTopic destination) {
525        topics.remove(destination);
526    }
527
528    public TransactionStore createTransactionStore() throws IOException {
529        return transactionStore;
530    }
531
532    public long getLastMessageBrokerSequenceId() throws IOException {
533        return referenceStoreAdapter.getLastMessageBrokerSequenceId();
534    }
535
536    public void beginTransaction(ConnectionContext context) throws IOException {
537        referenceStoreAdapter.beginTransaction(context);
538    }
539
540    public void commitTransaction(ConnectionContext context) throws IOException {
541        referenceStoreAdapter.commitTransaction(context);
542    }
543
544    public void rollbackTransaction(ConnectionContext context) throws IOException {
545        referenceStoreAdapter.rollbackTransaction(context);
546    }
547    
548    public boolean isPersistentIndex() {
549                return persistentIndex;
550        }
551
552        public void setPersistentIndex(boolean persistentIndex) {
553                this.persistentIndex = persistentIndex;
554        }
555
556    /**
557     * @param location
558     * @return
559     * @throws IOException
560     */
561    public DataStructure readCommand(Location location) throws IOException {
562        try {
563            ByteSequence packet = asyncDataManager.read(location);
564            return (DataStructure)wireFormat.unmarshal(packet);
565        } catch (IOException e) {
566            throw createReadException(location, e);
567        }
568    }
569
    /**
     * Rebuilds the reference store by replaying the journal.
     * <p>
     * The reference store's messages are cleared first. Every journal record
     * is then read in order and dispatched on its type: messages and acks that
     * belong to a transaction are handed to the transaction store, all others
     * are replayed directly against the destination's message store;
     * transaction records are replayed on the transaction store, and on commit
     * the operations of the committed transaction are replayed. Finally a
     * RECOVERED trace record is written and the journal is marked at it.
     * <p>
     * An {@link IOException} raised while replaying a transaction record is
     * logged and recovery continues with the next record.
     *
     * @throws IOException if the journal cannot be read or a record cannot be
     *         replayed
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, IOException {
        referenceStoreAdapter.clearMessages();
        // A null position makes getNextLocation() start from its first record.
        Location pos = null;
        // Number of replayed operations that reported true.
        int redoCounter = 0;
        LOG.info("Journal Recovery Started from: " + asyncDataManager);
        long start = System.currentTimeMillis();
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
        // While we have records in the journal.
        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
            ByteSequence data = asyncDataManager.read(pos);
            DataStructure c = (DataStructure)wireFormat.unmarshal(data);
            if (c instanceof Message) {
                Message message = (Message)c;
                AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    // Held by the transaction store until its transaction record is replayed.
                    transactionStore.addMessage(store, message, pos);
                } else {
                    if (store.replayAddMessage(context, message, pos)) {
                        redoCounter++;
                    }
                }
            } else {
                switch (c.getDataStructureType()) {
                case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
                    referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
                }
                    break;
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck)c;
                    AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else {
                        if (store.replayRemoveMessage(context, command.getMessageAck())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck)c;
                    AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else {
                        if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction)c;
                    try {
                        // Try to replay the packet.
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // We may be trying to replay a commit that was
                                // already committed.
                                break;
                            }
                            // Replay the committed operations.
                            tx.getOperations();
                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                AMQTxOperation op = (AMQTxOperation)iter.next();
                                if (op.replay(this, context)) {
                                    redoCounter++;
                                }
                            }
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        default:
                            throw new IOException("Invalid journal command type: " + command.getType());
                        }
                    } catch (IOException e) {
                        // Best effort: a transaction record that cannot be
                        // replayed is reported and skipped.
                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE:
                    JournalTrace trace = (JournalTrace)c;
                    LOG.debug("TRACE Entry: " + trace.getMessage());
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }
        // Record the end of recovery in the journal and mark it.
        Location location = writeTraceMessage("RECOVERED " + new Date(), true);
        asyncDataManager.setMark(location, true);
        long end = System.currentTimeMillis();
        LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
    }
680
681    private IOException createReadException(Location location, Exception e) {
682        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
683    }
684
685    protected IOException createWriteException(DataStructure packet, Exception e) {
686        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
687    }
688
689    protected IOException createWriteException(String command, Exception e) {
690        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
691    }
692
693    protected IOException createRecoveryFailedException(Exception e) {
694        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
695    }
696
697    /**
698     * @param command
699     * @param syncHint
700     * @return
701     * @throws IOException
702     */
703    public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
704        return writeCommand(command, syncHint,false);
705    }
706    
707    public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
708        try {
709                return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
710        } catch (IOException ioe) {
711                LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
712                brokerService.handleIOException(ioe);
713                throw ioe;
714        }
715    }
716
717    private Location writeTraceMessage(String message, boolean sync) throws IOException {
718        JournalTrace trace = new JournalTrace();
719        trace.setMessage(message);
720        return writeCommand(trace, sync);
721    }
722
723    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
724        newPercentUsage = (newPercentUsage / 10) * 10;
725        oldPercentUsage = (oldPercentUsage / 10) * 10;
726        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
727            checkpoint(false);
728        }
729    }
730
731    public AMQTransactionStore getTransactionStore() {
732        return transactionStore;
733    }
734
735    public synchronized void deleteAllMessages() throws IOException {
736        deleteAllMessages = true;
737    }
738
739    @Override
740    public String toString() {
741        return "AMQPersistenceAdapter(" + directory + ")";
742    }
743
744    // /////////////////////////////////////////////////////////////////
745    // Subclass overridables
746    // /////////////////////////////////////////////////////////////////
747    protected AsyncDataManager createAsyncDataManager() {
748        AsyncDataManager manager = new AsyncDataManager(storeSize);
749        manager.setDirectory(new File(directory, "journal"));
750        manager.setDirectoryArchive(getDirectoryArchive());
751        manager.setArchiveDataLogs(isArchiveDataLogs());
752        manager.setMaxFileLength(maxFileLength);
753        manager.setUseNio(useNio);    
754        return manager;
755    }
756
757    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
758        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
759        adaptor.setPersistentIndex(isPersistentIndex());
760        adaptor.setIndexBinSize(getIndexBinSize());
761        adaptor.setIndexKeySize(getIndexKeySize());
762        adaptor.setIndexPageSize(getIndexPageSize());
763        adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
764        adaptor.setIndexLoadFactor(getIndexLoadFactor());
765        return adaptor;
766    }
767
768    // /////////////////////////////////////////////////////////////////
769    // Property Accessors
770    // /////////////////////////////////////////////////////////////////
771    public AsyncDataManager getAsyncDataManager() {
772        return asyncDataManager;
773    }
774
775    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
776        this.asyncDataManager = asyncDataManager;
777    }
778
779    public ReferenceStoreAdapter getReferenceStoreAdapter() {
780        return referenceStoreAdapter;
781    }
782
783    public TaskRunnerFactory getTaskRunnerFactory() {
784        return taskRunnerFactory;
785    }
786
787    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
788        this.taskRunnerFactory = taskRunnerFactory;
789    }
790
791    /**
792     * @return Returns the wireFormat.
793     */
794    public WireFormat getWireFormat() {
795        return wireFormat;
796    }
797
798    public void setWireFormat(WireFormat wireFormat) {
799        this.wireFormat = wireFormat;
800    }
801
802    public SystemUsage getUsageManager() {
803        return usageManager;
804    }
805
806    public void setUsageManager(SystemUsage usageManager) {
807        this.usageManager = usageManager;
808    }
809
810    public int getMaxCheckpointMessageAddSize() {
811        return maxCheckpointMessageAddSize;
812    }
813
814    /** 
815     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
816     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
817     */
818    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
819        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
820    }
821
822   
823    public synchronized File getDirectory() {
824        return directory;
825    }
826
827    public synchronized void setDirectory(File directory) {
828        this.directory = directory;
829    }
830
831    public boolean isSyncOnWrite() {
832        return this.syncOnWrite;
833    }
834
835    public void setSyncOnWrite(boolean syncOnWrite) {
836        this.syncOnWrite = syncOnWrite;
837    }
838    
839    /**
840     * @param referenceStoreAdapter the referenceStoreAdapter to set
841     */
842    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
843        this.referenceStoreAdapter = referenceStoreAdapter;
844    }
845    
846    public long size(){
847        return storeSize.get();
848    }
849
850        public boolean isUseNio() {
851                return useNio;
852        }
853
854        public void setUseNio(boolean useNio) {
855                this.useNio = useNio;
856        }
857
858        public int getMaxFileLength() {
859                return maxFileLength;
860        }
861
862         /**
863      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
864      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
865      */
866        public void setMaxFileLength(int maxFileLength) {
867                this.maxFileLength = maxFileLength;
868        }
869        
870        public long getCleanupInterval() {
871        return cleanupInterval;
872    }
873
874    public void setCleanupInterval(long cleanupInterval) {
875        this.cleanupInterval = cleanupInterval;
876    }
877
878    public long getCheckpointInterval() {
879        return checkpointInterval;
880    }
881
882    public void setCheckpointInterval(long checkpointInterval) {
883        this.checkpointInterval = checkpointInterval;
884    }
885    
886    public int getIndexBinSize() {
887        return indexBinSize;
888    }
889
890    public void setIndexBinSize(int indexBinSize) {
891        this.indexBinSize = indexBinSize;
892    }
893
894    public int getIndexKeySize() {
895        return indexKeySize;
896    }
897
898    public void setIndexKeySize(int indexKeySize) {
899        this.indexKeySize = indexKeySize;
900    }
901
902    public int getIndexPageSize() {
903        return indexPageSize;
904    }
905    
906    public int getIndexMaxBinSize() {
907        return indexMaxBinSize;
908    }
909
910    public void setIndexMaxBinSize(int maxBinSize) {
911        this.indexMaxBinSize = maxBinSize;
912    }
913
914    /**
915     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
916     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
917     */
918    public void setIndexPageSize(int indexPageSize) {
919        this.indexPageSize = indexPageSize;
920    }
921    
922    public void setIndexLoadFactor(int factor){
923        this.indexLoadFactor=factor;    
924    }
925    
926    public int getIndexLoadFactor(){
927        return this.indexLoadFactor;
928    }
929    
930    public int getMaxReferenceFileLength() {
931        return maxReferenceFileLength;
932    }
933
934    /**
935     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
936     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
937     */
938    public void setMaxReferenceFileLength(int maxReferenceFileLength) {
939        this.maxReferenceFileLength = maxReferenceFileLength;
940    }
941    
942    public File getDirectoryArchive() {
943        return directoryArchive;
944    }
945
946    public void setDirectoryArchive(File directoryArchive) {
947        this.directoryArchive = directoryArchive;
948    }
949
950    public boolean isArchiveDataLogs() {
951        return archiveDataLogs;
952    }
953
954    public void setArchiveDataLogs(boolean archiveDataLogs) {
955        this.archiveDataLogs = archiveDataLogs;
956    }  
957    
958    public boolean isDisableLocking() {
959        return disableLocking;
960    }
961
962    public void setDisableLocking(boolean disableLocking) {
963        this.disableLocking = disableLocking;
964    }
965    
966    /**
967     * @return the recoverReferenceStore
968     */
969    public boolean isRecoverReferenceStore() {
970        return recoverReferenceStore;
971    }
972
973    /**
974     * @param recoverReferenceStore the recoverReferenceStore to set
975     */
976    public void setRecoverReferenceStore(boolean recoverReferenceStore) {
977        this.recoverReferenceStore = recoverReferenceStore;
978    }
979
980    /**
981     * @return the forceRecoverReferenceStore
982     */
983    public boolean isForceRecoverReferenceStore() {
984        return forceRecoverReferenceStore;
985    }
986
987    /**
988     * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
989     */
990    public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
991        this.forceRecoverReferenceStore = forceRecoverReferenceStore;
992    }
993    
994    public boolean isUseDedicatedTaskRunner() {
995        return useDedicatedTaskRunner;
996    }
997    
998    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
999        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1000    }
1001    
1002    /**
1003     * @return the journalThreadPriority
1004     */
1005    public int getJournalThreadPriority() {
1006        return this.journalThreadPriority;
1007    }
1008
1009    /**
1010     * @param journalThreadPriority the journalThreadPriority to set
1011     */
1012    public void setJournalThreadPriority(int journalThreadPriority) {
1013        this.journalThreadPriority = journalThreadPriority;
1014    }
1015
1016        
1017        protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1018            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1019            if (map == null) {
1020                map = new ConcurrentHashMap<Integer, AtomicInteger>();
1021                dataFilesInProgress.put(store, map);
1022            }
1023            AtomicInteger count = map.get(dataFileId);
1024            if (count == null) {
1025                count = new AtomicInteger(0);
1026                map.put(dataFileId, count);
1027            }
1028            count.incrementAndGet();
1029        }
1030        
1031        protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1032        Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1033        if (map != null) {
1034            AtomicInteger count = map.get(dataFileId);
1035            if (count != null) {
1036                int newCount = count.decrementAndGet(); 
1037                if (newCount <=0) {
1038                    map.remove(dataFileId);
1039                }
1040            }
1041            if (map.isEmpty()) {
1042                dataFilesInProgress.remove(store);
1043            }
1044        }
1045    }
1046        
1047        
1048        protected void lock() throws Exception {
1049        lockLogged = false;
1050        lockAquired = false;
1051        do {
1052            if (doLock()) {
1053                lockAquired = true;
1054            } else {
1055                if (!lockLogged) {
1056                    LOG.warn("Waiting to Lock the Store " + getDirectory());
1057                    lockLogged = true;
1058                }
1059                Thread.sleep(1000);
1060            }
1061
1062        } while (!lockAquired && !disableLocking);
1063    }
1064        
1065        private synchronized void unlock() throws IOException {
1066        if (!disableLocking && (null != lock)) {
1067            //clear property doesn't work on some platforms
1068            System.getProperties().remove(getPropertyKey());
1069            System.clearProperty(getPropertyKey());
1070            assert(System.getProperty(getPropertyKey())==null);
1071            if (lock.isValid()) {
1072                lock.release();
1073                lock.channel().close();
1074                
1075            }
1076            lock = null;
1077        }
1078    }
1079
1080        
1081        protected boolean doLock() throws IOException {
1082            boolean result = true;
1083            if (!disableLocking && directory != null && lock == null) {
1084            String key = getPropertyKey();
1085            String property = System.getProperty(key);
1086            if (null == property) {
1087                if (!BROKEN_FILE_LOCK) {
1088                    lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
1089                    if (lock == null) {
1090                        result = false;
1091                    } else {
1092                        System.setProperty(key, new Date().toString());
1093                    }
1094                }
1095            } else { // already locked
1096                result = false;
1097            }
1098        }
1099            return result;
1100        }
1101        
1102        private String getPropertyKey() throws IOException {
1103        return getClass().getName() + ".lock." + directory.getCanonicalPath();
1104    }
1105        
1106        static {
1107            BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1108                    + ".FileLockBroken",
1109                    "false"));
1110            DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1111                   + ".DisableLocking",
1112                   "false"));
1113        }
1114
1115        
1116    public long getLastProducerSequenceId(ProducerId id) {
1117        // reference store send has adequate duplicate suppression
1118        return -1;
1119    }
1120}