001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.Set; 024import java.util.concurrent.Callable; 025 026import javax.management.ObjectName; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.LockableServiceSupport; 031import org.apache.activemq.broker.Locker; 032import org.apache.activemq.broker.jmx.AnnotatedMBean; 033import org.apache.activemq.broker.jmx.PersistenceAdapterView; 034import org.apache.activemq.broker.scheduler.JobSchedulerStore; 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.ActiveMQQueue; 037import org.apache.activemq.command.ActiveMQTopic; 038import org.apache.activemq.command.LocalTransactionId; 039import org.apache.activemq.command.ProducerId; 040import org.apache.activemq.command.TransactionId; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.protobuf.Buffer; 043import org.apache.activemq.store.JournaledStore; 044import org.apache.activemq.store.MessageStore; 045import org.apache.activemq.store.NoLocalSubscriptionAware; 046import org.apache.activemq.store.PersistenceAdapter; 047import org.apache.activemq.store.SharedFileLocker; 048import org.apache.activemq.store.TopicMessageStore; 049import org.apache.activemq.store.TransactionIdTransformer; 050import org.apache.activemq.store.TransactionIdTransformerAware; 051import org.apache.activemq.store.TransactionStore; 052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 054import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 055import org.apache.activemq.usage.SystemUsage; 056import org.apache.activemq.util.ServiceStopper; 057 058/** 059 * An implementation of {@link PersistenceAdapter} designed for use with 060 * KahaDB - Embedded Lightweight Non-Relational Database 061 * 062 * @org.apache.xbean.XBean element="kahaDB" 063 * 064 */ 065public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, 066 JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware { 067 068 private final KahaDBStore letter = new KahaDBStore(); 069 070 /** 071 * @param context 072 * @throws IOException 073 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 074 */ 075 @Override 076 public void beginTransaction(ConnectionContext context) throws IOException { 077 this.letter.beginTransaction(context); 078 } 079 080 /** 081 * @param sync 082 * @throws IOException 083 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 084 */ 085 @Override 086 public void checkpoint(boolean sync) throws IOException { 087 this.letter.checkpoint(sync); 088 } 089 090 /** 091 * @param context 092 * @throws IOException 093 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 094 */ 095 @Override 096 public void commitTransaction(ConnectionContext context) throws IOException { 097 this.letter.commitTransaction(context); 098 } 099 100 /** 101 * @param destination 102 * @return MessageStore 103 * @throws IOException 104 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 105 */ 106 @Override 107 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 108 return this.letter.createQueueMessageStore(destination); 109 } 110 111 /** 112 * @param destination 113 * @return TopicMessageStore 114 * @throws IOException 115 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 116 */ 117 @Override 118 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 119 return this.letter.createTopicMessageStore(destination); 120 } 121 122 /** 123 * @return TransactionStore 124 * @throws IOException 125 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 126 */ 127 @Override 128 public TransactionStore createTransactionStore() throws IOException { 129 return this.letter.createTransactionStore(); 130 } 131 132 /** 133 * @throws IOException 134 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 135 */ 136 @Override 137 public void deleteAllMessages() throws IOException { 138 this.letter.deleteAllMessages(); 139 } 140 141 /** 142 * @return destinations 143 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 144 */ 145 @Override 146 public Set<ActiveMQDestination> getDestinations() { 147 return this.letter.getDestinations(); 148 } 149 150 /** 151 * @return lastMessageBrokerSequenceId 152 * @throws IOException 153 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 154 */ 155 @Override 156 public long getLastMessageBrokerSequenceId() throws IOException { 157 return this.letter.getLastMessageBrokerSequenceId(); 158 } 159 160 @Override 161 public long getLastProducerSequenceId(ProducerId id) throws IOException { 162 return this.letter.getLastProducerSequenceId(id); 163 } 164 165 /** 166 * @param destination 167 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 168 */ 169 @Override 170 public void removeQueueMessageStore(ActiveMQQueue destination) { 171 this.letter.removeQueueMessageStore(destination); 172 } 173 174 /** 175 * @param destination 176 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 177 */ 178 @Override 179 public void removeTopicMessageStore(ActiveMQTopic destination) { 180 this.letter.removeTopicMessageStore(destination); 181 } 182 183 /** 184 * @param context 185 * @throws IOException 186 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 187 */ 188 @Override 189 public void rollbackTransaction(ConnectionContext context) throws IOException { 190 this.letter.rollbackTransaction(context); 191 } 192 193 /** 194 * @param brokerName 195 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 196 */ 197 @Override 198 public void setBrokerName(String brokerName) { 199 this.letter.setBrokerName(brokerName); 200 } 201 202 /** 203 * @param usageManager 204 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 205 */ 206 @Override 207 public void setUsageManager(SystemUsage usageManager) { 208 this.letter.setUsageManager(usageManager); 209 } 210 211 /** 212 * @return the size of the store 213 * @see org.apache.activemq.store.PersistenceAdapter#size() 214 */ 215 @Override 216 public long size() { 217 return this.letter.size(); 218 } 219 220 /** 221 * @throws Exception 222 * @see org.apache.activemq.Service#start() 223 */ 224 @Override 225 public void doStart() throws Exception { 226 this.letter.start(); 227 228 if (brokerService != null && brokerService.isUseJmx()) { 229 PersistenceAdapterView view = new PersistenceAdapterView(this); 230 view.setInflightTransactionViewCallable(new Callable<String>() { 231 @Override 232 public String call() throws Exception { 233 return letter.getTransactions(); 234 } 235 }); 236 view.setDataViewCallable(new Callable<String>() { 237 @Override 238 public String call() throws Exception { 239 return letter.getJournal().getFileMap().keySet().toString(); 240 } 241 }); 242 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, 243 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); 244 } 245 } 246 247 /** 248 * @throws Exception 249 * @see org.apache.activemq.Service#stop() 250 */ 251 @Override 252 public void doStop(ServiceStopper stopper) throws Exception { 253 this.letter.stop(); 254 255 if (brokerService != null && brokerService.isUseJmx()) { 256 ObjectName brokerObjectName = brokerService.getBrokerObjectName(); 257 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString())); 258 } 259 } 260 261 /** 262 * Get the journalMaxFileLength 263 * 264 * @return the journalMaxFileLength 265 */ 266 @Override 267 public int getJournalMaxFileLength() { 268 return this.letter.getJournalMaxFileLength(); 269 } 270 271 /** 272 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 273 * be used 274 * 275 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 276 */ 277 public void setJournalMaxFileLength(int journalMaxFileLength) { 278 this.letter.setJournalMaxFileLength(journalMaxFileLength); 279 } 280 281 /** 282 * Set the max number of producers (LRU cache) to track for duplicate sends 283 */ 284 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 285 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); 286 } 287 288 public int getMaxFailoverProducersToTrack() { 289 return this.letter.getMaxFailoverProducersToTrack(); 290 } 291 292 /** 293 * set the audit window depth for duplicate suppression (should exceed the max transaction 294 * batch) 295 */ 296 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 297 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); 298 } 299 300 public int getFailoverProducersAuditDepth() { 301 return this.letter.getFailoverProducersAuditDepth(); 302 } 303 304 /** 305 * Get the checkpointInterval 306 * 307 * @return the checkpointInterval 308 */ 309 public long getCheckpointInterval() { 310 return this.letter.getCheckpointInterval(); 311 } 312 313 /** 314 * Set the checkpointInterval 315 * 316 * @param checkpointInterval 317 * the checkpointInterval to set 318 */ 319 public void setCheckpointInterval(long checkpointInterval) { 320 this.letter.setCheckpointInterval(checkpointInterval); 321 } 322 323 /** 324 * Get the cleanupInterval 325 * 326 * @return the cleanupInterval 327 */ 328 public long getCleanupInterval() { 329 return this.letter.getCleanupInterval(); 330 } 331 332 /** 333 * Set the cleanupInterval 334 * 335 * @param cleanupInterval 336 * the cleanupInterval to set 337 */ 338 public void setCleanupInterval(long cleanupInterval) { 339 this.letter.setCleanupInterval(cleanupInterval); 340 } 341 342 /** 343 * Get the indexWriteBatchSize 344 * 345 * @return the indexWriteBatchSize 346 */ 347 public int getIndexWriteBatchSize() { 348 return this.letter.getIndexWriteBatchSize(); 349 } 350 351 /** 352 * Set the indexWriteBatchSize 353 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 354 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 355 * @param indexWriteBatchSize 356 * the indexWriteBatchSize to set 357 */ 358 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 359 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 360 } 361 362 /** 363 * Get the journalMaxWriteBatchSize 364 * 365 * @return the journalMaxWriteBatchSize 366 */ 367 public int getJournalMaxWriteBatchSize() { 368 return this.letter.getJournalMaxWriteBatchSize(); 369 } 370 371 /** 372 * Set the journalMaxWriteBatchSize 373 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 374 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 375 * @param journalMaxWriteBatchSize 376 * the journalMaxWriteBatchSize to set 377 */ 378 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 379 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 380 } 381 382 /** 383 * Get the enableIndexWriteAsync 384 * 385 * @return the enableIndexWriteAsync 386 */ 387 public boolean isEnableIndexWriteAsync() { 388 return this.letter.isEnableIndexWriteAsync(); 389 } 390 391 /** 392 * Set the enableIndexWriteAsync 393 * 394 * @param enableIndexWriteAsync 395 * the enableIndexWriteAsync to set 396 */ 397 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 398 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 399 } 400 401 /** 402 * Get the directory 403 * 404 * @return the directory 405 */ 406 @Override 407 public File getDirectory() { 408 return this.letter.getDirectory(); 409 } 410 411 /** 412 * @param dir 413 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 414 */ 415 @Override 416 public void setDirectory(File dir) { 417 this.letter.setDirectory(dir); 418 } 419 420 /** 421 * @return the currently configured location of the KahaDB index files. 422 */ 423 public File getIndexDirectory() { 424 return this.letter.getIndexDirectory(); 425 } 426 427 /** 428 * Sets the directory where KahaDB index files should be written. 429 * 430 * @param indexDirectory 431 * the directory where the KahaDB store index files should be written. 432 */ 433 public void setIndexDirectory(File indexDirectory) { 434 this.letter.setIndexDirectory(indexDirectory); 435 } 436 437 /** 438 * Get the enableJournalDiskSyncs 439 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 440 * @return the enableJournalDiskSyncs 441 */ 442 public boolean isEnableJournalDiskSyncs() { 443 return this.letter.isEnableJournalDiskSyncs(); 444 } 445 446 /** 447 * Set the enableJournalDiskSyncs 448 * 449 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 450 * @param enableJournalDiskSyncs 451 * the enableJournalDiskSyncs to set 452 */ 453 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 454 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 455 } 456 457 /** 458 * @return 459 */ 460 public String getJournalDiskSyncStrategy() { 461 return letter.getJournalDiskSyncStrategy(); 462 } 463 464 /** 465 * @param journalDiskSyncStrategy 466 */ 467 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 468 letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 469 } 470 471 /** 472 * @return 473 */ 474 public long getJournalDiskSyncInterval() { 475 return letter.getJournalDiskSyncInterval(); 476 } 477 478 /** 479 * @param journalDiskSyncInterval 480 */ 481 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 482 letter.setJournalDiskSyncInterval(journalDiskSyncInterval); 483 } 484 485 /** 486 * Get the indexCacheSize 487 * 488 * @return the indexCacheSize 489 */ 490 public int getIndexCacheSize() { 491 return this.letter.getIndexCacheSize(); 492 } 493 494 /** 495 * Set the indexCacheSize 496 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 497 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 498 * @param indexCacheSize 499 * the indexCacheSize to set 500 */ 501 public void setIndexCacheSize(int indexCacheSize) { 502 this.letter.setIndexCacheSize(indexCacheSize); 503 } 504 505 /** 506 * Get the ignoreMissingJournalfiles 507 * 508 * @return the ignoreMissingJournalfiles 509 */ 510 public boolean isIgnoreMissingJournalfiles() { 511 return this.letter.isIgnoreMissingJournalfiles(); 512 } 513 514 /** 515 * Set the ignoreMissingJournalfiles 516 * 517 * @param ignoreMissingJournalfiles 518 * the ignoreMissingJournalfiles to set 519 */ 520 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 521 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 522 } 523 524 public boolean isChecksumJournalFiles() { 525 return letter.isChecksumJournalFiles(); 526 } 527 528 public boolean isCheckForCorruptJournalFiles() { 529 return letter.isCheckForCorruptJournalFiles(); 530 } 531 532 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 533 letter.setChecksumJournalFiles(checksumJournalFiles); 534 } 535 536 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 537 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 538 } 539 540 @Override 541 public void setBrokerService(BrokerService brokerService) { 542 super.setBrokerService(brokerService); 543 letter.setBrokerService(brokerService); 544 } 545 546 public String getPreallocationScope() { 547 return letter.getPreallocationScope(); 548 } 549 550 public void setPreallocationScope(String preallocationScope) { 551 this.letter.setPreallocationScope(preallocationScope); 552 } 553 554 public String getPreallocationStrategy() { 555 return letter.getPreallocationStrategy(); 556 } 557 558 public void setPreallocationStrategy(String preallocationStrategy) { 559 this.letter.setPreallocationStrategy(preallocationStrategy); 560 } 561 562 public boolean isArchiveDataLogs() { 563 return letter.isArchiveDataLogs(); 564 } 565 566 public void setArchiveDataLogs(boolean archiveDataLogs) { 567 letter.setArchiveDataLogs(archiveDataLogs); 568 } 569 570 public File getDirectoryArchive() { 571 return letter.getDirectoryArchive(); 572 } 573 574 public void setDirectoryArchive(File directoryArchive) { 575 letter.setDirectoryArchive(directoryArchive); 576 } 577 578 public boolean isConcurrentStoreAndDispatchQueues() { 579 return letter.isConcurrentStoreAndDispatchQueues(); 580 } 581 582 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 583 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); 584 } 585 586 public boolean isConcurrentStoreAndDispatchTopics() { 587 return letter.isConcurrentStoreAndDispatchTopics(); 588 } 589 590 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 591 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); 592 } 593 594 public int getMaxAsyncJobs() { 595 return letter.getMaxAsyncJobs(); 596 } 597 /** 598 * @param maxAsyncJobs 599 * the maxAsyncJobs to set 600 */ 601 public void setMaxAsyncJobs(int maxAsyncJobs) { 602 letter.setMaxAsyncJobs(maxAsyncJobs); 603 } 604 605 /** 606 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 607 * 608 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 609 */ 610 @Deprecated 611 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { 612 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); 613 } 614 615 public boolean getForceRecoverIndex() { 616 return letter.getForceRecoverIndex(); 617 } 618 619 public void setForceRecoverIndex(boolean forceRecoverIndex) { 620 letter.setForceRecoverIndex(forceRecoverIndex); 621 } 622 623 public boolean isArchiveCorruptedIndex() { 624 return letter.isArchiveCorruptedIndex(); 625 } 626 627 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 628 letter.setArchiveCorruptedIndex(archiveCorruptedIndex); 629 } 630 631 public float getIndexLFUEvictionFactor() { 632 return letter.getIndexLFUEvictionFactor(); 633 } 634 635 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 636 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor); 637 } 638 639 public boolean isUseIndexLFRUEviction() { 640 return letter.isUseIndexLFRUEviction(); 641 } 642 643 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 644 letter.setUseIndexLFRUEviction(useIndexLFRUEviction); 645 } 646 647 public void setEnableIndexDiskSyncs(boolean diskSyncs) { 648 letter.setEnableIndexDiskSyncs(diskSyncs); 649 } 650 651 public boolean isEnableIndexDiskSyncs() { 652 return letter.isEnableIndexDiskSyncs(); 653 } 654 655 public void setEnableIndexRecoveryFile(boolean enable) { 656 letter.setEnableIndexRecoveryFile(enable); 657 } 658 659 public boolean isEnableIndexRecoveryFile() { 660 return letter.isEnableIndexRecoveryFile(); 661 } 662 663 public void setEnableIndexPageCaching(boolean enable) { 664 letter.setEnableIndexPageCaching(enable); 665 } 666 667 public boolean isEnableIndexPageCaching() { 668 return letter.isEnableIndexPageCaching(); 669 } 670 671 public int getCompactAcksAfterNoGC() { 672 return letter.getCompactAcksAfterNoGC(); 673 } 674 675 /** 676 * Sets the number of GC cycles where no journal logs were removed before an attempt to 677 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 678 * <p> 679 * A value of -1 will disable this feature. 680 * 681 * @param compactAcksAfterNoGC 682 * Number of empty GC cycles before we rewrite old ACKS. 683 */ 684 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 685 this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC); 686 } 687 688 public boolean isCompactAcksIgnoresStoreGrowth() { 689 return this.letter.isCompactAcksIgnoresStoreGrowth(); 690 } 691 692 /** 693 * Configure if Ack compaction will occur regardless of continued growth of the 694 * journal logs meaning that the store has not run out of space yet. Because the 695 * compaction operation can be costly this value is defaulted to off and the Ack 696 * compaction is only done when it seems that the store cannot grow and larger. 697 * 698 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 699 */ 700 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 701 this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); 702 } 703 704 /** 705 * Returns whether Ack compaction is enabled 706 * 707 * @return enableAckCompaction 708 */ 709 public boolean isEnableAckCompaction() { 710 return letter.isEnableAckCompaction(); 711 } 712 713 /** 714 * Configure if the Ack compaction task should be enabled to run 715 * 716 * @param enableAckCompaction 717 */ 718 public void setEnableAckCompaction(boolean enableAckCompaction) { 719 letter.setEnableAckCompaction(enableAckCompaction); 720 } 721 722 /** 723 * Whether non-blocking subscription statistics have been enabled 724 * 725 * @return 726 */ 727 public boolean isEnableSubscriptionStatistics() { 728 return letter.isEnableSubscriptionStatistics(); 729 } 730 731 /** 732 * Enable caching statistics for each subscription to allow non-blocking 733 * retrieval of metrics. This could incur some overhead to compute if there are a lot 734 * of subscriptions. 735 * 736 * @param enableSubscriptionStatistics 737 */ 738 public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { 739 letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics); 740 } 741 742 public KahaDBStore getStore() { 743 return letter; 744 } 745 746 public KahaTransactionInfo createTransactionInfo(TransactionId txid) { 747 if (txid == null) { 748 return null; 749 } 750 KahaTransactionInfo rc = new KahaTransactionInfo(); 751 752 if (txid.isLocalTransaction()) { 753 LocalTransactionId t = (LocalTransactionId) txid; 754 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 755 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 756 kahaTxId.setTransactionId(t.getValue()); 757 rc.setLocalTransactionId(kahaTxId); 758 } else { 759 XATransactionId t = (XATransactionId) txid; 760 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 761 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 762 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 763 kahaTxId.setFormatId(t.getFormatId()); 764 rc.setXaTransactionId(kahaTxId); 765 } 766 return rc; 767 } 768 769 @Override 770 public Locker createDefaultLocker() throws IOException { 771 SharedFileLocker locker = new SharedFileLocker(); 772 locker.configure(this); 773 return locker; 774 } 775 776 @Override 777 public void init() throws Exception {} 778 779 @Override 780 public String toString() { 781 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 782 return "KahaDBPersistenceAdapter[" + path + "]"; 783 } 784 785 @Override 786 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 787 getStore().setTransactionIdTransformer(transactionIdTransformer); 788 } 789 790 @Override 791 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 792 return this.letter.createJobSchedulerStore(); 793 } 794 795 /* (non-Javadoc) 796 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 797 */ 798 @Override 799 public boolean isPersistNoLocal() { 800 return this.letter.isPersistNoLocal(); 801 } 802}