001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.FutureTask;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import org.apache.activeio.journal.InvalidRecordLocationException;
035import org.apache.activeio.journal.Journal;
036import org.apache.activeio.journal.JournalEventListener;
037import org.apache.activeio.journal.RecordLocation;
038import org.apache.activeio.packet.ByteArrayPacket;
039import org.apache.activeio.packet.Packet;
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.BrokerServiceAware;
042import org.apache.activemq.broker.ConnectionContext;
043import org.apache.activemq.command.ActiveMQDestination;
044import org.apache.activemq.command.ActiveMQQueue;
045import org.apache.activemq.command.ActiveMQTopic;
046import org.apache.activemq.command.DataStructure;
047import org.apache.activemq.command.JournalQueueAck;
048import org.apache.activemq.command.JournalTopicAck;
049import org.apache.activemq.command.JournalTrace;
050import org.apache.activemq.command.JournalTransaction;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.ProducerId;
054import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055import org.apache.activemq.openwire.OpenWireFormat;
056import org.apache.activemq.store.MessageStore;
057import org.apache.activemq.store.PersistenceAdapter;
058import org.apache.activemq.store.TopicMessageStore;
059import org.apache.activemq.store.TransactionStore;
060import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063import org.apache.activemq.thread.Scheduler;
064import org.apache.activemq.thread.Task;
065import org.apache.activemq.thread.TaskRunner;
066import org.apache.activemq.thread.TaskRunnerFactory;
067import org.apache.activemq.usage.SystemUsage;
068import org.apache.activemq.usage.Usage;
069import org.apache.activemq.usage.UsageListener;
070import org.apache.activemq.util.ByteSequence;
071import org.apache.activemq.util.IOExceptionSupport;
072import org.apache.activemq.wireformat.WireFormat;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076/**
077 * An implementation of {@link PersistenceAdapter} designed for use with a
078 * {@link Journal} and then check pointing asynchronously on a timeout with some
079 * other long term persistent storage.
080 * 
081 * @org.apache.xbean.XBean
082 * 
083 */
084public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085
    // Broker that journal write failures are reported to (see writeCommand);
    // stays null until setBrokerService() is called.
    private BrokerService brokerService;

    // Runs periodicCheckpointTask; created in start().
    protected Scheduler scheduler;
    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);

    // The journal commands are written to and replayed from.
    private Journal journal;
    // The adapter that journalled data is checkpointed into and that most
    // PersistenceAdapter calls are delegated to.
    private PersistenceAdapter longTermPersistence;

    // Used to marshal commands into journal packets and back.
    private final WireFormat wireFormat = new OpenWireFormat();

    // Journal backed stores created so far, keyed by destination.
    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();

    private SystemUsage usageManager;
    // Five minutes, in milliseconds. The periodic task is scheduled at a tenth
    // of this and requests a checkpoint once this much time has passed since
    // the last request.
    private final long checkpointInterval = 1000 * 60 * 5;
    // Time of the most recent checkpoint(..) request; written while holding
    // this adapter's monitor.
    private long lastCheckpointRequest = System.currentTimeMillis();
    // Time of the last JDBC cleanup triggered from doCheckpoint().
    private long lastCleanup = System.currentTimeMillis();
    // Core and maximum size of the checkpoint executor created in start().
    private int maxCheckpointWorkers = 10;
    // Not read in this class; only exposed through its getter and setter.
    private int maxCheckpointMessageAddSize = 1024 * 1024;

    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    // Pool on which the individual message stores are checkpointed.
    private ThreadPoolExecutor checkpointExecutor;

    // Task runner that executes doCheckpoint() when woken up.
    private TaskRunner checkpointTask;
    // Released when the next doCheckpoint() pass finishes; replaced at the
    // start of every pass while holding this adapter's monitor.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Set when a full checkpoint has been requested and not yet picked up by
    // doCheckpoint(); accessed while holding this adapter's monitor.
    private boolean fullCheckPoint;

    private final AtomicBoolean started = new AtomicBoolean(false);

    // Created during field initialisation, so the task exists before start().
    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();

    private TaskRunnerFactory taskRunnerFactory;
    private File directory;
119
    /**
     * Creates an adapter with no journal, long term persistence adapter or
     * task runner factory; these are supplied through the setters.
     */
    public JournalPersistenceAdapter() {        
    }
122    
    /**
     * Creates an adapter from its three collaborators by calling the
     * corresponding setters.
     *
     * @param journal the journal to write to; this adapter is registered as its
     *                event listener
     * @param longTermPersistence the adapter journalled data is checkpointed to
     * @param taskRunnerFactory used in start() to create the checkpoint task
     *                runner
     * @throws IOException declared by the signature; nothing in this
     *                 constructor body throws it directly
     */
    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
        setJournal(journal);
        setTaskRunnerFactory(taskRunnerFactory);
        setPersistenceAdapter(longTermPersistence);
    }
128
    /**
     * Sets the factory that start() uses to create the checkpoint task runner.
     *
     * @param taskRunnerFactory the factory to use
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }
132
    /**
     * Sets the journal and registers this adapter as its event listener.
     *
     * @param journal the journal to use; it is dereferenced here, so it must
     *                not be null
     */
    public void setJournal(Journal journal) {
        this.journal = journal;
        journal.setJournalEventListener(this);
    }
137    
    /**
     * Sets the long term persistence adapter that journalled data is
     * checkpointed to.
     *
     * @param longTermPersistence the adapter to delegate to
     */
    public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
        this.longTermPersistence = longTermPersistence;
    }
141    
142    final Runnable createPeriodicCheckpointTask() {
143        return new Runnable() {
144            public void run() {
145                long lastTime = 0;
146                synchronized (this) {
147                    lastTime = lastCheckpointRequest;
148                }
149                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
150                    checkpoint(false, true);
151                }
152            }
153        };
154    }
155
156    /**
157     * @param usageManager The UsageManager that is controlling the
158     *                destination's memory usage.
159     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
        // The long term adapter gets the same usage manager; it is dereferenced
        // here, so it has to be set before this method is called.
        longTermPersistence.setUsageManager(usageManager);
    }
164
165    public Set<ActiveMQDestination> getDestinations() {
166        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
167        destinations.addAll(queues.keySet());
168        destinations.addAll(topics.keySet());
169        return destinations;
170    }
171
172    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
173        if (destination.isQueue()) {
174            return createQueueMessageStore((ActiveMQQueue)destination);
175        } else {
176            return createTopicMessageStore((ActiveMQTopic)destination);
177        }
178    }
179
180    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
181        JournalMessageStore store = queues.get(destination);
182        if (store == null) {
183            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
184            store = new JournalMessageStore(this, checkpointStore, destination);
185            queues.put(destination, store);
186        }
187        return store;
188    }
189
190    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
191        JournalTopicMessageStore store = topics.get(destinationName);
192        if (store == null) {
193            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
194            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
195            topics.put(destinationName, store);
196        }
197        return store;
198    }
199
200    /**
201     * Cleanup method to remove any state associated with the given destination
202     *
203     * @param destination Destination to forget
204     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        // Only the journal store cached by this adapter is dropped; nothing is
        // forwarded to the long term adapter from here.
        queues.remove(destination);
    }
208
209    /**
210     * Cleanup method to remove any state associated with the given destination
211     *
212     * @param destination Destination to forget
213     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        // Only the journal store cached by this adapter is dropped; nothing is
        // forwarded to the long term adapter from here.
        topics.remove(destination);
    }
217
    /**
     * Returns the single journal transaction store owned by this adapter; no
     * new store is created per call.
     *
     * @return the journal transaction store
     * @throws IOException declared by the signature; not thrown by this body
     */
    public TransactionStore createTransactionStore() throws IOException {
        return transactionStore;
    }
221
    /**
     * Delegates to the long term persistence adapter.
     *
     * @return the value reported by the long term persistence adapter
     * @throws IOException if the long term persistence adapter fails
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return longTermPersistence.getLastMessageBrokerSequenceId();
    }
225
    /**
     * Delegates to the long term persistence adapter.
     *
     * @param context the connection context passed through to the delegate
     * @throws IOException if the long term persistence adapter fails
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.beginTransaction(context);
    }
229
    /**
     * Delegates to the long term persistence adapter.
     *
     * @param context the connection context passed through to the delegate
     * @throws IOException if the long term persistence adapter fails
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.commitTransaction(context);
    }
233
    /**
     * Delegates to the long term persistence adapter.
     *
     * @param context the connection context passed through to the delegate
     * @throws IOException if the long term persistence adapter fails
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.rollbackTransaction(context);
    }
237
238    public synchronized void start() throws Exception {
239        if (!started.compareAndSet(false, true)) {
240            return;
241        }
242
243        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244            public boolean iterate() {
245                return doCheckpoint();
246            }
247        }, "ActiveMQ Journal Checkpoint Worker");
248
249        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
250            public Thread newThread(Runnable runable) {
251                Thread t = new Thread(runable, "Journal checkpoint worker");
252                t.setPriority(7);
253                return t;
254            }
255        });
256        // checkpointExecutor.allowCoreThreadTimeOut(true);
257
258        this.usageManager.getMemoryUsage().addUsageListener(this);
259
260        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
261            // Disabled periodic clean up as it deadlocks with the checkpoint
262            // operations.
263            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
264        }
265
266        longTermPersistence.start();
267        createTransactionStore();
268        recover();
269
270        // Do a checkpoint periodically.
271        this.scheduler = new Scheduler("Journal Scheduler");
272        this.scheduler.start();
273        this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
274
275    }
276
277    public void stop() throws Exception {
278
279        this.usageManager.getMemoryUsage().removeUsageListener(this);
280        if (!started.compareAndSet(true, false)) {
281            return;
282        }
283
284        this.scheduler.cancel(periodicCheckpointTask);
285        this.scheduler.stop();
286
287        // Take one final checkpoint and stop checkpoint processing.
288        checkpoint(true, true);
289        checkpointTask.shutdown();
290        checkpointExecutor.shutdown();
291
292        queues.clear();
293        topics.clear();
294
295        IOException firstException = null;
296        try {
297            journal.close();
298        } catch (Exception e) {
299            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
300        }
301        longTermPersistence.stop();
302
303        if (firstException != null) {
304            throw firstException;
305        }
306    }
307
308    // Properties
309    // -------------------------------------------------------------------------
    /**
     * @return the long term persistence adapter journalled data is
     *         checkpointed to, or null if none has been set
     */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }
313
314    /**
315     * @return Returns the wireFormat.
316     */
    public WireFormat getWireFormat() {
        // Always the OpenWireFormat instance created together with this adapter.
        return wireFormat;
    }
320
321    // Implementation methods
322    // -------------------------------------------------------------------------
323
    /**
     * The Journal gives us a call back so that we can move old data out of the
     * journal. Taking a checkpoint does this for us.
     * 
     * @see org.apache.activeio.journal.JournalEventListener#overflowNotification(org.apache.activeio.journal.RecordLocation)
     */
    public void overflowNotification(RecordLocation safeLocation) {
        // Request a full checkpoint without waiting for it to finish;
        // safeLocation itself is not used.
        checkpoint(false, true);
    }
333
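    /**
     * When we checkpoint we move all the journalled data to long term storage.
     * 
     * @param sync if true, block until the requested checkpoint pass has
     *                completed
     * @param fullCheckpoint if true, request a full checkpoint, which also
     *                covers the queue stores and marks the journal
     */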
338    public void checkpoint(boolean sync, boolean fullCheckpoint) {
339        try {
340            if (journal == null) {
341                throw new IllegalStateException("Journal is closed.");
342            }
343
344            long now = System.currentTimeMillis();
345            CountDownLatch latch = null;
346            synchronized (this) {
347                latch = nextCheckpointCountDownLatch;
348                lastCheckpointRequest = now;
349                if (fullCheckpoint) {
350                    this.fullCheckPoint = true;
351                }
352            }
353
354            checkpointTask.wakeup();
355
356            if (sync) {
357                LOG.debug("Waking for checkpoint to complete.");
358                latch.await();
359            }
360        } catch (InterruptedException e) {
361            Thread.currentThread().interrupt();
362            LOG.warn("Request to start checkpoint failed: " + e, e);
363        }
364    }
365
    /**
     * Requests a checkpoint where the same flag decides both whether to wait
     * for completion and whether a full checkpoint is requested.
     *
     * @param sync if true, request a full checkpoint and wait for it; if
     *                false, request a partial checkpoint without waiting
     */
    public void checkpoint(boolean sync) {
        checkpoint(sync, sync);
    }
369
    /**
     * This does the actual checkpoint.
     * 
     * @return true if another full checkpoint was requested while this one
     *         was running, false otherwise
     */
375    public boolean doCheckpoint() {
376        CountDownLatch latch = null;
377        boolean fullCheckpoint;
378        synchronized (this) {
379            latch = nextCheckpointCountDownLatch;
380            nextCheckpointCountDownLatch = new CountDownLatch(1);
381            fullCheckpoint = this.fullCheckPoint;
382            this.fullCheckPoint = false;
383        }
384        try {
385
386            LOG.debug("Checkpoint started.");
387            RecordLocation newMark = null;
388
389            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
390
391            //
392            // We do many partial checkpoints (fullCheckpoint==false) to move
393            // topic messages
394            // to long term store as soon as possible.
395            // 
396            // We want to avoid doing that for queue messages since removes the
397            // come in the same
398            // checkpoint cycle will nullify the previous message add.
399            // Therefore, we only
400            // checkpoint queues on the fullCheckpoint cycles.
401            //
402            if (fullCheckpoint) {
403                Iterator<JournalMessageStore> iterator = queues.values().iterator();
404                while (iterator.hasNext()) {
405                    try {
406                        final JournalMessageStore ms = iterator.next();
407                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
408                            public RecordLocation call() throws Exception {
409                                return ms.checkpoint();
410                            }
411                        });
412                        futureTasks.add(task);
413                        checkpointExecutor.execute(task);
414                    } catch (Exception e) {
415                        LOG.error("Failed to checkpoint a message store: " + e, e);
416                    }
417                }
418            }
419
420            Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
421            while (iterator.hasNext()) {
422                try {
423                    final JournalTopicMessageStore ms = iterator.next();
424                    FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
425                        public RecordLocation call() throws Exception {
426                            return ms.checkpoint();
427                        }
428                    });
429                    futureTasks.add(task);
430                    checkpointExecutor.execute(task);
431                } catch (Exception e) {
432                    LOG.error("Failed to checkpoint a message store: " + e, e);
433                }
434            }
435
436            try {
437                for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
438                    FutureTask<RecordLocation> ft = iter.next();
439                    RecordLocation mark = ft.get();
440                    // We only set a newMark on full checkpoints.
441                    if (fullCheckpoint) {
442                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
443                            newMark = mark;
444                        }
445                    }
446                }
447            } catch (Throwable e) {
448                LOG.error("Failed to checkpoint a message store: " + e, e);
449            }
450
451            if (fullCheckpoint) {
452                try {
453                    if (newMark != null) {
454                        LOG.debug("Marking journal at: " + newMark);
455                        journal.setMark(newMark, true);
456                    }
457                } catch (Exception e) {
458                    LOG.error("Failed to mark the Journal: " + e, e);
459                }
460
461                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
462                    // We may be check pointing more often than the
463                    // checkpointInterval if under high use
464                    // But we don't want to clean up the db that often.
465                    long now = System.currentTimeMillis();
466                    if (now > lastCleanup + checkpointInterval) {
467                        lastCleanup = now;
468                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
469                    }
470                }
471            }
472
473            LOG.debug("Checkpoint done.");
474        } finally {
475            latch.countDown();
476        }
477        synchronized (this) {
478            return this.fullCheckPoint;
479        }
480
481    }
482
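    /**
     * Reads the record stored at the given journal location and unmarshals it.
     * 
     * @param location the journal location to read
     * @return the command unmarshalled from the record
     * @throws IOException if the record cannot be read or unmarshalled
     */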
488    public DataStructure readCommand(RecordLocation location) throws IOException {
489        try {
490            Packet packet = journal.read(location);
491            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
492        } catch (InvalidRecordLocationException e) {
493            throw createReadException(location, e);
494        } catch (IOException e) {
495            throw createReadException(location, e);
496        }
497    }
498
    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     * 
     * @throws IOException
     * @throws InvalidRecordLocationException
     * @throws IllegalStateException
     */
508    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
509
510        RecordLocation pos = null;
511        int transactionCounter = 0;
512
513        LOG.info("Journal Recovery Started from: " + journal);
514        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
515
516        // While we have records in the journal.
517        while ((pos = journal.getNextRecordLocation(pos)) != null) {
518            Packet data = journal.read(pos);
519            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
520
521            if (c instanceof Message) {
522                Message message = (Message)c;
523                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
524                if (message.isInTransaction()) {
525                    transactionStore.addMessage(store, message, pos);
526                } else {
527                    store.replayAddMessage(context, message);
528                    transactionCounter++;
529                }
530            } else {
531                switch (c.getDataStructureType()) {
532                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
533                    JournalQueueAck command = (JournalQueueAck)c;
534                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
535                    if (command.getMessageAck().isInTransaction()) {
536                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
537                    } else {
538                        store.replayRemoveMessage(context, command.getMessageAck());
539                        transactionCounter++;
540                    }
541                }
542                    break;
543                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
544                    JournalTopicAck command = (JournalTopicAck)c;
545                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
546                    if (command.getTransactionId() != null) {
547                        transactionStore.acknowledge(store, command, pos);
548                    } else {
549                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
550                        transactionCounter++;
551                    }
552                }
553                    break;
554                case JournalTransaction.DATA_STRUCTURE_TYPE: {
555                    JournalTransaction command = (JournalTransaction)c;
556                    try {
557                        // Try to replay the packet.
558                        switch (command.getType()) {
559                        case JournalTransaction.XA_PREPARE:
560                            transactionStore.replayPrepare(command.getTransactionId());
561                            break;
562                        case JournalTransaction.XA_COMMIT:
563                        case JournalTransaction.LOCAL_COMMIT:
564                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
565                            if (tx == null) {
566                                break; // We may be trying to replay a commit
567                            }
568                            // that
569                            // was already committed.
570
571                            // Replay the committed operations.
572                            tx.getOperations();
573                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
574                                TxOperation op = (TxOperation)iter.next();
575                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
576                                    op.store.replayAddMessage(context, (Message)op.data);
577                                }
578                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
579                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
580                                }
581                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
582                                    JournalTopicAck ack = (JournalTopicAck)op.data;
583                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
584                                }
585                            }
586                            transactionCounter++;
587                            break;
588                        case JournalTransaction.LOCAL_ROLLBACK:
589                        case JournalTransaction.XA_ROLLBACK:
590                            transactionStore.replayRollback(command.getTransactionId());
591                            break;
592                        default:
593                            throw new IOException("Invalid journal command type: " + command.getType());
594                        }
595                    } catch (IOException e) {
596                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
597                    }
598                }
599                    break;
600                case JournalTrace.DATA_STRUCTURE_TYPE:
601                    JournalTrace trace = (JournalTrace)c;
602                    LOG.debug("TRACE Entry: " + trace.getMessage());
603                    break;
604                default:
605                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
606                }
607            }
608        }
609
610        RecordLocation location = writeTraceMessage("RECOVERED", true);
611        journal.setMark(location, true);
612
613        LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
614    }
615
616    private IOException createReadException(RecordLocation location, Exception e) {
617        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
618    }
619
620    protected IOException createWriteException(DataStructure packet, Exception e) {
621        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
622    }
623
624    protected IOException createWriteException(String command, Exception e) {
625        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
626    }
627
628    protected IOException createRecoveryFailedException(Exception e) {
629        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
630    }
631
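    /**
     * Marshals the command and writes it to the journal.
     * 
     * @param command the command to write
     * @param sync passed through unchanged to the journal write
     * @return the journal location of the written record
     * @throws IOException if the adapter is not started or the journal write
     *                 fails
     */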
638    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
639        if (started.get()) {
640            try {
641                    return journal.write(toPacket(wireFormat.marshal(command)), sync);
642            } catch (IOException ioe) {
643                    LOG.error("Cannot write to the journal", ioe);
644                    brokerService.handleIOException(ioe);
645                    throw ioe;
646            }
647        }
648        throw new IOException("closed");
649    }
650
651    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
652        JournalTrace trace = new JournalTrace();
653        trace.setMessage(message);
654        return writeCommand(trace, sync);
655    }
656
657    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
658        newPercentUsage = (newPercentUsage / 10) * 10;
659        oldPercentUsage = (oldPercentUsage / 10) * 10;
660        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
661            boolean sync = newPercentUsage >= 90;
662            checkpoint(sync, true);
663        }
664    }
665
    /**
     * @return the journal transaction store owned by this adapter
     */
    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }
669
670    public void deleteAllMessages() throws IOException {
671        try {
672            JournalTrace trace = new JournalTrace();
673            trace.setMessage("DELETED");
674            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
675            journal.setMark(location, true);
676            LOG.info("Journal deleted: ");
677        } catch (IOException e) {
678            throw e;
679        } catch (Throwable e) {
680            throw IOExceptionSupport.create(e);
681        }
682        longTermPersistence.deleteAllMessages();
683    }
684
    /**
     * @return the usage manager set through setUsageManager, or null if none
     *         has been set
     */
    public SystemUsage getUsageManager() {
        return usageManager;
    }
688
    /**
     * @return the configured maximum checkpoint message add size; the value is
     *         not used inside this class
     */
    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }
692
    /**
     * @param maxCheckpointMessageAddSize the new maximum checkpoint message add
     *                size
     */
    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }
696
    /**
     * @return the number of threads used for the checkpoint executor
     */
    public int getMaxCheckpointWorkers() {
        return maxCheckpointWorkers;
    }
700
    /**
     * Sets the size of the checkpoint executor. The value is read when start()
     * creates the executor.
     *
     * @param maxCheckpointWorkers the number of checkpoint worker threads
     */
    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
        this.maxCheckpointWorkers = maxCheckpointWorkers;
    }
704
    /**
     * @return always false
     */
    public boolean isUseExternalMessageReferences() {
        return false;
    }
708
709    public void setUseExternalMessageReferences(boolean enable) {
710        if (enable) {
711            throw new IllegalArgumentException("The journal does not support message references.");
712        }
713    }
714
715    public Packet toPacket(ByteSequence sequence) {
716        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
717    }
718
719    public ByteSequence toByteSequence(Packet packet) {
720        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
721        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
722    }
723
    /**
     * Passes the broker name on to the long term persistence adapter; this
     * adapter does not keep it.
     *
     * @param brokerName the broker name
     */
    public void setBrokerName(String brokerName) {
        longTermPersistence.setBrokerName(brokerName);
    }
727
728    @Override
729    public String toString() {
730        return "JournalPersistenceAdapator(" + longTermPersistence + ")";
731    }
732
    /**
     * Stores the directory; this class only hands it back from getDirectory().
     *
     * @param dir the directory
     */
    public void setDirectory(File dir) {
        this.directory=dir;
    }
736    
    /**
     * @return the directory set through setDirectory, or null if none was set
     */
    public File getDirectory(){
        return directory;
    }
740    
    /**
     * @return always 0; no size is computed by this adapter
     */
    public long size(){
        return 0;
    }
744
745    public void setBrokerService(BrokerService brokerService) {
746        this.brokerService = brokerService;
747        PersistenceAdapter pa = getLongTermPersistence();
748        if( pa instanceof BrokerServiceAware ) {
749            ((BrokerServiceAware)pa).setBrokerService(brokerService);
750        }
751    }
752
    /**
     * @param id the producer; not inspected
     * @return always -1
     */
    public long getLastProducerSequenceId(ProducerId id) {
        return -1;
    }
756
757}