/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.journal;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of {@link PersistenceAdapter} designed for use with a
 * {@link Journal} and then check pointing asynchronously on a timeout with some
 * other long term persistent storage.
 * <p>
 * Commands are written to the journal through
 * {@link #writeCommand(DataStructure, boolean)}; a checkpoint asks every
 * journal backed message store to move its data into the long term
 * {@link PersistenceAdapter} and then advances the journal mark.
 *
 * @org.apache.xbean.XBean
 *
 */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {

    private BrokerService brokerService;

    protected Scheduler scheduler;
    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);

    private Journal journal;
    private PersistenceAdapter longTermPersistence;

    private final WireFormat wireFormat = new OpenWireFormat();

    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();

    private SystemUsage usageManager;
    // Five minutes, in milliseconds.
    private final long checkpointInterval = 1000 * 60 * 5;
    // Read and written while holding this adapter's monitor.
    private long lastCheckpointRequest = System.currentTimeMillis();
    private long lastCleanup = System.currentTimeMillis();
    private int maxCheckpointWorkers = 10;
    private int maxCheckpointMessageAddSize = 1024 * 1024;

    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    private ThreadPoolExecutor checkpointExecutor;

    private TaskRunner checkpointTask;
    // Latch released when the checkpoint that is currently pending completes;
    // swapped for a fresh one at the start of every doCheckpoint() run.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Set by checkpoint(...) requests, consumed (and reset) by doCheckpoint().
    private boolean fullCheckPoint;

    private final AtomicBoolean started = new AtomicBoolean(false);

    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();

    private TaskRunnerFactory taskRunnerFactory;
    private File directory;

    /**
     * Creates an unconfigured adapter; the journal, the task runner factory and
     * the long term persistence adapter have to be supplied through the setters.
     */
    public JournalPersistenceAdapter() {
    }

    /**
     * Creates an adapter wired to the given collaborators.
     *
     * @param journal the journal that commands are written to
     * @param longTermPersistence the adapter that checkpoints are written to
     * @param taskRunnerFactory factory used by {@link #start()} to create the
     *                checkpoint task runner
     * @throws IOException declared for callers; not thrown by the code in this
     *                 constructor itself
     */
    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
        setJournal(journal);
        setTaskRunnerFactory(taskRunnerFactory);
        setPersistenceAdapter(longTermPersistence);
    }

    /**
     * @param taskRunnerFactory factory used by {@link #start()} to create the
     *                checkpoint task runner
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }

    /**
     * Sets the journal and registers this adapter as its event listener.
     *
     * @param journal the journal to use; must not be null
     */
    public void setJournal(Journal journal) {
        this.journal = journal;
        journal.setJournalEventListener(this);
    }

    /**
     * @param longTermPersistence the adapter that checkpoints are written to
     */
    public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
        this.longTermPersistence = longTermPersistence;
    }

    /**
     * Builds the task that {@link #start()} schedules periodically. The task
     * requests an asynchronous full checkpoint when more than
     * {@code checkpointInterval} milliseconds have passed since the last
     * checkpoint request.
     *
     * @return the periodic checkpoint task
     */
    final Runnable createPeriodicCheckpointTask() {
        return new Runnable() {
            public void run() {
                long lastTime;
                // Lock the adapter, not the Runnable: checkpoint(...) updates
                // lastCheckpointRequest under the adapter's monitor.
                synchronized (JournalPersistenceAdapter.this) {
                    lastTime = lastCheckpointRequest;
                }
                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
                    checkpoint(false, true);
                }
            }
        };
    }

    /**
     * @param usageManager The UsageManager that is controlling the
     *                destination's memory usage.
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
        longTermPersistence.setUsageManager(usageManager);
    }

    /**
     * @return a new set holding the destinations known to the long term
     *         persistence adapter plus every queue and topic that currently has
     *         a journal store
     */
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
        destinations.addAll(queues.keySet());
        destinations.addAll(topics.keySet());
        return destinations;
    }

    /**
     * Dispatches to the queue or topic store factory depending on the
     * destination type.
     */
    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
        if (destination.isQueue()) {
            return createQueueMessageStore((ActiveMQQueue)destination);
        }
        return createTopicMessageStore((ActiveMQTopic)destination);
    }

    /**
     * Returns the journal store for the queue, creating and registering it on
     * first use.
     *
     * @param destination the queue
     * @return the registered store for the queue
     * @throws IOException if the long term store cannot be created
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        JournalMessageStore store = queues.get(destination);
        if (store == null) {
            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
            store = new JournalMessageStore(this, checkpointStore, destination);
            // Register atomically so that a concurrent creator cannot replace a
            // store that another caller is already using.
            JournalMessageStore existing = queues.putIfAbsent(destination, store);
            if (existing != null) {
                store = existing;
            }
        }
        return store;
    }

    /**
     * Returns the journal store for the topic, creating and registering it on
     * first use.
     *
     * @param destinationName the topic
     * @return the registered store for the topic
     * @throws IOException if the long term store cannot be created
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
        JournalTopicMessageStore store = topics.get(destinationName);
        if (store == null) {
            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
            // Register atomically; see createQueueMessageStore.
            JournalTopicMessageStore existing = topics.putIfAbsent(destinationName, store);
            if (existing != null) {
                store = existing;
            }
        }
        return store;
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        queues.remove(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        topics.remove(destination);
    }

    /**
     * @return the single journal transaction store owned by this adapter
     */
    public TransactionStore createTransactionStore() throws IOException {
        return transactionStore;
    }

    /**
     * @return the value reported by the long term persistence adapter
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return longTermPersistence.getLastMessageBrokerSequenceId();
    }

    /** Delegates to the long term persistence adapter. */
    public void beginTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.beginTransaction(context);
    }

    /** Delegates to the long term persistence adapter. */
    public void commitTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.commitTransaction(context);
    }

    /** Delegates to the long term persistence adapter. */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.rollbackTransaction(context);
    }

    /**
     * Starts the adapter: creates the checkpoint task runner and worker pool,
     * registers for memory usage events, starts the long term persistence
     * adapter, replays the journal and finally schedules the periodic
     * checkpoint. Calling it on an already started adapter is a no-op.
     *
     * @throws Exception if the long term adapter fails to start or the journal
     *                 cannot be recovered
     */
    public synchronized void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }

        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
            public boolean iterate() {
                return doCheckpoint();
            }
        }, "ActiveMQ Journal Checkpoint Worker");

        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            public Thread newThread(Runnable runable) {
                Thread t = new Thread(runable, "Journal checkpoint worker");
                t.setPriority(7);
                return t;
            }
        });
        // checkpointExecutor.allowCoreThreadTimeOut(true);

        this.usageManager.getMemoryUsage().addUsageListener(this);

        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
            // Disabled periodic clean up as it deadlocks with the checkpoint
            // operations.
            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
        }

        longTermPersistence.start();
        createTransactionStore();
        recover();

        // Do a checkpoint periodically.
        this.scheduler = new Scheduler("Journal Scheduler");
        this.scheduler.start();
        this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
    }

    /**
     * Stops the adapter: unregisters the usage listener, cancels the periodic
     * task, takes one final synchronous full checkpoint, shuts the checkpoint
     * workers down, closes the journal and stops the long term persistence
     * adapter. Calling it on an adapter that is not started only unregisters
     * the usage listener.
     *
     * @throws Exception if the long term adapter fails to stop, or (after the
     *                 long term adapter has been stopped) if closing the
     *                 journal failed
     */
    public void stop() throws Exception {

        // The usage manager may never have been injected when the adapter was
        // not started; do not fail the stop with a NullPointerException.
        if (this.usageManager != null) {
            this.usageManager.getMemoryUsage().removeUsageListener(this);
        }
        if (!started.compareAndSet(true, false)) {
            return;
        }

        this.scheduler.cancel(periodicCheckpointTask);
        this.scheduler.stop();

        // Take one final checkpoint and stop checkpoint processing.
        checkpoint(true, true);
        checkpointTask.shutdown();
        checkpointExecutor.shutdown();

        queues.clear();
        topics.clear();

        // Remember a journal close failure so that the long term adapter is
        // still stopped before the failure is reported.
        IOException firstException = null;
        try {
            journal.close();
        } catch (Exception e) {
            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
        }
        longTermPersistence.stop();

        if (firstException != null) {
            throw firstException;
        }
    }

    // Properties
    // -------------------------------------------------------------------------
    /**
     * @return the adapter that checkpoints are written to
     */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    /**
     * @return Returns the wireFormat.
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * The Journal give us a call back so that we can move old data out of the
     * journal. Taking a checkpoint does this for us.
     *
     * @param safeLocation not used by this implementation
     * @see JournalEventListener#overflowNotification(RecordLocation)
     */
    public void overflowNotification(RecordLocation safeLocation) {
        checkpoint(false, true);
    }

    /**
     * When we checkpoint we move all the journalled data to long term storage.
     * <p>
     * This method only requests the checkpoint by waking the checkpoint task;
     * the work itself is done by {@link #doCheckpoint()}.
     *
     * @param sync if true, block until the requested checkpoint has completed
     * @param fullCheckpoint if true, the next checkpoint also covers the queue
     *                stores and advances the journal mark
     * @throws IllegalStateException if no journal is set
     */
    public void checkpoint(boolean sync, boolean fullCheckpoint) {
        try {
            if (journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }

            long now = System.currentTimeMillis();
            CountDownLatch latch;
            synchronized (this) {
                // Capture the latch of the checkpoint that will serve this
                // request before waking the task.
                latch = nextCheckpointCountDownLatch;
                lastCheckpointRequest = now;
                if (fullCheckpoint) {
                    this.fullCheckPoint = true;
                }
            }

            checkpointTask.wakeup();

            if (sync) {
                LOG.debug("Waiting for checkpoint to complete.");
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /**
     * Requests a checkpoint that is full exactly when it is synchronous.
     *
     * @param sync if true, request a full checkpoint and wait for it
     */
    public void checkpoint(boolean sync) {
        checkpoint(sync, sync);
    }

    /**
     * This does the actual checkpoint.
     * <p>
     * Topic stores are checkpointed on every run, queue stores only on full
     * checkpoints. On a full checkpoint the journal mark is moved to the
     * greatest location reported by the stores.
     *
     * @return true if another full checkpoint was requested while this one was
     *         running
     */
    public boolean doCheckpoint() {
        CountDownLatch latch;
        boolean fullCheckpoint;
        synchronized (this) {
            latch = nextCheckpointCountDownLatch;
            nextCheckpointCountDownLatch = new CountDownLatch(1);
            fullCheckpoint = this.fullCheckPoint;
            this.fullCheckPoint = false;
        }
        try {

            LOG.debug("Checkpoint started.");
            RecordLocation newMark = null;

            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());

            //
            // We do many partial checkpoints (fullCheckpoint==false) to move
            // topic messages to long term store as soon as possible.
            //
            // We want to avoid doing that for queue messages since removes
            // that come in the same checkpoint cycle will nullify the previous
            // message add. Therefore, we only checkpoint queues on the
            // fullCheckpoint cycles.
            //
            if (fullCheckpoint) {
                for (JournalMessageStore queueStore : queues.values()) {
                    submitCheckpoint(queueStore, futureTasks);
                }
            }

            for (JournalTopicMessageStore topicStore : topics.values()) {
                submitCheckpoint(topicStore, futureTasks);
            }

            // NOTE(review): the first failing store ends the wait for the
            // remaining ones, and the journal is still marked below with the
            // locations collected so far - confirm this is intended.
            try {
                for (FutureTask<RecordLocation> ft : futureTasks) {
                    RecordLocation mark = ft.get();
                    // We only set a newMark on full checkpoints.
                    if (fullCheckpoint) {
                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                            newMark = mark;
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.error("Failed to checkpoint a message store: " + e, e);
            }

            if (fullCheckpoint) {
                try {
                    if (newMark != null) {
                        LOG.debug("Marking journal at: {}", newMark);
                        journal.setMark(newMark, true);
                    }
                } catch (Exception e) {
                    LOG.error("Failed to mark the Journal: " + e, e);
                }

                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
                    // We may be check pointing more often than the
                    // checkpointInterval if under high use
                    // But we don't want to clean up the db that often.
                    long now = System.currentTimeMillis();
                    if (now > lastCleanup + checkpointInterval) {
                        lastCleanup = now;
                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
                    }
                }
            }

            LOG.debug("Checkpoint done.");
        } finally {
            // Always release the callers blocked in checkpoint(true, ...).
            latch.countDown();
        }
        synchronized (this) {
            return this.fullCheckPoint;
        }

    }

    /**
     * Queues the checkpoint of one store on the checkpoint executor and records
     * the resulting future; a failure to queue is logged and otherwise ignored.
     */
    private void submitCheckpoint(final JournalMessageStore store, ArrayList<FutureTask<RecordLocation>> futureTasks) {
        try {
            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
                public RecordLocation call() throws Exception {
                    return store.checkpoint();
                }
            });
            futureTasks.add(task);
            checkpointExecutor.execute(task);
        } catch (Exception e) {
            LOG.error("Failed to checkpoint a message store: " + e, e);
        }
    }

    /**
     * Reads the record at the given journal location and unmarshals it.
     *
     * @param location the journal location to read
     * @return the unmarshalled command
     * @throws IOException if the location is invalid or the record cannot be
     *                 read or unmarshalled
     */
    public DataStructure readCommand(RecordLocation location) throws IOException {
        try {
            Packet packet = journal.read(location);
            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
        } catch (InvalidRecordLocationException e) {
            throw createReadException(location, e);
        } catch (IOException e) {
            throw createReadException(location, e);
        }
    }

    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     * <p>
     * Every record is replayed into its message store or, when it belongs to a
     * transaction, into the transaction store. Afterwards a "RECOVERED" trace
     * record is written and the journal is marked at it.
     *
     * @throws IOException
     * @throws InvalidRecordLocationException
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException {

        RecordLocation pos = null;
        int transactionCounter = 0;

        LOG.info("Journal Recovery Started from: " + journal);
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());

        // While we have records in the journal.
        while ((pos = journal.getNextRecordLocation(pos)) != null) {
            Packet data = journal.read(pos);
            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));

            if (c instanceof Message) {
                Message message = (Message)c;
                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    transactionStore.addMessage(store, message, pos);
                } else {
                    store.replayAddMessage(context, message);
                    transactionCounter++;
                }
            } else {
                switch (c.getDataStructureType()) {
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck)c;
                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else {
                        store.replayRemoveMessage(context, command.getMessageAck());
                        transactionCounter++;
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck)c;
                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else {
                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
                        transactionCounter++;
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction)c;
                    try {
                        // Try to replay the packet.
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // We may be trying to replay a commit that
                                // was already committed.
                                break;
                            }

                            // Replay the committed operations.
                            for (Iterator<?> iter = tx.getOperations().iterator(); iter.hasNext();) {
                                TxOperation op = (TxOperation)iter.next();
                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
                                    op.store.replayAddMessage(context, (Message)op.data);
                                } else if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
                                } else if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
                                    JournalTopicAck ack = (JournalTopicAck)op.data;
                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
                                }
                            }
                            transactionCounter++;
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        default:
                            throw new IOException("Invalid journal command type: " + command.getType());
                        }
                    } catch (IOException e) {
                        // A transaction record that cannot be replayed is
                        // logged and skipped; recovery continues.
                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE:
                    JournalTrace trace = (JournalTrace)c;
                    LOG.debug("TRACE Entry: " + trace.getMessage());
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }

        RecordLocation location = writeTraceMessage("RECOVERED", true);
        journal.setMark(location, true);

        LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
    }

    /** Wraps a journal read failure for the given location in an IOException. */
    private IOException createReadException(RecordLocation location, Exception e) {
        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    /** Wraps a journal write failure for the given packet in an IOException. */
    protected IOException createWriteException(DataStructure packet, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    /** Wraps a journal write failure for the named command in an IOException. */
    protected IOException createWriteException(String command, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    /** Wraps a journal recovery failure in an IOException. */
    protected IOException createRecoveryFailedException(Exception e) {
        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
    }

    /**
     * Marshals the command and writes it to the journal.
     *
     * @param command the command to write
     * @param sync passed through to {@link Journal#write}
     * @return the journal location of the written record
     * @throws IOException if the adapter is not started ("closed") or the
     *                 journal write fails; a write failure is also reported to
     *                 the broker service when one has been set
     */
    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
        if (started.get()) {
            try {
                return journal.write(toPacket(wireFormat.marshal(command)), sync);
            } catch (IOException ioe) {
                LOG.error("Cannot write to the journal", ioe);
                // The broker service is optional; never let its absence mask
                // the real journal failure with a NullPointerException.
                if (brokerService != null) {
                    brokerService.handleIOException(ioe);
                }
                throw ioe;
            }
        }
        throw new IOException("closed");
    }

    /** Writes a {@link JournalTrace} record carrying the given message. */
    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(message);
        return writeCommand(trace, sync);
    }

    /**
     * Requests a full checkpoint when the usage, rounded down to a multiple of
     * ten percent, rises to 70% or more; from 90% upwards the checkpoint is
     * synchronous.
     *
     * @param usage not used by this implementation
     * @param oldPercentUsage the previous usage in percent
     * @param newPercentUsage the new usage in percent
     */
    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        newPercentUsage = (newPercentUsage / 10) * 10;
        oldPercentUsage = (oldPercentUsage / 10) * 10;
        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
            boolean sync = newPercentUsage >= 90;
            checkpoint(sync, true);
        }
    }

    /**
     * @return the journal transaction store owned by this adapter
     */
    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }

    /**
     * Writes a "DELETED" trace record, marks the journal at it and then deletes
     * all messages from the long term persistence adapter.
     *
     * @throws IOException if the journal or the long term adapter fails
     */
    public void deleteAllMessages() throws IOException {
        try {
            JournalTrace trace = new JournalTrace();
            trace.setMessage("DELETED");
            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
            journal.setMark(location, true);
            LOG.info("Journal deleted: ");
        } catch (IOException e) {
            throw e;
        } catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
        longTermPersistence.deleteAllMessages();
    }

    /**
     * @return the usage manager set through {@link #setUsageManager}
     */
    public SystemUsage getUsageManager() {
        return usageManager;
    }

    /**
     * @return the configured maximum checkpoint message add size
     */
    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }

    /**
     * @param maxCheckpointMessageAddSize the maximum checkpoint message add
     *                size to configure
     */
    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }

    /**
     * @return the number of threads used for the checkpoint worker pool
     */
    public int getMaxCheckpointWorkers() {
        return maxCheckpointWorkers;
    }

    /**
     * @param maxCheckpointWorkers the number of threads for the checkpoint
     *                worker pool; read when {@link #start()} creates the pool
     */
    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
        this.maxCheckpointWorkers = maxCheckpointWorkers;
    }

    /**
     * @return always false
     */
    public boolean isUseExternalMessageReferences() {
        return false;
    }

    /**
     * @param enable must be false
     * @throws IllegalArgumentException if enable is true
     */
    public void setUseExternalMessageReferences(boolean enable) {
        if (enable) {
            throw new IllegalArgumentException("The journal does not support message references.");
        }
    }

    /**
     * Wraps an ActiveMQ byte sequence in an ActiveIO packet over the same data,
     * offset and length.
     */
    public Packet toPacket(ByteSequence sequence) {
        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
    }

    /**
     * Converts an ActiveIO packet into an ActiveMQ byte sequence over the
     * packet's data, offset and length.
     */
    public ByteSequence toByteSequence(Packet packet) {
        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
    }

    /** Delegates to the long term persistence adapter. */
    public void setBrokerName(String brokerName) {
        longTermPersistence.setBrokerName(brokerName);
    }

    @Override
    public String toString() {
        return "JournalPersistenceAdapator(" + longTermPersistence + ")";
    }

    /**
     * @param dir the directory to remember; only stored by this class
     */
    public void setDirectory(File dir) {
        this.directory = dir;
    }

    /**
     * @return the directory set through {@link #setDirectory(File)}
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * @return always 0
     */
    public long size() {
        return 0;
    }

    /**
     * Stores the broker service and forwards it to the long term persistence
     * adapter when that adapter is {@link BrokerServiceAware}.
     */
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
        PersistenceAdapter pa = getLongTermPersistence();
        if (pa instanceof BrokerServiceAware) {
            ((BrokerServiceAware)pa).setBrokerService(brokerService);
        }
    }

    /**
     * @return always -1
     */
    public long getLastProducerSequenceId(ProducerId id) {
        return -1;
    }

}