001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.sql.Connection; 022 import java.sql.SQLException; 023 import java.util.Collections; 024 import java.util.Set; 025 import java.util.concurrent.ScheduledFuture; 026 import java.util.concurrent.ScheduledThreadPoolExecutor; 027 import java.util.concurrent.ThreadFactory; 028 import java.util.concurrent.TimeUnit; 029 030 import javax.sql.DataSource; 031 032 import org.apache.activemq.ActiveMQMessageAudit; 033 import org.apache.activemq.broker.BrokerService; 034 import org.apache.activemq.broker.BrokerServiceAware; 035 import org.apache.activemq.broker.ConnectionContext; 036 import org.apache.activemq.command.ActiveMQDestination; 037 import org.apache.activemq.command.ActiveMQQueue; 038 import org.apache.activemq.command.ActiveMQTopic; 039 import org.apache.activemq.command.Message; 040 import org.apache.activemq.command.MessageId; 041 import org.apache.activemq.command.ProducerId; 042 import org.apache.activemq.openwire.OpenWireFormat; 043 import org.apache.activemq.store.MessageStore; 044 import org.apache.activemq.store.PersistenceAdapter; 045 import org.apache.activemq.store.TopicMessageStore; 046 import org.apache.activemq.store.TransactionStore; 047 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 048 import org.apache.activemq.store.memory.MemoryTransactionStore; 049 import org.apache.activemq.usage.SystemUsage; 050 import org.apache.activemq.util.ByteSequence; 051 import org.apache.activemq.util.FactoryFinder; 052 import org.apache.activemq.util.IOExceptionSupport; 053 import org.apache.activemq.util.LongSequenceGenerator; 054 import org.apache.activemq.wireformat.WireFormat; 055 import org.slf4j.Logger; 056 import org.slf4j.LoggerFactory; 057 058 /** 059 * A {@link PersistenceAdapter} implementation using JDBC for persistence 060 * storage. 061 * 062 * This persistence adapter will correctly remember prepared XA transactions, 063 * but it will not keep track of local transaction commits so that operations 064 * performed against the Message store are done as a single uow. 065 * 066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 067 * 068 * 069 */ 070 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, 071 BrokerServiceAware { 072 073 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); 074 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 075 "META-INF/services/org/apache/activemq/store/jdbc/"); 076 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 077 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 078 079 private WireFormat wireFormat = new OpenWireFormat(); 080 private BrokerService brokerService; 081 private Statements statements; 082 private JDBCAdapter adapter; 083 private MemoryTransactionStore transactionStore; 084 private ScheduledThreadPoolExecutor clockDaemon; 085 private ScheduledFuture<?> cleanupTicket, keepAliveTicket; 086 private int cleanupPeriod = 1000 * 60 * 5; 087 private boolean useExternalMessageReferences; 088 private boolean useDatabaseLock = true; 089 private long lockKeepAlivePeriod = 1000*30; 090 private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; 091 private DatabaseLocker databaseLocker; 092 private boolean createTablesOnStartup = true; 093 private DataSource lockDataSource; 094 private int transactionIsolation; 095 private File directory; 096 097 protected int maxProducersToAudit=1024; 098 protected int maxAuditDepth=1000; 099 protected boolean enableAudit=false; 100 protected int auditRecoveryDepth = 1024; 101 protected ActiveMQMessageAudit audit; 102 103 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 104 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; 105 106 public JDBCPersistenceAdapter() { 107 } 108 109 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 110 super(ds); 111 this.wireFormat = wireFormat; 112 } 113 114 public Set<ActiveMQDestination> getDestinations() { 115 TransactionContext c = null; 116 try { 117 c = getTransactionContext(); 118 return getAdapter().doGetDestinations(c); 119 } catch (IOException e) { 120 return emptyDestinationSet(); 121 } catch (SQLException e) { 122 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 123 return emptyDestinationSet(); 124 } finally { 125 if (c != null) { 126 try { 127 c.close(); 128 } catch (Throwable e) { 129 } 130 } 131 } 132 } 133 134 @SuppressWarnings("unchecked") 135 private Set<ActiveMQDestination> emptyDestinationSet() { 136 return Collections.EMPTY_SET; 137 } 138 139 protected void createMessageAudit() { 140 if (enableAudit && audit == null) { 141 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 142 TransactionContext c = null; 143 144 try { 145 c = getTransactionContext(); 146 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 147 public void messageId(MessageId id) { 148 audit.isDuplicate(id); 149 } 150 }); 151 } catch (Exception e) { 152 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 153 } finally { 154 if (c != null) { 155 try { 156 c.close(); 157 } catch (Throwable e) { 158 } 159 } 160 } 161 } 162 } 163 164 public void initSequenceIdGenerator() { 165 TransactionContext c = null; 166 try { 167 c = getTransactionContext(); 168 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 169 public void messageId(MessageId id) { 170 audit.isDuplicate(id); 171 } 172 }); 173 } catch (Exception e) { 174 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 175 } finally { 176 if (c != null) { 177 try { 178 c.close(); 179 } catch (Throwable e) { 180 } 181 } 182 } 183 184 } 185 186 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 187 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); 188 if (transactionStore != null) { 189 rc = transactionStore.proxy(rc); 190 } 191 return rc; 192 } 193 194 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 195 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); 196 if (transactionStore != null) { 197 rc = transactionStore.proxy(rc); 198 } 199 return rc; 200 } 201 202 /** 203 * Cleanup method to remove any state associated with the given destination 204 * @param destination Destination to forget 205 */ 206 public void removeQueueMessageStore(ActiveMQQueue destination) { 207 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) { 208 try { 209 removeConsumerDestination(destination); 210 } catch (IOException ioe) { 211 LOG.error("Failed to remove consumer destination: " + destination, ioe); 212 } 213 } 214 } 215 216 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException { 217 TransactionContext c = getTransactionContext(); 218 try { 219 String id = destination.getQualifiedName(); 220 getAdapter().doDeleteSubscription(c, destination, id, id); 221 } catch (SQLException e) { 222 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 223 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e); 224 } finally { 225 c.close(); 226 } 227 } 228 229 /** 230 * Cleanup method to remove any state associated with the given destination 231 * No state retained.... nothing to do 232 * 233 * @param destination Destination to forget 234 */ 235 public void removeTopicMessageStore(ActiveMQTopic destination) { 236 } 237 238 public TransactionStore createTransactionStore() throws IOException { 239 if (transactionStore == null) { 240 transactionStore = new MemoryTransactionStore(this); 241 } 242 return this.transactionStore; 243 } 244 245 public long getLastMessageBrokerSequenceId() throws IOException { 246 TransactionContext c = getTransactionContext(); 247 try { 248 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 249 sequenceGenerator.setLastSequenceId(seq); 250 long brokerSeq = 0; 251 if (seq != 0) { 252 byte[] msg = getAdapter().doGetMessageById(c, seq); 253 if (msg != null) { 254 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg)); 255 brokerSeq = last.getMessageId().getBrokerSequenceId(); 256 } else { 257 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!"); 258 } 259 } 260 return brokerSeq; 261 } catch (SQLException e) { 262 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 263 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 264 } finally { 265 c.close(); 266 } 267 } 268 269 public long getLastProducerSequenceId(ProducerId id) throws IOException { 270 TransactionContext c = getTransactionContext(); 271 try { 272 return getAdapter().doGetLastProducerSequenceId(c, id); 273 } catch (SQLException e) { 274 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 275 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 276 } finally { 277 c.close(); 278 } 279 } 280 281 282 public void start() throws Exception { 283 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 284 285 if (isCreateTablesOnStartup()) { 286 TransactionContext transactionContext = getTransactionContext(); 287 transactionContext.begin(); 288 try { 289 try { 290 getAdapter().doCreateTables(transactionContext); 291 } catch (SQLException e) { 292 LOG.warn("Cannot create tables due to: " + e); 293 JDBCPersistenceAdapter.log("Failure Details: ", e); 294 } 295 } finally { 296 transactionContext.commit(); 297 } 298 } 299 300 if (isUseDatabaseLock()) { 301 DatabaseLocker service = getDatabaseLocker(); 302 if (service == null) { 303 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter"); 304 } else { 305 service.start(); 306 if (lockKeepAlivePeriod > 0) { 307 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() { 308 public void run() { 309 databaseLockKeepAlive(); 310 } 311 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); 312 } 313 if (brokerService != null) { 314 brokerService.getBroker().nowMasterBroker(); 315 } 316 } 317 } 318 319 // Cleanup the db periodically. 320 if (cleanupPeriod > 0) { 321 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 322 public void run() { 323 cleanup(); 324 } 325 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS); 326 } 327 328 createMessageAudit(); 329 } 330 331 public synchronized void stop() throws Exception { 332 if (cleanupTicket != null) { 333 cleanupTicket.cancel(true); 334 cleanupTicket = null; 335 } 336 if (keepAliveTicket != null) { 337 keepAliveTicket.cancel(false); 338 keepAliveTicket = null; 339 } 340 341 // do not shutdown clockDaemon as it may kill the thread initiating shutdown 342 DatabaseLocker service = getDatabaseLocker(); 343 if (service != null) { 344 service.stop(); 345 } 346 } 347 348 public void cleanup() { 349 TransactionContext c = null; 350 try { 351 LOG.debug("Cleaning up old messages."); 352 c = getTransactionContext(); 353 getAdapter().doDeleteOldMessages(c); 354 } catch (IOException e) { 355 LOG.warn("Old message cleanup failed due to: " + e, e); 356 } catch (SQLException e) { 357 LOG.warn("Old message cleanup failed due to: " + e); 358 JDBCPersistenceAdapter.log("Failure Details: ", e); 359 } finally { 360 if (c != null) { 361 try { 362 c.close(); 363 } catch (Throwable e) { 364 } 365 } 366 LOG.debug("Cleanup done."); 367 } 368 } 369 370 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) { 371 this.clockDaemon = clockDaemon; 372 } 373 374 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 375 if (clockDaemon == null) { 376 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 377 public Thread newThread(Runnable runnable) { 378 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer"); 379 thread.setDaemon(true); 380 return thread; 381 } 382 }); 383 } 384 return clockDaemon; 385 } 386 387 public JDBCAdapter getAdapter() throws IOException { 388 if (adapter == null) { 389 setAdapter(createAdapter()); 390 } 391 return adapter; 392 } 393 394 public DatabaseLocker getDatabaseLocker() throws IOException { 395 if (databaseLocker == null && isUseDatabaseLock()) { 396 setDatabaseLocker(loadDataBaseLocker()); 397 } 398 return databaseLocker; 399 } 400 401 /** 402 * Sets the database locker strategy to use to lock the database on startup 403 * @throws IOException 404 */ 405 public void setDatabaseLocker(DatabaseLocker locker) throws IOException { 406 databaseLocker = locker; 407 databaseLocker.setPersistenceAdapter(this); 408 databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval()); 409 } 410 411 public DataSource getLockDataSource() throws IOException { 412 if (lockDataSource == null) { 413 lockDataSource = getDataSource(); 414 if (lockDataSource == null) { 415 throw new IllegalArgumentException( 416 "No dataSource property has been configured"); 417 } 418 } else { 419 LOG.info("Using a separate dataSource for locking: " 420 + lockDataSource); 421 } 422 return lockDataSource; 423 } 424 425 public void setLockDataSource(DataSource dataSource) { 426 this.lockDataSource = dataSource; 427 } 428 429 public BrokerService getBrokerService() { 430 return brokerService; 431 } 432 433 public void setBrokerService(BrokerService brokerService) { 434 this.brokerService = brokerService; 435 } 436 437 /** 438 * @throws IOException 439 */ 440 protected JDBCAdapter createAdapter() throws IOException { 441 442 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 443 444 // Use the default JDBC adapter if the 445 // Database type is not recognized. 446 if (adapter == null) { 447 adapter = new DefaultJDBCAdapter(); 448 LOG.debug("Using default JDBC Adapter: " + adapter); 449 } 450 return adapter; 451 } 452 453 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 454 Object adapter = null; 455 TransactionContext c = getTransactionContext(); 456 try { 457 try { 458 // Make the filename file system safe. 459 String dirverName = c.getConnection().getMetaData().getDriverName(); 460 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 461 462 try { 463 adapter = finder.newInstance(dirverName); 464 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 465 } catch (Throwable e) { 466 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 467 + "]. Will use default implementation."); 468 } 469 } catch (SQLException e) { 470 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 471 + e.getMessage()); 472 JDBCPersistenceAdapter.log("Failure Details: ", e); 473 } 474 } finally { 475 c.close(); 476 } 477 return adapter; 478 } 479 480 public void setAdapter(JDBCAdapter adapter) { 481 this.adapter = adapter; 482 this.adapter.setStatements(getStatements()); 483 this.adapter.setMaxRows(getMaxRows()); 484 } 485 486 public WireFormat getWireFormat() { 487 return wireFormat; 488 } 489 490 public void setWireFormat(WireFormat wireFormat) { 491 this.wireFormat = wireFormat; 492 } 493 494 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 495 if (context == null) { 496 return getTransactionContext(); 497 } else { 498 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 499 if (answer == null) { 500 answer = getTransactionContext(); 501 context.setLongTermStoreContext(answer); 502 } 503 return answer; 504 } 505 } 506 507 public TransactionContext getTransactionContext() throws IOException { 508 TransactionContext answer = new TransactionContext(this); 509 if (transactionIsolation > 0) { 510 answer.setTransactionIsolation(transactionIsolation); 511 } 512 return answer; 513 } 514 515 public void beginTransaction(ConnectionContext context) throws IOException { 516 TransactionContext transactionContext = getTransactionContext(context); 517 transactionContext.begin(); 518 } 519 520 public void commitTransaction(ConnectionContext context) throws IOException { 521 TransactionContext transactionContext = getTransactionContext(context); 522 transactionContext.commit(); 523 } 524 525 public void rollbackTransaction(ConnectionContext context) throws IOException { 526 TransactionContext transactionContext = getTransactionContext(context); 527 transactionContext.rollback(); 528 } 529 530 public int getCleanupPeriod() { 531 return cleanupPeriod; 532 } 533 534 /** 535 * Sets the number of milliseconds until the database is attempted to be 536 * cleaned up for durable topics 537 */ 538 public void setCleanupPeriod(int cleanupPeriod) { 539 this.cleanupPeriod = cleanupPeriod; 540 } 541 542 public void deleteAllMessages() throws IOException { 543 TransactionContext c = getTransactionContext(); 544 try { 545 getAdapter().doDropTables(c); 546 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 547 getAdapter().doCreateTables(c); 548 LOG.info("Persistence store purged."); 549 } catch (SQLException e) { 550 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 551 throw IOExceptionSupport.create(e); 552 } finally { 553 c.close(); 554 } 555 } 556 557 public boolean isUseExternalMessageReferences() { 558 return useExternalMessageReferences; 559 } 560 561 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 562 this.useExternalMessageReferences = useExternalMessageReferences; 563 } 564 565 public boolean isCreateTablesOnStartup() { 566 return createTablesOnStartup; 567 } 568 569 /** 570 * Sets whether or not tables are created on startup 571 */ 572 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 573 this.createTablesOnStartup = createTablesOnStartup; 574 } 575 576 public boolean isUseDatabaseLock() { 577 return useDatabaseLock; 578 } 579 580 /** 581 * Sets whether or not an exclusive database lock should be used to enable 582 * JDBC Master/Slave. Enabled by default. 583 */ 584 public void setUseDatabaseLock(boolean useDatabaseLock) { 585 this.useDatabaseLock = useDatabaseLock; 586 } 587 588 public static void log(String msg, SQLException e) { 589 String s = msg + e.getMessage(); 590 while (e.getNextException() != null) { 591 e = e.getNextException(); 592 s += ", due to: " + e.getMessage(); 593 } 594 LOG.warn(s, e); 595 } 596 597 public Statements getStatements() { 598 if (statements == null) { 599 statements = new Statements(); 600 } 601 return statements; 602 } 603 604 public void setStatements(Statements statements) { 605 this.statements = statements; 606 } 607 608 /** 609 * @param usageManager The UsageManager that is controlling the 610 * destination's memory usage. 611 */ 612 public void setUsageManager(SystemUsage usageManager) { 613 } 614 615 protected void databaseLockKeepAlive() { 616 boolean stop = false; 617 try { 618 DatabaseLocker locker = getDatabaseLocker(); 619 if (locker != null) { 620 if (!locker.keepAlive()) { 621 stop = true; 622 } 623 } 624 } catch (IOException e) { 625 LOG.error("Failed to get database when trying keepalive: " + e, e); 626 } 627 if (stop) { 628 stopBroker(); 629 } 630 } 631 632 protected void stopBroker() { 633 // we can no longer keep the lock so lets fail 634 LOG.info("No longer able to keep the exclusive lock so giving up being a master"); 635 try { 636 brokerService.stop(); 637 } catch (Exception e) { 638 LOG.warn("Failure occurred while stopping broker"); 639 } 640 } 641 642 protected DatabaseLocker loadDataBaseLocker() throws IOException { 643 DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock"); 644 if (locker == null) { 645 locker = new DefaultDatabaseLocker(); 646 LOG.debug("Using default JDBC Locker: " + locker); 647 } 648 return locker; 649 } 650 651 public void setBrokerName(String brokerName) { 652 } 653 654 public String toString() { 655 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 656 } 657 658 public void setDirectory(File dir) { 659 this.directory=dir; 660 } 661 662 public File getDirectory(){ 663 if (this.directory==null && brokerService != null){ 664 this.directory=brokerService.getBrokerDataDirectory(); 665 } 666 return this.directory; 667 } 668 669 // interesting bit here is proof that DB is ok 670 public void checkpoint(boolean sync) throws IOException { 671 // by pass TransactionContext to avoid IO Exception handler 672 Connection connection = null; 673 try { 674 connection = getDataSource().getConnection(); 675 } catch (SQLException e) { 676 LOG.debug("Could not get JDBC connection for checkpoint: " + e); 677 throw IOExceptionSupport.create(e); 678 } finally { 679 if (connection != null) { 680 try { 681 connection.close(); 682 } catch (Throwable ignored) { 683 } 684 } 685 } 686 } 687 688 public long size(){ 689 return 0; 690 } 691 692 public long getLockKeepAlivePeriod() { 693 return lockKeepAlivePeriod; 694 } 695 696 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) { 697 this.lockKeepAlivePeriod = lockKeepAlivePeriod; 698 } 699 700 public long getLockAcquireSleepInterval() { 701 return lockAcquireSleepInterval; 702 } 703 704 /** 705 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 706 * not applied if DataBaseLocker is injected. 707 */ 708 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { 709 this.lockAcquireSleepInterval = lockAcquireSleepInterval; 710 } 711 712 /** 713 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 714 * This allowable dirty isolation level may not be achievable in clustered DB environments 715 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ 716 * see isolation level constants in {@link java.sql.Connection} 717 * @param transactionIsolation the isolation level to use 718 */ 719 public void setTransactionIsolation(int transactionIsolation) { 720 this.transactionIsolation = transactionIsolation; 721 } 722 723 public int getMaxProducersToAudit() { 724 return maxProducersToAudit; 725 } 726 727 public void setMaxProducersToAudit(int maxProducersToAudit) { 728 this.maxProducersToAudit = maxProducersToAudit; 729 } 730 731 public int getMaxAuditDepth() { 732 return maxAuditDepth; 733 } 734 735 public void setMaxAuditDepth(int maxAuditDepth) { 736 this.maxAuditDepth = maxAuditDepth; 737 } 738 739 public boolean isEnableAudit() { 740 return enableAudit; 741 } 742 743 public void setEnableAudit(boolean enableAudit) { 744 this.enableAudit = enableAudit; 745 } 746 747 public int getAuditRecoveryDepth() { 748 return auditRecoveryDepth; 749 } 750 751 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 752 this.auditRecoveryDepth = auditRecoveryDepth; 753 } 754 755 public long getNextSequenceId() { 756 synchronized(sequenceGenerator) { 757 return sequenceGenerator.getNextSequenceId(); 758 } 759 } 760 761 public int getMaxRows() { 762 return maxRows; 763 } 764 765 /* 766 * the max rows return from queries, with sparse selectors this may need to be increased 767 */ 768 public void setMaxRows(int maxRows) { 769 this.maxRows = maxRows; 770 } 771 }