001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Set;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.FutureTask;
029    import java.util.concurrent.LinkedBlockingQueue;
030    import java.util.concurrent.ThreadFactory;
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import org.apache.activeio.journal.InvalidRecordLocationException;
035    import org.apache.activeio.journal.Journal;
036    import org.apache.activeio.journal.JournalEventListener;
037    import org.apache.activeio.journal.RecordLocation;
038    import org.apache.activeio.packet.ByteArrayPacket;
039    import org.apache.activeio.packet.Packet;
040    import org.apache.activemq.broker.BrokerService;
041    import org.apache.activemq.broker.BrokerServiceAware;
042    import org.apache.activemq.broker.ConnectionContext;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQQueue;
045    import org.apache.activemq.command.ActiveMQTopic;
046    import org.apache.activemq.command.DataStructure;
047    import org.apache.activemq.command.JournalQueueAck;
048    import org.apache.activemq.command.JournalTopicAck;
049    import org.apache.activemq.command.JournalTrace;
050    import org.apache.activemq.command.JournalTransaction;
051    import org.apache.activemq.command.Message;
052    import org.apache.activemq.command.MessageAck;
053    import org.apache.activemq.command.ProducerId;
054    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055    import org.apache.activemq.openwire.OpenWireFormat;
056    import org.apache.activemq.store.MessageStore;
057    import org.apache.activemq.store.PersistenceAdapter;
058    import org.apache.activemq.store.TopicMessageStore;
059    import org.apache.activemq.store.TransactionStore;
060    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061    import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062    import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063    import org.apache.activemq.thread.Scheduler;
064    import org.apache.activemq.thread.Task;
065    import org.apache.activemq.thread.TaskRunner;
066    import org.apache.activemq.thread.TaskRunnerFactory;
067    import org.apache.activemq.usage.SystemUsage;
068    import org.apache.activemq.usage.Usage;
069    import org.apache.activemq.usage.UsageListener;
070    import org.apache.activemq.util.ByteSequence;
071    import org.apache.activemq.util.IOExceptionSupport;
072    import org.apache.activemq.wireformat.WireFormat;
073    import org.slf4j.Logger;
074    import org.slf4j.LoggerFactory;
075    
076    /**
077     * An implementation of {@link PersistenceAdapter} designed for use with a
078     * {@link Journal} and then check pointing asynchronously on a timeout with some
079     * other long term persistent storage.
080     * 
081     * @org.apache.xbean.XBean
082     * 
083     */
084    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085    
086        private BrokerService brokerService;
087            
088        protected Scheduler scheduler;
089        private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
090    
091        private Journal journal;
092        private PersistenceAdapter longTermPersistence;
093    
094        private final WireFormat wireFormat = new OpenWireFormat();
095    
096        private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097        private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098    
099        private SystemUsage usageManager;
100        private final long checkpointInterval = 1000 * 60 * 5;
101        private long lastCheckpointRequest = System.currentTimeMillis();
102        private long lastCleanup = System.currentTimeMillis();
103        private int maxCheckpointWorkers = 10;
104        private int maxCheckpointMessageAddSize = 1024 * 1024;
105    
106        private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107        private ThreadPoolExecutor checkpointExecutor;
108    
109        private TaskRunner checkpointTask;
110        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111        private boolean fullCheckPoint;
112    
113        private final AtomicBoolean started = new AtomicBoolean(false);
114    
115        private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116    
117        private TaskRunnerFactory taskRunnerFactory;
118        private File directory;
119    
120        public JournalPersistenceAdapter() {        
121        }
122        
123        public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
124            setJournal(journal);
125            setTaskRunnerFactory(taskRunnerFactory);
126            setPersistenceAdapter(longTermPersistence);
127        }
128    
129        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
130            this.taskRunnerFactory = taskRunnerFactory;
131        }
132    
133        public void setJournal(Journal journal) {
134            this.journal = journal;
135            journal.setJournalEventListener(this);
136        }
137        
138        public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
139            this.longTermPersistence = longTermPersistence;
140        }
141        
142        final Runnable createPeriodicCheckpointTask() {
143            return new Runnable() {
144                public void run() {
145                    long lastTime = 0;
146                    synchronized (this) {
147                        lastTime = lastCheckpointRequest;
148                    }
149                    if (System.currentTimeMillis() > lastTime + checkpointInterval) {
150                        checkpoint(false, true);
151                    }
152                }
153            };
154        }
155    
156        /**
157         * @param usageManager The UsageManager that is controlling the
158         *                destination's memory usage.
159         */
160        public void setUsageManager(SystemUsage usageManager) {
161            this.usageManager = usageManager;
162            longTermPersistence.setUsageManager(usageManager);
163        }
164    
165        public Set<ActiveMQDestination> getDestinations() {
166            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
167            destinations.addAll(queues.keySet());
168            destinations.addAll(topics.keySet());
169            return destinations;
170        }
171    
172        private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
173            if (destination.isQueue()) {
174                return createQueueMessageStore((ActiveMQQueue)destination);
175            } else {
176                return createTopicMessageStore((ActiveMQTopic)destination);
177            }
178        }
179    
180        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
181            JournalMessageStore store = queues.get(destination);
182            if (store == null) {
183                MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
184                store = new JournalMessageStore(this, checkpointStore, destination);
185                queues.put(destination, store);
186            }
187            return store;
188        }
189    
190        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
191            JournalTopicMessageStore store = topics.get(destinationName);
192            if (store == null) {
193                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
194                store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
195                topics.put(destinationName, store);
196            }
197            return store;
198        }
199    
200        /**
201         * Cleanup method to remove any state associated with the given destination
202         *
203         * @param destination Destination to forget
204         */
205        public void removeQueueMessageStore(ActiveMQQueue destination) {
206            queues.remove(destination);
207        }
208    
209        /**
210         * Cleanup method to remove any state associated with the given destination
211         *
212         * @param destination Destination to forget
213         */
214        public void removeTopicMessageStore(ActiveMQTopic destination) {
215            topics.remove(destination);
216        }
217    
218        public TransactionStore createTransactionStore() throws IOException {
219            return transactionStore;
220        }
221    
222        public long getLastMessageBrokerSequenceId() throws IOException {
223            return longTermPersistence.getLastMessageBrokerSequenceId();
224        }
225    
226        public void beginTransaction(ConnectionContext context) throws IOException {
227            longTermPersistence.beginTransaction(context);
228        }
229    
230        public void commitTransaction(ConnectionContext context) throws IOException {
231            longTermPersistence.commitTransaction(context);
232        }
233    
234        public void rollbackTransaction(ConnectionContext context) throws IOException {
235            longTermPersistence.rollbackTransaction(context);
236        }
237    
238        public synchronized void start() throws Exception {
239            if (!started.compareAndSet(false, true)) {
240                return;
241            }
242    
243            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244                public boolean iterate() {
245                    return doCheckpoint();
246                }
247            }, "ActiveMQ Journal Checkpoint Worker");
248    
249            checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
250                public Thread newThread(Runnable runable) {
251                    Thread t = new Thread(runable, "Journal checkpoint worker");
252                    t.setPriority(7);
253                    return t;
254                }
255            });
256            // checkpointExecutor.allowCoreThreadTimeOut(true);
257    
258            this.usageManager.getMemoryUsage().addUsageListener(this);
259    
260            if (longTermPersistence instanceof JDBCPersistenceAdapter) {
261                // Disabled periodic clean up as it deadlocks with the checkpoint
262                // operations.
263                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
264            }
265    
266            longTermPersistence.start();
267            createTransactionStore();
268            recover();
269    
270            // Do a checkpoint periodically.
271            this.scheduler = new Scheduler("Journal Scheduler");
272            this.scheduler.start();
273            this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
274    
275        }
276    
277        public void stop() throws Exception {
278    
279            this.usageManager.getMemoryUsage().removeUsageListener(this);
280            if (!started.compareAndSet(true, false)) {
281                return;
282            }
283    
284            this.scheduler.cancel(periodicCheckpointTask);
285            this.scheduler.stop();
286    
287            // Take one final checkpoint and stop checkpoint processing.
288            checkpoint(true, true);
289            checkpointTask.shutdown();
290            checkpointExecutor.shutdown();
291    
292            queues.clear();
293            topics.clear();
294    
295            IOException firstException = null;
296            try {
297                journal.close();
298            } catch (Exception e) {
299                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
300            }
301            longTermPersistence.stop();
302    
303            if (firstException != null) {
304                throw firstException;
305            }
306        }
307    
308        // Properties
309        // -------------------------------------------------------------------------
310        public PersistenceAdapter getLongTermPersistence() {
311            return longTermPersistence;
312        }
313    
314        /**
315         * @return Returns the wireFormat.
316         */
317        public WireFormat getWireFormat() {
318            return wireFormat;
319        }
320    
321        // Implementation methods
322        // -------------------------------------------------------------------------
323    
324        /**
325         * The Journal give us a call back so that we can move old data out of the
326         * journal. Taking a checkpoint does this for us.
327         * 
328         * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
329         */
330        public void overflowNotification(RecordLocation safeLocation) {
331            checkpoint(false, true);
332        }
333    
334        /**
335         * When we checkpoint we move all the journalled data to long term storage.
336         * 
337         */
338        public void checkpoint(boolean sync, boolean fullCheckpoint) {
339            try {
340                if (journal == null) {
341                    throw new IllegalStateException("Journal is closed.");
342                }
343    
344                long now = System.currentTimeMillis();
345                CountDownLatch latch = null;
346                synchronized (this) {
347                    latch = nextCheckpointCountDownLatch;
348                    lastCheckpointRequest = now;
349                    if (fullCheckpoint) {
350                        this.fullCheckPoint = true;
351                    }
352                }
353    
354                checkpointTask.wakeup();
355    
356                if (sync) {
357                    LOG.debug("Waking for checkpoint to complete.");
358                    latch.await();
359                }
360            } catch (InterruptedException e) {
361                Thread.currentThread().interrupt();
362                LOG.warn("Request to start checkpoint failed: " + e, e);
363            }
364        }
365    
366        public void checkpoint(boolean sync) {
367            checkpoint(sync, sync);
368        }
369    
370        /**
371         * This does the actual checkpoint.
372         * 
373         * @return
374         */
375        public boolean doCheckpoint() {
376            CountDownLatch latch = null;
377            boolean fullCheckpoint;
378            synchronized (this) {
379                latch = nextCheckpointCountDownLatch;
380                nextCheckpointCountDownLatch = new CountDownLatch(1);
381                fullCheckpoint = this.fullCheckPoint;
382                this.fullCheckPoint = false;
383            }
384            try {
385    
386                LOG.debug("Checkpoint started.");
387                RecordLocation newMark = null;
388    
389                ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
390    
391                //
392                // We do many partial checkpoints (fullCheckpoint==false) to move
393                // topic messages
394                // to long term store as soon as possible.
395                // 
396                // We want to avoid doing that for queue messages since removes the
397                // come in the same
398                // checkpoint cycle will nullify the previous message add.
399                // Therefore, we only
400                // checkpoint queues on the fullCheckpoint cycles.
401                //
402                if (fullCheckpoint) {
403                    Iterator<JournalMessageStore> iterator = queues.values().iterator();
404                    while (iterator.hasNext()) {
405                        try {
406                            final JournalMessageStore ms = iterator.next();
407                            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
408                                public RecordLocation call() throws Exception {
409                                    return ms.checkpoint();
410                                }
411                            });
412                            futureTasks.add(task);
413                            checkpointExecutor.execute(task);
414                        } catch (Exception e) {
415                            LOG.error("Failed to checkpoint a message store: " + e, e);
416                        }
417                    }
418                }
419    
420                Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
421                while (iterator.hasNext()) {
422                    try {
423                        final JournalTopicMessageStore ms = iterator.next();
424                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
425                            public RecordLocation call() throws Exception {
426                                return ms.checkpoint();
427                            }
428                        });
429                        futureTasks.add(task);
430                        checkpointExecutor.execute(task);
431                    } catch (Exception e) {
432                        LOG.error("Failed to checkpoint a message store: " + e, e);
433                    }
434                }
435    
436                try {
437                    for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
438                        FutureTask<RecordLocation> ft = iter.next();
439                        RecordLocation mark = ft.get();
440                        // We only set a newMark on full checkpoints.
441                        if (fullCheckpoint) {
442                            if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
443                                newMark = mark;
444                            }
445                        }
446                    }
447                } catch (Throwable e) {
448                    LOG.error("Failed to checkpoint a message store: " + e, e);
449                }
450    
451                if (fullCheckpoint) {
452                    try {
453                        if (newMark != null) {
454                            LOG.debug("Marking journal at: " + newMark);
455                            journal.setMark(newMark, true);
456                        }
457                    } catch (Exception e) {
458                        LOG.error("Failed to mark the Journal: " + e, e);
459                    }
460    
461                    if (longTermPersistence instanceof JDBCPersistenceAdapter) {
462                        // We may be check pointing more often than the
463                        // checkpointInterval if under high use
464                        // But we don't want to clean up the db that often.
465                        long now = System.currentTimeMillis();
466                        if (now > lastCleanup + checkpointInterval) {
467                            lastCleanup = now;
468                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
469                        }
470                    }
471                }
472    
473                LOG.debug("Checkpoint done.");
474            } finally {
475                latch.countDown();
476            }
477            synchronized (this) {
478                return this.fullCheckPoint;
479            }
480    
481        }
482    
483        /**
484         * @param location
485         * @return
486         * @throws IOException
487         */
488        public DataStructure readCommand(RecordLocation location) throws IOException {
489            try {
490                Packet packet = journal.read(location);
491                return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
492            } catch (InvalidRecordLocationException e) {
493                throw createReadException(location, e);
494            } catch (IOException e) {
495                throw createReadException(location, e);
496            }
497        }
498    
499        /**
500         * Move all the messages that were in the journal into long term storage. We
501         * just replay and do a checkpoint.
502         * 
503         * @throws IOException
504         * @throws IOException
505         * @throws InvalidRecordLocationException
506         * @throws IllegalStateException
507         */
508        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
509    
510            RecordLocation pos = null;
511            int transactionCounter = 0;
512    
513            LOG.info("Journal Recovery Started from: " + journal);
514            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
515    
516            // While we have records in the journal.
517            while ((pos = journal.getNextRecordLocation(pos)) != null) {
518                Packet data = journal.read(pos);
519                DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
520    
521                if (c instanceof Message) {
522                    Message message = (Message)c;
523                    JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
524                    if (message.isInTransaction()) {
525                        transactionStore.addMessage(store, message, pos);
526                    } else {
527                        store.replayAddMessage(context, message);
528                        transactionCounter++;
529                    }
530                } else {
531                    switch (c.getDataStructureType()) {
532                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
533                        JournalQueueAck command = (JournalQueueAck)c;
534                        JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
535                        if (command.getMessageAck().isInTransaction()) {
536                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
537                        } else {
538                            store.replayRemoveMessage(context, command.getMessageAck());
539                            transactionCounter++;
540                        }
541                    }
542                        break;
543                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
544                        JournalTopicAck command = (JournalTopicAck)c;
545                        JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
546                        if (command.getTransactionId() != null) {
547                            transactionStore.acknowledge(store, command, pos);
548                        } else {
549                            store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
550                            transactionCounter++;
551                        }
552                    }
553                        break;
554                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
555                        JournalTransaction command = (JournalTransaction)c;
556                        try {
557                            // Try to replay the packet.
558                            switch (command.getType()) {
559                            case JournalTransaction.XA_PREPARE:
560                                transactionStore.replayPrepare(command.getTransactionId());
561                                break;
562                            case JournalTransaction.XA_COMMIT:
563                            case JournalTransaction.LOCAL_COMMIT:
564                                Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
565                                if (tx == null) {
566                                    break; // We may be trying to replay a commit
567                                }
568                                // that
569                                // was already committed.
570    
571                                // Replay the committed operations.
572                                tx.getOperations();
573                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
574                                    TxOperation op = (TxOperation)iter.next();
575                                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
576                                        op.store.replayAddMessage(context, (Message)op.data);
577                                    }
578                                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
579                                        op.store.replayRemoveMessage(context, (MessageAck)op.data);
580                                    }
581                                    if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
582                                        JournalTopicAck ack = (JournalTopicAck)op.data;
583                                        ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
584                                    }
585                                }
586                                transactionCounter++;
587                                break;
588                            case JournalTransaction.LOCAL_ROLLBACK:
589                            case JournalTransaction.XA_ROLLBACK:
590                                transactionStore.replayRollback(command.getTransactionId());
591                                break;
592                            default:
593                                throw new IOException("Invalid journal command type: " + command.getType());
594                            }
595                        } catch (IOException e) {
596                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
597                        }
598                    }
599                        break;
600                    case JournalTrace.DATA_STRUCTURE_TYPE:
601                        JournalTrace trace = (JournalTrace)c;
602                        LOG.debug("TRACE Entry: " + trace.getMessage());
603                        break;
604                    default:
605                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
606                    }
607                }
608            }
609    
610            RecordLocation location = writeTraceMessage("RECOVERED", true);
611            journal.setMark(location, true);
612    
613            LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
614        }
615    
616        private IOException createReadException(RecordLocation location, Exception e) {
617            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
618        }
619    
620        protected IOException createWriteException(DataStructure packet, Exception e) {
621            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
622        }
623    
624        protected IOException createWriteException(String command, Exception e) {
625            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
626        }
627    
628        protected IOException createRecoveryFailedException(Exception e) {
629            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
630        }
631    
632        /**
633         * @param command
634         * @param sync
635         * @return
636         * @throws IOException
637         */
638        public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
639            if (started.get()) {
640                try {
641                        return journal.write(toPacket(wireFormat.marshal(command)), sync);
642                } catch (IOException ioe) {
643                        LOG.error("Cannot write to the journal", ioe);
644                        brokerService.handleIOException(ioe);
645                        throw ioe;
646                }
647            }
648            throw new IOException("closed");
649        }
650    
651        private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
652            JournalTrace trace = new JournalTrace();
653            trace.setMessage(message);
654            return writeCommand(trace, sync);
655        }
656    
657        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
658            newPercentUsage = (newPercentUsage / 10) * 10;
659            oldPercentUsage = (oldPercentUsage / 10) * 10;
660            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
661                boolean sync = newPercentUsage >= 90;
662                checkpoint(sync, true);
663            }
664        }
665    
666        public JournalTransactionStore getTransactionStore() {
667            return transactionStore;
668        }
669    
670        public void deleteAllMessages() throws IOException {
671            try {
672                JournalTrace trace = new JournalTrace();
673                trace.setMessage("DELETED");
674                RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
675                journal.setMark(location, true);
676                LOG.info("Journal deleted: ");
677            } catch (IOException e) {
678                throw e;
679            } catch (Throwable e) {
680                throw IOExceptionSupport.create(e);
681            }
682            longTermPersistence.deleteAllMessages();
683        }
684    
685        public SystemUsage getUsageManager() {
686            return usageManager;
687        }
688    
689        public int getMaxCheckpointMessageAddSize() {
690            return maxCheckpointMessageAddSize;
691        }
692    
693        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
694            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
695        }
696    
697        public int getMaxCheckpointWorkers() {
698            return maxCheckpointWorkers;
699        }
700    
701        public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
702            this.maxCheckpointWorkers = maxCheckpointWorkers;
703        }
704    
705        public boolean isUseExternalMessageReferences() {
706            return false;
707        }
708    
709        public void setUseExternalMessageReferences(boolean enable) {
710            if (enable) {
711                throw new IllegalArgumentException("The journal does not support message references.");
712            }
713        }
714    
715        public Packet toPacket(ByteSequence sequence) {
716            return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
717        }
718    
719        public ByteSequence toByteSequence(Packet packet) {
720            org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
721            return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
722        }
723    
724        public void setBrokerName(String brokerName) {
725            longTermPersistence.setBrokerName(brokerName);
726        }
727    
728        @Override
729        public String toString() {
730            return "JournalPersistenceAdapator(" + longTermPersistence + ")";
731        }
732    
733        public void setDirectory(File dir) {
734            this.directory=dir;
735        }
736        
737        public File getDirectory(){
738            return directory;
739        }
740        
741        public long size(){
742            return 0;
743        }
744    
745        public void setBrokerService(BrokerService brokerService) {
746            this.brokerService = brokerService;
747            PersistenceAdapter pa = getLongTermPersistence();
748            if( pa instanceof BrokerServiceAware ) {
749                ((BrokerServiceAware)pa).setBrokerService(brokerService);
750            }
751        }
752    
753        public long getLastProducerSequenceId(ProducerId id) {
754            return -1;
755        }
756    
757    }