001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import org.apache.activemq.broker.BrokerService;
020    import org.apache.activemq.broker.BrokerServiceAware;
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.ActiveMQQueue;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.LocalTransactionId;
026    import org.apache.activemq.command.ProducerId;
027    import org.apache.activemq.command.TransactionId;
028    import org.apache.activemq.command.XATransactionId;
029    import org.apache.activemq.protobuf.Buffer;
030    import org.apache.activemq.store.MessageStore;
031    import org.apache.activemq.store.PersistenceAdapter;
032    import org.apache.activemq.store.TopicMessageStore;
033    import org.apache.activemq.store.TransactionStore;
034    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
035    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
036    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
037    import org.apache.activemq.usage.SystemUsage;
038    
039    import java.io.File;
040    import java.io.IOException;
041    import java.util.Set;
042    
043    /**
044     * An implementation of {@link PersistenceAdapter} designed for use with
045     * KahaDB - Embedded Lightweight Non-Relational Database
046     * 
047     * @org.apache.xbean.XBean element="kahaDB"
048     * 
049     */
050    public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
051        private final KahaDBStore letter = new KahaDBStore();
052    
053        /**
054         * @param context
055         * @throws IOException
056         * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
057         */
058        public void beginTransaction(ConnectionContext context) throws IOException {
059            this.letter.beginTransaction(context);
060        }
061    
062        /**
063         * @param sync
064         * @throws IOException
065         * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
066         */
067        public void checkpoint(boolean sync) throws IOException {
068            this.letter.checkpoint(sync);
069        }
070    
071        /**
072         * @param context
073         * @throws IOException
074         * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
075         */
076        public void commitTransaction(ConnectionContext context) throws IOException {
077            this.letter.commitTransaction(context);
078        }
079    
080        /**
081         * @param destination
082         * @return MessageStore
083         * @throws IOException
084         * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
085         */
086        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
087            return this.letter.createQueueMessageStore(destination);
088        }
089    
090        /**
091         * @param destination
092         * @return TopicMessageStore
093         * @throws IOException
094         * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
095         */
096        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
097            return this.letter.createTopicMessageStore(destination);
098        }
099    
100        /**
101         * @return TransactionStore
102         * @throws IOException
103         * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
104         */
105        public TransactionStore createTransactionStore() throws IOException {
106            return this.letter.createTransactionStore();
107        }
108    
109        /**
110         * @throws IOException
111         * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
112         */
113        public void deleteAllMessages() throws IOException {
114            this.letter.deleteAllMessages();
115        }
116    
117        /**
118         * @return destinations
119         * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
120         */
121        public Set<ActiveMQDestination> getDestinations() {
122            return this.letter.getDestinations();
123        }
124    
125        /**
126         * @return lastMessageBrokerSequenceId
127         * @throws IOException
128         * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
129         */
130        public long getLastMessageBrokerSequenceId() throws IOException {
131            return this.letter.getLastMessageBrokerSequenceId();
132        }
133    
134        public long getLastProducerSequenceId(ProducerId id) throws IOException {
135            return this.letter.getLastProducerSequenceId(id);
136        }
137    
138        /**
139         * @param destination
140         * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
141         */
142        public void removeQueueMessageStore(ActiveMQQueue destination) {
143            this.letter.removeQueueMessageStore(destination);
144        }
145    
146        /**
147         * @param destination
148         * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
149         */
150        public void removeTopicMessageStore(ActiveMQTopic destination) {
151            this.letter.removeTopicMessageStore(destination);
152        }
153    
154        /**
155         * @param context
156         * @throws IOException
157         * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
158         */
159        public void rollbackTransaction(ConnectionContext context) throws IOException {
160            this.letter.rollbackTransaction(context);
161        }
162    
163        /**
164         * @param brokerName
165         * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
166         */
167        public void setBrokerName(String brokerName) {
168            this.letter.setBrokerName(brokerName);
169        }
170    
171        /**
172         * @param usageManager
173         * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
174         */
175        public void setUsageManager(SystemUsage usageManager) {
176            this.letter.setUsageManager(usageManager);
177        }
178    
179        /**
180         * @return the size of the store
181         * @see org.apache.activemq.store.PersistenceAdapter#size()
182         */
183        public long size() {
184            return this.letter.size();
185        }
186    
187        /**
188         * @throws Exception
189         * @see org.apache.activemq.Service#start()
190         */
191        public void start() throws Exception {
192            this.letter.start();
193        }
194    
195        /**
196         * @throws Exception
197         * @see org.apache.activemq.Service#stop()
198         */
199        public void stop() throws Exception {
200            this.letter.stop();
201        }
202    
203        /**
204         * Get the journalMaxFileLength
205         * 
206         * @return the journalMaxFileLength
207         */
208        public int getJournalMaxFileLength() {
209            return this.letter.getJournalMaxFileLength();
210        }
211    
212        /**
213         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
214         * be used
215         * 
216         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
217         */
218        public void setJournalMaxFileLength(int journalMaxFileLength) {
219            this.letter.setJournalMaxFileLength(journalMaxFileLength);
220        }
221    
222        /**
223         * Set the max number of producers (LRU cache) to track for duplicate sends
224         */
225        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
226            this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
227        }
228        
229        public int getMaxFailoverProducersToTrack() {
230            return this.letter.getMaxFailoverProducersToTrack();
231        }
232    
233        /**
234         * set the audit window depth for duplicate suppression (should exceed the max transaction
235         * batch)
236         */
237        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
238            this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
239        }
240        
241        public int getFailoverProducersAuditDepth() {
242            return this.letter.getFailoverProducersAuditDepth();
243        }
244        
245        /**
246         * Get the checkpointInterval
247         * 
248         * @return the checkpointInterval
249         */
250        public long getCheckpointInterval() {
251            return this.letter.getCheckpointInterval();
252        }
253    
254        /**
255         * Set the checkpointInterval
256         * 
257         * @param checkpointInterval
258         *            the checkpointInterval to set
259         */
260        public void setCheckpointInterval(long checkpointInterval) {
261            this.letter.setCheckpointInterval(checkpointInterval);
262        }
263    
264        /**
265         * Get the cleanupInterval
266         * 
267         * @return the cleanupInterval
268         */
269        public long getCleanupInterval() {
270            return this.letter.getCleanupInterval();
271        }
272    
273        /**
274         * Set the cleanupInterval
275         * 
276         * @param cleanupInterval
277         *            the cleanupInterval to set
278         */
279        public void setCleanupInterval(long cleanupInterval) {
280            this.letter.setCleanupInterval(cleanupInterval);
281        }
282    
283        /**
284         * Get the indexWriteBatchSize
285         * 
286         * @return the indexWriteBatchSize
287         */
288        public int getIndexWriteBatchSize() {
289            return this.letter.getIndexWriteBatchSize();
290        }
291    
292        /**
293         * Set the indexWriteBatchSize
294         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
295         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
296         * @param indexWriteBatchSize
297         *            the indexWriteBatchSize to set
298         */
299        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
300            this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
301        }
302    
303        /**
304         * Get the journalMaxWriteBatchSize
305         * 
306         * @return the journalMaxWriteBatchSize
307         */
308        public int getJournalMaxWriteBatchSize() {
309            return this.letter.getJournalMaxWriteBatchSize();
310        }
311    
312        /**
313         * Set the journalMaxWriteBatchSize
314         *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
315         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
316         * @param journalMaxWriteBatchSize
317         *            the journalMaxWriteBatchSize to set
318         */
319        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
320            this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
321        }
322    
323        /**
324         * Get the enableIndexWriteAsync
325         * 
326         * @return the enableIndexWriteAsync
327         */
328        public boolean isEnableIndexWriteAsync() {
329            return this.letter.isEnableIndexWriteAsync();
330        }
331    
332        /**
333         * Set the enableIndexWriteAsync
334         * 
335         * @param enableIndexWriteAsync
336         *            the enableIndexWriteAsync to set
337         */
338        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
339            this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
340        }
341    
342        /**
343         * Get the directory
344         * 
345         * @return the directory
346         */
347        public File getDirectory() {
348            return this.letter.getDirectory();
349        }
350    
351        /**
352         * @param dir
353         * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
354         */
355        public void setDirectory(File dir) {
356            this.letter.setDirectory(dir);
357        }
358    
359        /**
360         * Get the enableJournalDiskSyncs
361         * 
362         * @return the enableJournalDiskSyncs
363         */
364        public boolean isEnableJournalDiskSyncs() {
365            return this.letter.isEnableJournalDiskSyncs();
366        }
367    
368        /**
369         * Set the enableJournalDiskSyncs
370         * 
371         * @param enableJournalDiskSyncs
372         *            the enableJournalDiskSyncs to set
373         */
374        public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
375            this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
376        }
377    
378        /**
379         * Get the indexCacheSize
380         * 
381         * @return the indexCacheSize
382         */
383        public int getIndexCacheSize() {
384            return this.letter.getIndexCacheSize();
385        }
386    
387        /**
388         * Set the indexCacheSize
389         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
390         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
391         * @param indexCacheSize
392         *            the indexCacheSize to set
393         */
394        public void setIndexCacheSize(int indexCacheSize) {
395            this.letter.setIndexCacheSize(indexCacheSize);
396        }
397    
398        /**
399         * Get the ignoreMissingJournalfiles
400         * 
401         * @return the ignoreMissingJournalfiles
402         */
403        public boolean isIgnoreMissingJournalfiles() {
404            return this.letter.isIgnoreMissingJournalfiles();
405        }
406    
407        /**
408         * Set the ignoreMissingJournalfiles
409         * 
410         * @param ignoreMissingJournalfiles
411         *            the ignoreMissingJournalfiles to set
412         */
413        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
414            this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
415        }
416    
417        public boolean isChecksumJournalFiles() {
418            return letter.isChecksumJournalFiles();
419        }
420    
421        public boolean isCheckForCorruptJournalFiles() {
422            return letter.isCheckForCorruptJournalFiles();
423        }
424    
425        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
426            letter.setChecksumJournalFiles(checksumJournalFiles);
427        }
428    
429        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
430            letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
431        }
432    
433        public void setBrokerService(BrokerService brokerService) {
434            letter.setBrokerService(brokerService);
435        }
436    
437        public boolean isArchiveDataLogs() {
438            return letter.isArchiveDataLogs();
439        }
440    
441        public void setArchiveDataLogs(boolean archiveDataLogs) {
442            letter.setArchiveDataLogs(archiveDataLogs);
443        }
444    
445        public File getDirectoryArchive() {
446            return letter.getDirectoryArchive();
447        }
448    
449        public void setDirectoryArchive(File directoryArchive) {
450            letter.setDirectoryArchive(directoryArchive);
451        }
452    
453        public boolean isConcurrentStoreAndDispatchQueues() {
454            return letter.isConcurrentStoreAndDispatchQueues();
455        }
456    
457        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
458            letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
459        }
460    
461        public boolean isConcurrentStoreAndDispatchTopics() {
462            return letter.isConcurrentStoreAndDispatchTopics();
463        }
464    
465        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
466            letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
467        }
468    
469        public int getMaxAsyncJobs() {
470            return letter.getMaxAsyncJobs();
471        }
472        /**
473         * @param maxAsyncJobs
474         *            the maxAsyncJobs to set
475         */
476        public void setMaxAsyncJobs(int maxAsyncJobs) {
477            letter.setMaxAsyncJobs(maxAsyncJobs);
478        }
479        
480        /**
481         * @return the databaseLockedWaitDelay
482         */
483        public int getDatabaseLockedWaitDelay() {
484            return letter.getDatabaseLockedWaitDelay();
485        }
486    
487        /**
488         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
489         */
490        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
491           letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
492        }
493    
494        public boolean getForceRecoverIndex() {
495            return letter.getForceRecoverIndex();
496        }
497    
498        public void setForceRecoverIndex(boolean forceRecoverIndex) {
499            letter.setForceRecoverIndex(forceRecoverIndex);
500        }
501    
502        public boolean isArchiveCorruptedIndex() {
503            return letter.isArchiveCorruptedIndex();
504        }
505    
506        public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
507            letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
508        }
509    
510        /**
511         * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
512         * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
513         */
514        public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
515            letter.setRewriteOnRedelivery(rewriteOnRedelivery);
516        }
517    
518        public boolean isRewriteOnRedelivery() {
519            return letter.isRewriteOnRedelivery();
520        }
521    
522        public float getIndexLFUEvictionFactor() {
523            return letter.getIndexLFUEvictionFactor();
524        }
525    
526        public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
527            letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
528        }
529    
530        public boolean isUseIndexLFRUEviction() {
531            return letter.isUseIndexLFRUEviction();
532        }
533    
534        public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
535            letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
536        }
537    
538        public void setEnableIndexDiskSyncs(boolean diskSyncs) {
539            letter.setEnableIndexDiskSyncs(diskSyncs);
540        }
541    
542        public boolean isEnableIndexDiskSyncs() {
543            return letter.isEnableIndexDiskSyncs();
544        }
545    
546        public void setEnableIndexRecoveryFile(boolean enable) {
547            letter.setEnableIndexRecoveryFile(enable);
548        }
549    
550        public boolean  isEnableIndexRecoveryFile() {
551            return letter.isEnableIndexRecoveryFile();
552        }
553    
554        public void setEnableIndexPageCaching(boolean enable) {
555            letter.setEnableIndexPageCaching(enable);
556        }
557    
558        public boolean isEnableIndexPageCaching() {
559            return letter.isEnableIndexPageCaching();
560        }
561    
562        public KahaDBStore getStore() {
563            return letter;
564        }
565    
566        public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
567            if (txid == null) {
568                return null;
569            }
570            KahaTransactionInfo rc = new KahaTransactionInfo();
571    
572            if (txid.isLocalTransaction()) {
573                LocalTransactionId t = (LocalTransactionId) txid;
574                KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
575                kahaTxId.setConnectionId(t.getConnectionId().getValue());
576                kahaTxId.setTransacitonId(t.getValue());
577                rc.setLocalTransacitonId(kahaTxId);
578            } else {
579                XATransactionId t = (XATransactionId) txid;
580                KahaXATransactionId kahaTxId = new KahaXATransactionId();
581                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
582                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
583                kahaTxId.setFormatId(t.getFormatId());
584                rc.setXaTransacitonId(kahaTxId);
585            }
586            return rc;
587        }
588    
589        @Override
590        public String toString() {
591            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
592            return "KahaDBPersistenceAdapter[" + path + "]";
593        }
594    
595    }