001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.TimeUnit;
029
030import javax.sql.DataSource;
031
032import org.apache.activemq.ActiveMQMessageAudit;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.command.ActiveMQDestination;
037import org.apache.activemq.command.ActiveMQQueue;
038import org.apache.activemq.command.ActiveMQTopic;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.ProducerId;
042import org.apache.activemq.openwire.OpenWireFormat;
043import org.apache.activemq.store.MessageStore;
044import org.apache.activemq.store.PersistenceAdapter;
045import org.apache.activemq.store.TopicMessageStore;
046import org.apache.activemq.store.TransactionStore;
047import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048import org.apache.activemq.store.memory.MemoryTransactionStore;
049import org.apache.activemq.usage.SystemUsage;
050import org.apache.activemq.util.ByteSequence;
051import org.apache.activemq.util.FactoryFinder;
052import org.apache.activemq.util.IOExceptionSupport;
053import org.apache.activemq.util.LongSequenceGenerator;
054import org.apache.activemq.wireformat.WireFormat;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * A {@link PersistenceAdapter} implementation using JDBC for persistence
060 * storage.
061 * 
 * This persistence adapter will correctly remember prepared XA transactions,
 * but it will not keep track of local transaction commits, so that operations
 * performed against the message store are done as a single unit of work (UOW).
