001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    import java.util.Collections;
024    import java.util.Set;
025    import java.util.concurrent.ScheduledFuture;
026    import java.util.concurrent.ScheduledThreadPoolExecutor;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.TimeUnit;
029    
030    import javax.sql.DataSource;
031    
032    import org.apache.activemq.ActiveMQMessageAudit;
033    import org.apache.activemq.broker.BrokerService;
034    import org.apache.activemq.broker.BrokerServiceAware;
035    import org.apache.activemq.broker.ConnectionContext;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ActiveMQQueue;
038    import org.apache.activemq.command.ActiveMQTopic;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.command.ProducerId;
042    import org.apache.activemq.openwire.OpenWireFormat;
043    import org.apache.activemq.store.MessageStore;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.TopicMessageStore;
046    import org.apache.activemq.store.TransactionStore;
047    import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048    import org.apache.activemq.store.memory.MemoryTransactionStore;
049    import org.apache.activemq.usage.SystemUsage;
050    import org.apache.activemq.util.ByteSequence;
051    import org.apache.activemq.util.FactoryFinder;
052    import org.apache.activemq.util.IOExceptionSupport;
053    import org.apache.activemq.util.LongSequenceGenerator;
054    import org.apache.activemq.wireformat.WireFormat;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    /**
059     * A {@link PersistenceAdapter} implementation using JDBC for persistence
060     * storage.
061     * 
062     * This persistence adapter will correctly remember prepared XA transactions,
063     * but it will not keep track of local transaction commits so that operations
064     * performed against the Message store are done as a single uow.
065     * 
066     * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067     * 
068     * 
069     */
070    public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071        BrokerServiceAware {
072    
073        private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
074        private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
075                                                                       "META-INF/services/org/apache/activemq/store/jdbc/");
076        private static FactoryFinder lockFactoryFinder = new FactoryFinder(
077                                                                        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
078    
079        private WireFormat wireFormat = new OpenWireFormat();
080        private BrokerService brokerService;
081        private Statements statements;
082        private JDBCAdapter adapter;
083        private MemoryTransactionStore transactionStore;
084        private ScheduledThreadPoolExecutor clockDaemon;
085        private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
086        private int cleanupPeriod = 1000 * 60 * 5;
087        private boolean useExternalMessageReferences;
088        private boolean useDatabaseLock = true;
089        private long lockKeepAlivePeriod = 1000*30;
090        private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
091        private DatabaseLocker databaseLocker;
092        private boolean createTablesOnStartup = true;
093        private DataSource lockDataSource;
094        private int transactionIsolation;
095        private File directory;
096        
097        protected int maxProducersToAudit=1024;
098        protected int maxAuditDepth=1000;
099        protected boolean enableAudit=false;
100        protected int auditRecoveryDepth = 1024;
101        protected ActiveMQMessageAudit audit;
102        
103        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
104        protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
105    
106        public JDBCPersistenceAdapter() {
107        }
108    
109        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
110            super(ds);
111            this.wireFormat = wireFormat;
112        }
113    
114        public Set<ActiveMQDestination> getDestinations() {
115            TransactionContext c = null;
116            try {
117                c = getTransactionContext();
118                return getAdapter().doGetDestinations(c);
119            } catch (IOException e) {
120                return emptyDestinationSet();
121            } catch (SQLException e) {
122                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123                return emptyDestinationSet();
124            } finally {
125                if (c != null) {
126                    try {
127                        c.close();
128                    } catch (Throwable e) {
129                    }
130                }
131            }
132        }
133    
134        @SuppressWarnings("unchecked")
135        private Set<ActiveMQDestination> emptyDestinationSet() {
136            return Collections.EMPTY_SET;
137        }
138        
139        protected void createMessageAudit() {
140            if (enableAudit && audit == null) {
141                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142                TransactionContext c = null;
143                
144                try {
145                    c = getTransactionContext();
146                    getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147                        public void messageId(MessageId id) {
148                            audit.isDuplicate(id);
149                        }
150                    });
151                } catch (Exception e) {
152                    LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153                } finally {
154                    if (c != null) {
155                        try {
156                            c.close();
157                        } catch (Throwable e) {
158                        }
159                    }
160                }
161            }
162        }
163        
164        public void initSequenceIdGenerator() {
165            TransactionContext c = null;
166            try {
167                c = getTransactionContext();
168                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169                    public void messageId(MessageId id) {
170                        audit.isDuplicate(id);
171                    }
172                });
173            } catch (Exception e) {
174                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175            } finally {
176                if (c != null) {
177                    try {
178                        c.close();
179                    } catch (Throwable e) {
180                    }
181                }
182            }
183            
184        }
185    
186        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187            MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188            if (transactionStore != null) {
189                rc = transactionStore.proxy(rc);
190            }
191            return rc;
192        }
193    
194        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195            TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196            if (transactionStore != null) {
197                rc = transactionStore.proxy(rc);
198            }
199            return rc;
200        }
201    
202        /**
203         * Cleanup method to remove any state associated with the given destination
204         * @param destination Destination to forget
205         */
206        public void removeQueueMessageStore(ActiveMQQueue destination) {
207            if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
208                try {
209                    removeConsumerDestination(destination);
210                } catch (IOException ioe) {
211                    LOG.error("Failed to remove consumer destination: " + destination, ioe);
212                }
213            }
214        }
215    
216        private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
217            TransactionContext c = getTransactionContext();
218            try {
219                String id = destination.getQualifiedName();
220                getAdapter().doDeleteSubscription(c, destination, id, id);
221            } catch (SQLException e) {
222                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223                throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
224            } finally {
225                c.close();
226            }
227        }
228    
229        /**
230         * Cleanup method to remove any state associated with the given destination
231         * No state retained.... nothing to do
232         *
233         * @param destination Destination to forget
234         */
235        public void removeTopicMessageStore(ActiveMQTopic destination) {
236        }
237    
238        public TransactionStore createTransactionStore() throws IOException {
239            if (transactionStore == null) {
240                transactionStore = new MemoryTransactionStore(this);
241            }
242            return this.transactionStore;
243        }
244    
245        public long getLastMessageBrokerSequenceId() throws IOException {
246            TransactionContext c = getTransactionContext();
247            try {
248                long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
249                sequenceGenerator.setLastSequenceId(seq);
250                long brokerSeq = 0;
251                if (seq != 0) {
252                    byte[] msg = getAdapter().doGetMessageById(c, seq);
253                    if (msg != null) {
254                        Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
255                        brokerSeq = last.getMessageId().getBrokerSequenceId();
256                    } else {
257                       LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
258                    }
259                }
260                return brokerSeq;
261            } catch (SQLException e) {
262                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
263                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
264            } finally {
265                c.close();
266            }
267        }
268        
269        public long getLastProducerSequenceId(ProducerId id) throws IOException {
270            TransactionContext c = getTransactionContext();
271            try {
272                return getAdapter().doGetLastProducerSequenceId(c, id);
273            } catch (SQLException e) {
274                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276            } finally {
277                c.close();
278            }
279        }
280    
281    
282        public void start() throws Exception {
283            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
284    
285            if (isCreateTablesOnStartup()) {
286                TransactionContext transactionContext = getTransactionContext();
287                transactionContext.begin();
288                try {
289                    try {
290                        getAdapter().doCreateTables(transactionContext);
291                    } catch (SQLException e) {
292                        LOG.warn("Cannot create tables due to: " + e);
293                        JDBCPersistenceAdapter.log("Failure Details: ", e);
294                    }
295                } finally {
296                    transactionContext.commit();
297                }
298            }
299    
300            if (isUseDatabaseLock()) {
301                DatabaseLocker service = getDatabaseLocker();
302                if (service == null) {
303                    LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
304                } else {
305                    service.start();
306                    if (lockKeepAlivePeriod > 0) {
307                        keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
308                            public void run() {
309                                databaseLockKeepAlive();
310                            }
311                        }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
312                    }
313                    if (brokerService != null) {
314                        brokerService.getBroker().nowMasterBroker();
315                    }
316                }
317            }
318    
319            // Cleanup the db periodically.
320            if (cleanupPeriod > 0) {
321                cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
322                    public void run() {
323                        cleanup();
324                    }
325                }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
326            }
327            
328            createMessageAudit();
329        }
330    
331        public synchronized void stop() throws Exception {
332            if (cleanupTicket != null) {
333                cleanupTicket.cancel(true);
334                cleanupTicket = null;
335            }
336            if (keepAliveTicket != null) {
337                keepAliveTicket.cancel(false);
338                keepAliveTicket = null;
339            }
340            
341            // do not shutdown clockDaemon as it may kill the thread initiating shutdown
342            DatabaseLocker service = getDatabaseLocker();
343            if (service != null) {
344                service.stop();
345            }
346        }
347    
348        public void cleanup() {
349            TransactionContext c = null;
350            try {
351                LOG.debug("Cleaning up old messages.");
352                c = getTransactionContext();
353                getAdapter().doDeleteOldMessages(c);
354            } catch (IOException e) {
355                LOG.warn("Old message cleanup failed due to: " + e, e);
356            } catch (SQLException e) {
357                LOG.warn("Old message cleanup failed due to: " + e);
358                JDBCPersistenceAdapter.log("Failure Details: ", e);
359            } finally {
360                if (c != null) {
361                    try {
362                        c.close();
363                    } catch (Throwable e) {
364                    }
365                }
366                LOG.debug("Cleanup done.");
367            }
368        }
369    
370        public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
371            this.clockDaemon = clockDaemon;
372        }
373    
374        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
375            if (clockDaemon == null) {
376                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
377                    public Thread newThread(Runnable runnable) {
378                        Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
379                        thread.setDaemon(true);
380                        return thread;
381                    }
382                });
383            }
384            return clockDaemon;
385        }
386    
387        public JDBCAdapter getAdapter() throws IOException {
388            if (adapter == null) {
389                setAdapter(createAdapter());
390            }
391            return adapter;
392        }
393    
394        public DatabaseLocker getDatabaseLocker() throws IOException {
395            if (databaseLocker == null && isUseDatabaseLock()) {
396                setDatabaseLocker(loadDataBaseLocker());
397            }
398            return databaseLocker;
399        }
400    
401        /**
402         * Sets the database locker strategy to use to lock the database on startup
403         * @throws IOException 
404         */
405        public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
406            databaseLocker = locker;
407            databaseLocker.setPersistenceAdapter(this);
408            databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
409        }
410    
411        public DataSource getLockDataSource() throws IOException {
412            if (lockDataSource == null) {
413                lockDataSource = getDataSource();
414                if (lockDataSource == null) {
415                    throw new IllegalArgumentException(
416                            "No dataSource property has been configured");
417                }
418            } else {
419                LOG.info("Using a separate dataSource for locking: "
420                        + lockDataSource);
421            }
422            return lockDataSource;
423        }
424        
425        public void setLockDataSource(DataSource dataSource) {
426            this.lockDataSource = dataSource;
427        }
428    
429        public BrokerService getBrokerService() {
430            return brokerService;
431        }
432    
433        public void setBrokerService(BrokerService brokerService) {
434            this.brokerService = brokerService;
435        }
436    
437        /**
438         * @throws IOException
439         */
440        protected JDBCAdapter createAdapter() throws IOException {
441           
442            adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
443           
444            // Use the default JDBC adapter if the
445            // Database type is not recognized.
446            if (adapter == null) {
447                adapter = new DefaultJDBCAdapter();
448                LOG.debug("Using default JDBC Adapter: " + adapter);
449            }
450            return adapter;
451        }
452    
453        private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
454            Object adapter = null;
455            TransactionContext c = getTransactionContext();
456            try {
457                try {
458                    // Make the filename file system safe.
459                    String dirverName = c.getConnection().getMetaData().getDriverName();
460                    dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
461    
462                    try {
463                        adapter = finder.newInstance(dirverName);
464                        LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
465                    } catch (Throwable e) {
466                        LOG.info("Database " + kind + " driver override not found for : [" + dirverName
467                                 + "].  Will use default implementation.");
468                    }
469                } catch (SQLException e) {
470                    LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
471                              + e.getMessage());
472                    JDBCPersistenceAdapter.log("Failure Details: ", e);
473                }
474            } finally {
475                c.close();
476            }
477            return adapter;
478        }
479    
480        public void setAdapter(JDBCAdapter adapter) {
481            this.adapter = adapter;
482            this.adapter.setStatements(getStatements());
483            this.adapter.setMaxRows(getMaxRows());
484        }
485    
486        public WireFormat getWireFormat() {
487            return wireFormat;
488        }
489    
490        public void setWireFormat(WireFormat wireFormat) {
491            this.wireFormat = wireFormat;
492        }
493    
494        public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
495            if (context == null) {
496                return getTransactionContext();
497            } else {
498                TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
499                if (answer == null) {
500                    answer = getTransactionContext();
501                    context.setLongTermStoreContext(answer);
502                }
503                return answer;
504            }
505        }
506    
507        public TransactionContext getTransactionContext() throws IOException {
508            TransactionContext answer = new TransactionContext(this);
509            if (transactionIsolation > 0) {
510                answer.setTransactionIsolation(transactionIsolation);
511            }
512            return answer;
513        }
514    
515        public void beginTransaction(ConnectionContext context) throws IOException {
516            TransactionContext transactionContext = getTransactionContext(context);
517            transactionContext.begin();
518        }
519    
520        public void commitTransaction(ConnectionContext context) throws IOException {
521            TransactionContext transactionContext = getTransactionContext(context);
522            transactionContext.commit();
523        }
524    
525        public void rollbackTransaction(ConnectionContext context) throws IOException {
526            TransactionContext transactionContext = getTransactionContext(context);
527            transactionContext.rollback();
528        }
529    
530        public int getCleanupPeriod() {
531            return cleanupPeriod;
532        }
533    
534        /**
535         * Sets the number of milliseconds until the database is attempted to be
536         * cleaned up for durable topics
537         */
538        public void setCleanupPeriod(int cleanupPeriod) {
539            this.cleanupPeriod = cleanupPeriod;
540        }
541    
542        public void deleteAllMessages() throws IOException {
543            TransactionContext c = getTransactionContext();
544            try {
545                getAdapter().doDropTables(c);
546                getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
547                getAdapter().doCreateTables(c);
548                LOG.info("Persistence store purged.");
549            } catch (SQLException e) {
550                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
551                throw IOExceptionSupport.create(e);
552            } finally {
553                c.close();
554            }
555        }
556    
557        public boolean isUseExternalMessageReferences() {
558            return useExternalMessageReferences;
559        }
560    
561        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
562            this.useExternalMessageReferences = useExternalMessageReferences;
563        }
564    
565        public boolean isCreateTablesOnStartup() {
566            return createTablesOnStartup;
567        }
568    
569        /**
570         * Sets whether or not tables are created on startup
571         */
572        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
573            this.createTablesOnStartup = createTablesOnStartup;
574        }
575    
576        public boolean isUseDatabaseLock() {
577            return useDatabaseLock;
578        }
579    
580        /**
581         * Sets whether or not an exclusive database lock should be used to enable
582         * JDBC Master/Slave. Enabled by default.
583         */
584        public void setUseDatabaseLock(boolean useDatabaseLock) {
585            this.useDatabaseLock = useDatabaseLock;
586        }
587    
588        public static void log(String msg, SQLException e) {
589            String s = msg + e.getMessage();
590            while (e.getNextException() != null) {
591                e = e.getNextException();
592                s += ", due to: " + e.getMessage();
593            }
594            LOG.warn(s, e);
595        }
596    
597        public Statements getStatements() {
598            if (statements == null) {
599                statements = new Statements();
600            }
601            return statements;
602        }
603    
604        public void setStatements(Statements statements) {
605            this.statements = statements;
606        }
607    
608        /**
609         * @param usageManager The UsageManager that is controlling the
610         *                destination's memory usage.
611         */
612        public void setUsageManager(SystemUsage usageManager) {
613        }
614    
615        protected void databaseLockKeepAlive() {
616            boolean stop = false;
617            try {
618                DatabaseLocker locker = getDatabaseLocker();
619                if (locker != null) {
620                    if (!locker.keepAlive()) {
621                        stop = true;
622                    }
623                }
624            } catch (IOException e) {
625                LOG.error("Failed to get database when trying keepalive: " + e, e);
626            }
627            if (stop) {
628                stopBroker();
629            }
630        }
631    
632        protected void stopBroker() {
633            // we can no longer keep the lock so lets fail
634            LOG.info("No longer able to keep the exclusive lock so giving up being a master");
635            try {
636                brokerService.stop();
637            } catch (Exception e) {
638                LOG.warn("Failure occurred while stopping broker");
639            }
640        }
641    
642        protected DatabaseLocker loadDataBaseLocker() throws IOException {
643            DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
644            if (locker == null) {
645                locker = new DefaultDatabaseLocker();
646                LOG.debug("Using default JDBC Locker: " + locker);
647            }
648            return locker;
649        }
650    
651        public void setBrokerName(String brokerName) {
652        }
653    
654        public String toString() {
655            return "JDBCPersistenceAdapter(" + super.toString() + ")";
656        }
657    
658        public void setDirectory(File dir) {
659            this.directory=dir;
660        }
661        
662        public File getDirectory(){
663            if (this.directory==null && brokerService != null){
664                this.directory=brokerService.getBrokerDataDirectory();
665            }
666            return this.directory;
667        }
668    
669        // interesting bit here is proof that DB is ok
670        public void checkpoint(boolean sync) throws IOException {
671            // by pass TransactionContext to avoid IO Exception handler
672            Connection connection = null;
673            try {
674                connection = getDataSource().getConnection();
675            } catch (SQLException e) {
676                LOG.debug("Could not get JDBC connection for checkpoint: " + e);
677                throw IOExceptionSupport.create(e);
678            } finally {
679                if (connection != null) {
680                    try {
681                        connection.close();
682                    } catch (Throwable ignored) {
683                    }
684                }
685            }
686        }
687    
688        public long size(){
689            return 0;
690        }
691    
692        public long getLockKeepAlivePeriod() {
693            return lockKeepAlivePeriod;
694        }
695    
696        public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
697            this.lockKeepAlivePeriod = lockKeepAlivePeriod;
698        }
699    
700        public long getLockAcquireSleepInterval() {
701            return lockAcquireSleepInterval;
702        }
703    
704        /**
705         * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
706         * not applied if DataBaseLocker is injected.
707         */
708        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
709            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
710        }
711        
712        /**
713         * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
714         * This allowable dirty isolation level may not be achievable in clustered DB environments
715         * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
716         * see isolation level constants in {@link java.sql.Connection}
717         * @param transactionIsolation the isolation level to use
718         */
719        public void setTransactionIsolation(int transactionIsolation) {
720            this.transactionIsolation = transactionIsolation;
721        }
722    
723            public int getMaxProducersToAudit() {
724                    return maxProducersToAudit;
725            }
726    
727            public void setMaxProducersToAudit(int maxProducersToAudit) {
728                    this.maxProducersToAudit = maxProducersToAudit;
729            }
730    
731            public int getMaxAuditDepth() {
732                    return maxAuditDepth;
733            }
734    
735            public void setMaxAuditDepth(int maxAuditDepth) {
736                    this.maxAuditDepth = maxAuditDepth;
737            }
738    
739            public boolean isEnableAudit() {
740                    return enableAudit;
741            }
742    
743            public void setEnableAudit(boolean enableAudit) {
744                    this.enableAudit = enableAudit;
745            }
746    
747        public int getAuditRecoveryDepth() {
748            return auditRecoveryDepth;
749        }
750    
751        public void setAuditRecoveryDepth(int auditRecoveryDepth) {
752            this.auditRecoveryDepth = auditRecoveryDepth;
753        }
754    
755        public long getNextSequenceId() {
756            synchronized(sequenceGenerator) {
757                return sequenceGenerator.getNextSequenceId();
758            }
759        }
760    
761        public int getMaxRows() {
762            return maxRows;
763        }
764    
765        /*
766         * the max rows return from queries, with sparse selectors this may need to be increased
767         */
768        public void setMaxRows(int maxRows) {
769            this.maxRows = maxRows;
770        }
771    }