065 * 
066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067 * 
068 * 
069 */
070public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071    BrokerServiceAware {
072
073    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
074    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
075                                                                   "META-INF/services/org/apache/activemq/store/jdbc/");
076    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
077                                                                    "META-INF/services/org/apache/activemq/store/jdbc/lock/");
078
079    private WireFormat wireFormat = new OpenWireFormat();
080    private BrokerService brokerService;
081    private Statements statements;
082    private JDBCAdapter adapter;
083    private MemoryTransactionStore transactionStore;
084    private ScheduledThreadPoolExecutor clockDaemon;
085    private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
086    private int cleanupPeriod = 1000 * 60 * 5;
087    private boolean useExternalMessageReferences;
088    private boolean useDatabaseLock = true;
089    private long lockKeepAlivePeriod = 1000*30;
090    private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
091    private DatabaseLocker databaseLocker;
092    private boolean createTablesOnStartup = true;
093    private DataSource lockDataSource;
094    private int transactionIsolation;
095    private File directory;
096    
097    protected int maxProducersToAudit=1024;
098    protected int maxAuditDepth=1000;
099    protected boolean enableAudit=false;
100    protected int auditRecoveryDepth = 1024;
101    protected ActiveMQMessageAudit audit;
102    
103    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
104    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
105
106    public JDBCPersistenceAdapter() {
107    }
108
109    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
110        super(ds);
111        this.wireFormat = wireFormat;
112    }
113
114    public Set<ActiveMQDestination> getDestinations() {
115        TransactionContext c = null;
116        try {
117            c = getTransactionContext();
118            return getAdapter().doGetDestinations(c);
119        } catch (IOException e) {
120            return emptyDestinationSet();
121        } catch (SQLException e) {
122            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123            return emptyDestinationSet();
124        } finally {
125            if (c != null) {
126                try {
127                    c.close();
128                } catch (Throwable e) {
129                }
130            }
131        }
132    }
133
134    @SuppressWarnings("unchecked")
135    private Set<ActiveMQDestination> emptyDestinationSet() {
136        return Collections.EMPTY_SET;
137    }
138    
139    protected void createMessageAudit() {
140        if (enableAudit && audit == null) {
141            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142            TransactionContext c = null;
143            
144            try {
145                c = getTransactionContext();
146                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147                    public void messageId(MessageId id) {
148                        audit.isDuplicate(id);
149                    }
150                });
151            } catch (Exception e) {
152                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153            } finally {
154                if (c != null) {
155                    try {
156                        c.close();
157                    } catch (Throwable e) {
158                    }
159                }
160            }
161        }
162    }
163    
164    public void initSequenceIdGenerator() {
165        TransactionContext c = null;
166        try {
167            c = getTransactionContext();
168            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169                public void messageId(MessageId id) {
170                    audit.isDuplicate(id);
171                }
172            });
173        } catch (Exception e) {
174            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175        } finally {
176            if (c != null) {
177                try {
178                    c.close();
179                } catch (Throwable e) {
180                }
181            }
182        }
183        
184    }
185
186    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188        if (transactionStore != null) {
189            rc = transactionStore.proxy(rc);
190        }
191        return rc;
192    }
193
194    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196        if (transactionStore != null) {
197            rc = transactionStore.proxy(rc);
198        }
199        return rc;
200    }
201
202    /**
203     * Cleanup method to remove any state associated with the given destination
204     * @param destination Destination to forget
205     */
206    public void removeQueueMessageStore(ActiveMQQueue destination) {
207        if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
208            try {
209                removeConsumerDestination(destination);
210            } catch (IOException ioe) {
211                LOG.error("Failed to remove consumer destination: " + destination, ioe);
212            }
213        }
214    }
215
216    private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
217        TransactionContext c = getTransactionContext();
218        try {
219            String id = destination.getQualifiedName();
220            getAdapter().doDeleteSubscription(c, destination, id, id);
221        } catch (SQLException e) {
222            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223            throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
224        } finally {
225            c.close();
226        }
227    }
228
229    /**
230     * Cleanup method to remove any state associated with the given destination
231     * No state retained.... nothing to do
232     *
233     * @param destination Destination to forget
234     */
235    public void removeTopicMessageStore(ActiveMQTopic destination) {
236    }
237
238    public TransactionStore createTransactionStore() throws IOException {
239        if (transactionStore == null) {
240            transactionStore = new MemoryTransactionStore(this);
241        }
242        return this.transactionStore;
243    }
244
245    public long getLastMessageBrokerSequenceId() throws IOException {
246        TransactionContext c = getTransactionContext();
247        try {
248            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
249            sequenceGenerator.setLastSequenceId(seq);
250            long brokerSeq = 0;
251            if (seq != 0) {
252                byte[] msg = getAdapter().doGetMessageById(c, seq);
253                if (msg != null) {
254                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
255                    brokerSeq = last.getMessageId().getBrokerSequenceId();
256                } else {
257                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
258                }
259            }
260            return brokerSeq;
261        } catch (SQLException e) {
262            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
263            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
264        } finally {
265            c.close();
266        }
267    }
268    
269    public long getLastProducerSequenceId(ProducerId id) throws IOException {
270        TransactionContext c = getTransactionContext();
271        try {
272            return getAdapter().doGetLastProducerSequenceId(c, id);
273        } catch (SQLException e) {
274            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276        } finally {
277            c.close();
278        }
279    }
280
281
282    public void start() throws Exception {
283        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
284
285        if (isCreateTablesOnStartup()) {
286            TransactionContext transactionContext = getTransactionContext();
287            transactionContext.begin();
288            try {
289                try {
290                    getAdapter().doCreateTables(transactionContext);
291                } catch (SQLException e) {
292                    LOG.warn("Cannot create tables due to: " + e);
293                    JDBCPersistenceAdapter.log("Failure Details: ", e);
294                }
295            } finally {
296                transactionContext.commit();
297            }
298        }
299
300        if (isUseDatabaseLock()) {
301            DatabaseLocker service = getDatabaseLocker();
302            if (service == null) {
303                LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
304            } else {
305                service.start();
306                if (lockKeepAlivePeriod > 0) {
307                    keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
308                        public void run() {
309                            databaseLockKeepAlive();
310                        }
311                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
312                }
313                if (brokerService != null) {
314                    brokerService.getBroker().nowMasterBroker();
315                }
316            }
317        }
318
319        // Cleanup the db periodically.
320        if (cleanupPeriod > 0) {
321            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
322                public void run() {
323                    cleanup();
324                }
325            }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
326        }
327        
328        createMessageAudit();
329    }
330
331    public synchronized void stop() throws Exception {
332        if (cleanupTicket != null) {
333            cleanupTicket.cancel(true);
334            cleanupTicket = null;
335        }
336        if (keepAliveTicket != null) {
337            keepAliveTicket.cancel(false);
338            keepAliveTicket = null;
339        }
340        
341        // do not shutdown clockDaemon as it may kill the thread initiating shutdown
342        DatabaseLocker service = getDatabaseLocker();
343        if (service != null) {
344            service.stop();
345        }
346    }
347
348    public void cleanup() {
349        TransactionContext c = null;
350        try {
351            LOG.debug("Cleaning up old messages.");
352            c = getTransactionContext();
353            getAdapter().doDeleteOldMessages(c);
354        } catch (IOException e) {
355            LOG.warn("Old message cleanup failed due to: " + e, e);
356        } catch (SQLException e) {
357            LOG.warn("Old message cleanup failed due to: " + e);
358            JDBCPersistenceAdapter.log("Failure Details: ", e);
359        } finally {
360            if (c != null) {
361                try {
362                    c.close();
363                } catch (Throwable e) {
364                }
365            }
366            LOG.debug("Cleanup done.");
367        }
368    }
369
370    public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
371        this.clockDaemon = clockDaemon;
372    }
373
374    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
375        if (clockDaemon == null) {
376            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
377                public Thread newThread(Runnable runnable) {
378                    Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
379                    thread.setDaemon(true);
380                    return thread;
381                }
382            });
383        }
384        return clockDaemon;
385    }
386
387    public JDBCAdapter getAdapter() throws IOException {
388        if (adapter == null) {
389            setAdapter(createAdapter());
390        }
391        return adapter;
392    }
393
394    public DatabaseLocker getDatabaseLocker() throws IOException {
395        if (databaseLocker == null && isUseDatabaseLock()) {
396            setDatabaseLocker(loadDataBaseLocker());
397        }
398        return databaseLocker;
399    }
400
401    /**
402     * Sets the database locker strategy to use to lock the database on startup
403     * @throws IOException 
404     */
405    public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
406        databaseLocker = locker;
407        databaseLocker.setPersistenceAdapter(this);
408        databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
409    }
410
411    public DataSource getLockDataSource() throws IOException {
412        if (lockDataSource == null) {
413            lockDataSource = getDataSource();
414            if (lockDataSource == null) {
415                throw new IllegalArgumentException(
416                        "No dataSource property has been configured");
417            }
418        } else {
419            LOG.info("Using a separate dataSource for locking: "
420                    + lockDataSource);
421        }
422        return lockDataSource;
423    }
424    
425    public void setLockDataSource(DataSource dataSource) {
426        this.lockDataSource = dataSource;
427    }
428
429    public BrokerService getBrokerService() {
430        return brokerService;
431    }
432
433    public void setBrokerService(BrokerService brokerService) {
434        this.brokerService = brokerService;
435    }
436
437    /**
438     * @throws IOException
439     */
440    protected JDBCAdapter createAdapter() throws IOException {
441       
442        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
443       
444        // Use the default JDBC adapter if the
445        // Database type is not recognized.
446        if (adapter == null) {
447            adapter = new DefaultJDBCAdapter();
448            LOG.debug("Using default JDBC Adapter: " + adapter);
449        }
450        return adapter;
451    }
452
453    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
454        Object adapter = null;
455        TransactionContext c = getTransactionContext();
456        try {
457            try {
458                // Make the filename file system safe.
459                String dirverName = c.getConnection().getMetaData().getDriverName();
460                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
461
462                try {
463                    adapter = finder.newInstance(dirverName);
464                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
465                } catch (Throwable e) {
466                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
467                             + "].  Will use default implementation.");
468                }
469            } catch (SQLException e) {
470                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
471                          + e.getMessage());
472                JDBCPersistenceAdapter.log("Failure Details: ", e);
473            }
474        } finally {
475            c.close();
476        }
477        return adapter;
478    }
479
480    public void setAdapter(JDBCAdapter adapter) {
481        this.adapter = adapter;
482        this.adapter.setStatements(getStatements());
483        this.adapter.setMaxRows(getMaxRows());
484    }
485
486    public WireFormat getWireFormat() {
487        return wireFormat;
488    }
489
490    public void setWireFormat(WireFormat wireFormat) {
491        this.wireFormat = wireFormat;
492    }
493
494    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
495        if (context == null) {
496            return getTransactionContext();
497        } else {
498            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
499            if (answer == null) {
500                answer = getTransactionContext();
501                context.setLongTermStoreContext(answer);
502            }
503            return answer;
504        }
505    }
506
507    public TransactionContext getTransactionContext() throws IOException {
508        TransactionContext answer = new TransactionContext(this);
509        if (transactionIsolation > 0) {
510            answer.setTransactionIsolation(transactionIsolation);
511        }
512        return answer;
513    }
514
515    public void beginTransaction(ConnectionContext context) throws IOException {
516        TransactionContext transactionContext = getTransactionContext(context);
517        transactionContext.begin();
518    }
519
520    public void commitTransaction(ConnectionContext context) throws IOException {
521        TransactionContext transactionContext = getTransactionContext(context);
522        transactionContext.commit();
523    }
524
525    public void rollbackTransaction(ConnectionContext context) throws IOException {
526        TransactionContext transactionContext = getTransactionContext(context);
527        transactionContext.rollback();
528    }
529
530    public int getCleanupPeriod() {
531        return cleanupPeriod;
532    }
533
534    /**
535     * Sets the number of milliseconds until the database is attempted to be
536     * cleaned up for durable topics
537     */
538    public void setCleanupPeriod(int cleanupPeriod) {
539        this.cleanupPeriod = cleanupPeriod;
540    }
541
542    public void deleteAllMessages() throws IOException {
543        TransactionContext c = getTransactionContext();
544        try {
545            getAdapter().doDropTables(c);
546            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
547            getAdapter().doCreateTables(c);
548            LOG.info("Persistence store purged.");
549        } catch (SQLException e) {
550            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
551            throw IOExceptionSupport.create(e);
552        } finally {
553            c.close();
554        }
555    }
556
557    public boolean isUseExternalMessageReferences() {
558        return useExternalMessageReferences;
559    }
560
561    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
562        this.useExternalMessageReferences = useExternalMessageReferences;
563    }
564
565    public boolean isCreateTablesOnStartup() {
566        return createTablesOnStartup;
567    }
568
569    /**
570     * Sets whether or not tables are created on startup
571     */
572    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
573        this.createTablesOnStartup = createTablesOnStartup;
574    }
575
576    public boolean isUseDatabaseLock() {
577        return useDatabaseLock;
578    }
579
580    /**
581     * Sets whether or not an exclusive database lock should be used to enable
582     * JDBC Master/Slave. Enabled by default.
583     */
584    public void setUseDatabaseLock(boolean useDatabaseLock) {
585        this.useDatabaseLock = useDatabaseLock;
586    }
587
588    public static void log(String msg, SQLException e) {
589        String s = msg + e.getMessage();
590        while (e.getNextException() != null) {
591            e = e.getNextException();
592            s += ", due to: " + e.getMessage();
593        }
594        LOG.warn(s, e);
595    }
596
597    public Statements getStatements() {
598        if (statements == null) {
599            statements = new Statements();
600        }
601        return statements;
602    }
603
604    public void setStatements(Statements statements) {
605        this.statements = statements;
606    }
607
608    /**
609     * @param usageManager The UsageManager that is controlling the
610     *                destination's memory usage.
611     */
612    public void setUsageManager(SystemUsage usageManager) {
613    }
614
615    protected void databaseLockKeepAlive() {
616        boolean stop = false;
617        try {
618            DatabaseLocker locker = getDatabaseLocker();
619            if (locker != null) {
620                if (!locker.keepAlive()) {
621                    stop = true;
622                }
623            }
624        } catch (IOException e) {
625            LOG.error("Failed to get database when trying keepalive: " + e, e);
626        }
627        if (stop) {
628            stopBroker();
629        }
630    }
631
632    protected void stopBroker() {
633        // we can no longer keep the lock so lets fail
634        LOG.info("No longer able to keep the exclusive lock so giving up being a master");
635        try {
636            brokerService.stop();
637        } catch (Exception e) {
638            LOG.warn("Failure occurred while stopping broker");
639        }
640    }
641
642    protected DatabaseLocker loadDataBaseLocker() throws IOException {
643        DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
644        if (locker == null) {
645            locker = new DefaultDatabaseLocker();
646            LOG.debug("Using default JDBC Locker: " + locker);
647        }
648        return locker;
649    }
650
651    public void setBrokerName(String brokerName) {
652    }
653
654    public String toString() {
655        return "JDBCPersistenceAdapter(" + super.toString() + ")";
656    }
657
658    public void setDirectory(File dir) {
659        this.directory=dir;
660    }
661    
662    public File getDirectory(){
663        if (this.directory==null && brokerService != null){
664            this.directory=brokerService.getBrokerDataDirectory();
665        }
666        return this.directory;
667    }
668
669    // interesting bit here is proof that DB is ok
670    public void checkpoint(boolean sync) throws IOException {
671        // by pass TransactionContext to avoid IO Exception handler
672        Connection connection = null;
673        try {
674            connection = getDataSource().getConnection();
675        } catch (SQLException e) {
676            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
677            throw IOExceptionSupport.create(e);
678        } finally {
679            if (connection != null) {
680                try {
681                    connection.close();
682                } catch (Throwable ignored) {
683                }
684            }
685        }
686    }
687
688    public long size(){
689        return 0;
690    }
691
692    public long getLockKeepAlivePeriod() {
693        return lockKeepAlivePeriod;
694    }
695
696    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
697        this.lockKeepAlivePeriod = lockKeepAlivePeriod;
698    }
699
700    public long getLockAcquireSleepInterval() {
701        return lockAcquireSleepInterval;
702    }
703
704    /**
705     * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
706     * not applied if DataBaseLocker is injected.
707     */
708    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
709        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
710    }
711    
712    /**
713     * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
714     * This allowable dirty isolation level may not be achievable in clustered DB environments
715     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
716     * see isolation level constants in {@link java.sql.Connection}
717     * @param transactionIsolation the isolation level to use
718     */
719    public void setTransactionIsolation(int transactionIsolation) {
720        this.transactionIsolation = transactionIsolation;
721    }
722
723        public int getMaxProducersToAudit() {
724                return maxProducersToAudit;
725        }
726
727        public void setMaxProducersToAudit(int maxProducersToAudit) {
728                this.maxProducersToAudit = maxProducersToAudit;
729        }
730
731        public int getMaxAuditDepth() {
732                return maxAuditDepth;
733        }
734
735        public void setMaxAuditDepth(int maxAuditDepth) {
736                this.maxAuditDepth = maxAuditDepth;
737        }
738
739        public boolean isEnableAudit() {
740                return enableAudit;
741        }
742
743        public void setEnableAudit(boolean enableAudit) {
744                this.enableAudit = enableAudit;
745        }
746
747    public int getAuditRecoveryDepth() {
748        return auditRecoveryDepth;
749    }
750
751    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
752        this.auditRecoveryDepth = auditRecoveryDepth;
753    }
754
755    public long getNextSequenceId() {
756        synchronized(sequenceGenerator) {
757            return sequenceGenerator.getNextSequenceId();
758        }
759    }
760
761    public int getMaxRows() {
762        return maxRows;
763    }
764
765    /*
766     * the max rows return from queries, with sparse selectors this may need to be increased
767     */
768    public void setMaxRows(int maxRows) {
769        this.maxRows = maxRows;
770    }
771}