001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.LinkedHashMap;
030import java.util.TreeMap;
031import java.util.Map.Entry;
032import java.util.concurrent.atomic.AtomicBoolean;
033
034import org.apache.activemq.command.SubscriptionInfo;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
037import org.apache.activemq.store.kahadb.data.KahaDestination;
038import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
039import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
040import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
041import org.apache.activemq.util.ByteSequence;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.kahadb.index.BTreeIndex;
045import org.apache.kahadb.page.PageFile;
046import org.apache.kahadb.page.Transaction;
047import org.apache.kahadb.util.LongMarshaller;
048import org.apache.kahadb.util.Marshaller;
049import org.apache.kahadb.util.StringMarshaller;
050import org.apache.kahadb.util.VariableMarshaller;
051
052public class TempMessageDatabase {
053
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);

    // State constants. They are not referenced anywhere else in this file.
    public static final int CLOSED_STATE = 1;
    public static final int OPEN_STATE = 2;

    /** Root index: destination key (see key()) to that destination's indexes. Created in loadPageFile(). */
    protected BTreeIndex<String, StoredDestination> destinations;
    /** Page file backing every index; created lazily by getPageFile(). */
    protected PageFile pageFile;

    /** Directory handed to the PageFile constructor in createPageFile(). */
    protected File directory;
    
    // Forwarded to PageFile.setEnableWriteThread() when the page file is created.
    boolean enableIndexWriteAsync = true;
    // Index write batch size (the "set" prefix is part of the field name); forwarded to PageFile.setWriteBatchSize().
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
    
    /** Flipped to true by start() and back to false by stop(). */
    protected AtomicBoolean started = new AtomicBoolean();
    /** Flipped to true by open() and back to false by close(). */
    protected AtomicBoolean opened = new AtomicBoolean();
069
    /**
     * Creates an unopened database. Nothing is allocated here; the page file
     * is created lazily by getPageFile().
     */
    public TempMessageDatabase() {
    }
072
073    public void start() throws Exception {
074        if (started.compareAndSet(false, true)) {
075                load();
076        }
077    }
078
079    public void stop() throws Exception {
080        if (started.compareAndSet(true, false)) {
081            unload();
082        }
083    }
084
085        private void loadPageFile() throws IOException {
086                synchronized (indexMutex) {
087                    final PageFile pageFile = getPageFile();
088            pageFile.load();
089            pageFile.tx().execute(new Transaction.Closure<IOException>() {
090                public void execute(Transaction tx) throws IOException {
091                    destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
092                    destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
093                    destinations.setValueMarshaller(new StoredDestinationMarshaller());
094                    destinations.load(tx);
095                }
096            });
097            pageFile.flush();
098            storedDestinations.clear();
099        }
100        }
101        
102        /**
103         * @throws IOException
104         */
105        public void open() throws IOException {
106                if( opened.compareAndSet(false, true) ) {
107                loadPageFile();
108                }
109        }
110        
111    public void load() throws IOException {
112        synchronized (indexMutex) {
113                open();
114            pageFile.unload();
115            pageFile.delete();
116            loadPageFile();
117        }
118    }
119
120    
121        public void close() throws IOException, InterruptedException {
122                if( opened.compareAndSet(true, false)) {
123                synchronized (indexMutex) {
124                    pageFile.unload();
125                }
126                }
127        }
128        
129    public void unload() throws IOException, InterruptedException {
130        synchronized (indexMutex) {
131            if( pageFile.isLoaded() ) {
132                close();
133            }
134        }
135    }
136
137    public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
138        if (txid!=null) {
139            synchronized (indexMutex) {
140                ArrayList<Operation> inflightTx = getInflightTx(txid);
141                inflightTx.add(new AddOpperation(command, data));
142            }
143        } else {
144            synchronized (indexMutex) {
145                pageFile.tx().execute(new Transaction.Closure<IOException>() {
146                    public void execute(Transaction tx) throws IOException {
147                        upadateIndex(tx, command, data);
148                    }
149                });
150            }
151        }
152    }
153
154    public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
155        if (txid!=null) {
156            synchronized (indexMutex) {
157                ArrayList<Operation> inflightTx = getInflightTx(txid);
158                inflightTx.add(new RemoveOpperation(command));
159            }
160        } else {
161            synchronized (indexMutex) {
162                pageFile.tx().execute(new Transaction.Closure<IOException>() {
163                    public void execute(Transaction tx) throws IOException {
164                        updateIndex(tx, command);
165                    }
166                });
167            }
168        }
169
170    }
171
172    public void process(final KahaRemoveDestinationCommand command) throws IOException {
173        synchronized (indexMutex) {
174            pageFile.tx().execute(new Transaction.Closure<IOException>() {
175                public void execute(Transaction tx) throws IOException {
176                    updateIndex(tx, command);
177                }
178            });
179        }
180    }
181
182    public void process(final KahaSubscriptionCommand command) throws IOException {
183        synchronized (indexMutex) {
184            pageFile.tx().execute(new Transaction.Closure<IOException>() {
185                public void execute(Transaction tx) throws IOException {
186                    updateIndex(tx, command);
187                }
188            });
189        }
190    }
191
192    public void processCommit(TransactionId key) throws IOException {
193        synchronized (indexMutex) {
194            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
195            if (inflightTx == null) {
196                inflightTx = preparedTransactions.remove(key);
197            }
198            if (inflightTx == null) {
199                return;
200            }
201
202            final ArrayList<Operation> messagingTx = inflightTx;
203            pageFile.tx().execute(new Transaction.Closure<IOException>() {
204                public void execute(Transaction tx) throws IOException {
205                    for (Operation op : messagingTx) {
206                        op.execute(tx);
207                    }
208                }
209            });
210        }
211    }
212
213    public void processPrepare(TransactionId key) {
214        synchronized (indexMutex) {
215            ArrayList<Operation> tx = inflightTransactions.remove(key);
216            if (tx != null) {
217                preparedTransactions.put(key, tx);
218            }
219        }
220    }
221
222    public void processRollback(TransactionId key) {
223        synchronized (indexMutex) {
224            ArrayList<Operation> tx = inflightTransactions.remove(key);
225            if (tx == null) {
226                preparedTransactions.remove(key);
227            }
228        }
229    }
230
    // /////////////////////////////////////////////////////////////////
    // These methods do the actual index updates.
    // /////////////////////////////////////////////////////////////////

    /** Monitor held by the public entry points while they touch the page file, the indexes or the transaction maps. */
    protected final Object indexMutex = new Object();
        // Only exposed through getJournalFilesBeingReplicated(); nothing in this file adds to it.
        private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
237
238    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
239        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
240
241        // Skip adding the message to the index if this is a topic and there are
242        // no subscriptions.
243        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
244            return;
245        }
246
247        // Add the message.
248        long id = sd.nextMessageId++;
249        Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
250        if( previous == null ) {
251            sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
252        } else {
253            // restore the previous value.. Looks like this was a redo of a previously
254            // added message.  We don't want to assing it a new id as the other indexes would 
255            // be wrong..
256            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
257        }
258    }
259
260    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
261        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
262        if (!command.hasSubscriptionKey()) {
263            
264            // In the queue case we just remove the message from the index..
265            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
266            if (sequenceId != null) {
267                sd.orderIndex.remove(tx, sequenceId);
268            }
269        } else {
270            // In the topic case we need remove the message once it's been acked
271            // by all the subs
272            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
273
274            // Make sure it's a valid message id...
275            if (sequence != null) {
276                String subscriptionKey = command.getSubscriptionKey();
277                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
278
279                // The following method handles deleting un-referenced messages.
280                removeAckByteSequence(tx, sd, subscriptionKey, prev);
281
282                // Add it to the new location set.
283                addAckByteSequence(sd, sequence, subscriptionKey);
284            }
285
286        }
287    }
288
289    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
290        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
291        sd.orderIndex.clear(tx);
292        sd.orderIndex.unload(tx);
293        tx.free(sd.orderIndex.getPageId());
294        
295        sd.messageIdIndex.clear(tx);
296        sd.messageIdIndex.unload(tx);
297        tx.free(sd.messageIdIndex.getPageId());
298
299        if (sd.subscriptions != null) {
300            sd.subscriptions.clear(tx);
301            sd.subscriptions.unload(tx);
302            tx.free(sd.subscriptions.getPageId());
303
304            sd.subscriptionAcks.clear(tx);
305            sd.subscriptionAcks.unload(tx);
306            tx.free(sd.subscriptionAcks.getPageId());
307        }
308
309        String key = key(command.getDestination());
310        storedDestinations.remove(key);
311        destinations.remove(tx, key);
312    }
313
314    private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
315        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
316
317        // If set then we are creating it.. otherwise we are destroying the sub
318        if (command.hasSubscriptionInfo()) {
319            String subscriptionKey = command.getSubscriptionKey();
320            sd.subscriptions.put(tx, subscriptionKey, command);
321            long ackByteSequence=-1;
322            if (!command.getRetroactive()) {
323                ackByteSequence = sd.nextMessageId-1;
324            }
325
326            sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
327            addAckByteSequence(sd, ackByteSequence, subscriptionKey);
328        } else {
329            // delete the sub...
330            String subscriptionKey = command.getSubscriptionKey();
331            sd.subscriptions.remove(tx, subscriptionKey);
332            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
333            if( prev!=null ) {
334                removeAckByteSequence(tx, sd, subscriptionKey, prev);
335            }
336        }
337
338    }
339    
    /**
     * Returns the live (not copied) set held in journalFilesBeingReplicated,
     * so changes made by the caller are visible to this object.
     *
     * @return the internal set of journal file ids
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
                return journalFilesBeingReplicated;
        }
343
    // /////////////////////////////////////////////////////////////////
    // StoredDestination related implementation methods.
    // /////////////////////////////////////////////////////////////////


        // Cache of loaded destinations keyed by key(destination); filled by
        // getStoredDestination() and cleared whenever the page file is (re)loaded.
        private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
350
    /**
     * Plain holder for per-subscription state. It is not referenced anywhere
     * else in this file.
     */
    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        String lastAckId;
        ByteSequence lastAckByteSequence;
        ByteSequence cursor;
    }
357    
358    static class MessageRecord {
359        final String messageId;
360        final ByteSequence data;
361        
362        public MessageRecord(String messageId, ByteSequence location) {
363            this.messageId=messageId;
364            this.data=location;
365        }
366        
367        @Override
368        public String toString() {
369            return "["+messageId+","+data+"]";
370        }
371    }
372    
373    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
374        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
375        
376        public MessageRecord readPayload(DataInput dataIn) throws IOException {
377            return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
378        }
379
380        public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
381            dataOut.writeUTF(object.messageId);
382            ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
383        }
384    }
385    
    /** The indexes and in-memory bookkeeping kept for one destination. */
    static class StoredDestination {
        // Sequence number handed to the next message added to this destination.
        long nextMessageId;
        // Sequence number -> message record (message id and payload).
        BTreeIndex<Long, MessageRecord> orderIndex;
        // Message id -> sequence number.
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        // Subscription key -> the command that created the subscription.
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        // Subscription key -> sequence number of its latest ack.
        BTreeIndex<String, Long> subscriptionAcks;
        // Allocated when a topic is loaded; not read anywhere else in this file.
        HashMap<String, Long> subscriptionCursors;
        // Sequence number -> keys of the subscriptions whose latest ack is at that sequence.
        TreeMap<Long, HashSet<String>> ackPositions;
    }
397
398    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
399        public Class<StoredDestination> getType() {
400            return StoredDestination.class;
401        }
402
403        public StoredDestination readPayload(DataInput dataIn) throws IOException {
404            StoredDestination value = new StoredDestination();
405            value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
406            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
407
408            if (dataIn.readBoolean()) {
409                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
410                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
411            }
412            return value;
413        }
414
415        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
416            dataOut.writeLong(value.orderIndex.getPageId());
417            dataOut.writeLong(value.messageIdIndex.getPageId());
418            if (value.subscriptions != null) {
419                dataOut.writeBoolean(true);
420                dataOut.writeLong(value.subscriptions.getPageId());
421                dataOut.writeLong(value.subscriptionAcks.getPageId());
422            } else {
423                dataOut.writeBoolean(false);
424            }
425        }
426    }
427
428    static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
429        final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
430
431        public ByteSequence readPayload(DataInput dataIn) throws IOException {
432                byte data[] = new byte[dataIn.readInt()];
433                dataIn.readFully(data);
434            return new ByteSequence(data);
435        }
436
437        public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
438            dataOut.writeInt(object.getLength());
439            dataOut.write(object.getData(), object.getOffset(), object.getLength());
440        }
441    }
442
443    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
444        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
445
446        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
447            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
448            rc.mergeFramed((InputStream)dataIn);
449            return rc;
450        }
451
452        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
453            object.writeFramed((OutputStream)dataOut);
454        }
455    }
456
457    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
458        String key = key(destination);
459        StoredDestination rc = storedDestinations.get(key);
460        if (rc == null) {
461            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
462            rc = loadStoredDestination(tx, key, topic);
463            // Cache it. We may want to remove/unload destinations from the
464            // cache that are not used for a while
465            // to reduce memory usage.
466            storedDestinations.put(key, rc);
467        }
468        return rc;
469    }
470
    /**
     * Looks up the destination's entry in the root index, allocating and
     * registering a new set of indexes when there is none, then configures
     * the marshallers of every index and loads it. For topics the in-memory
     * ack positions are rebuilt from the persisted subscription acks.
     *
     * @param tx the page file transaction to read and allocate within
     * @param key the destination key, as produced by key()
     * @param topic whether the destination is a topic; controls whether the
     *            subscription indexes are allocated and loaded
     * @return the loaded destination; the caller is responsible for caching it
     * @throws IOException if an index cannot be read, written or loaded
     */
    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
        // Try to load the existing indexes..
        StoredDestination rc = destinations.get(tx, key);
        if (rc == null) {
            // Brand new destination.. allocate indexes for it.
            rc = new StoredDestination();
            rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());

            if (topic) {
                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
            }
            // Persist the page ids so the destination can be found again.
            destinations.put(tx, key, rc);
        }

        // Configure the marshalers and load.
        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
        rc.orderIndex.load(tx);

        // Figure out the next key using the last entry in the destination.
        // An empty index leaves nextMessageId at its default of 0.
        Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
        if( lastEntry!=null ) {
            rc.nextMessageId = lastEntry.getKey()+1;
        }

        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
        rc.messageIdIndex.load(tx);
        
        // If it was a topic...
        if (topic) {

            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
            rc.subscriptions.load(tx);

            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
            rc.subscriptionAcks.load(tx);

            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
            rc.subscriptionCursors = new HashMap<String, Long>();

            // Rebuild the sequence -> subscriptions map from the persisted
            // subscription -> sequence index.
            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                Entry<String, Long> entry = iterator.next();
                addAckByteSequence(rc, entry.getValue(), entry.getKey());
            }

        }
        return rc;
    }
531
532    /**
533     * @param sd
534     * @param messageSequence
535     * @param subscriptionKey
536     */
537    private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
538        HashSet<String> hs = sd.ackPositions.get(messageSequence);
539        if (hs == null) {
540            hs = new HashSet<String>();
541            sd.ackPositions.put(messageSequence, hs);
542        }
543        hs.add(subscriptionKey);
544    }
545
546    /**
547     * @param tx
548     * @param sd
549     * @param subscriptionKey
550     * @param sequenceId
551     * @throws IOException
552     */
553    private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
554        // Remove the sub from the previous location set..
555        if (sequenceId != null) {
556            HashSet<String> hs = sd.ackPositions.get(sequenceId);
557            if (hs != null) {
558                hs.remove(subscriptionKey);
559                if (hs.isEmpty()) {
560                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
561                    sd.ackPositions.remove(sequenceId);
562
563                    // Did we just empty out the first set in the
564                    // ordered list of ack locations? Then it's time to
565                    // delete some messages.
566                    if (hs == firstSet) {
567
568                        // Find all the entries that need to get deleted.
569                        ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
570                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
571                            Entry<Long, MessageRecord> entry = iterator.next();
572                            if (entry.getKey().compareTo(sequenceId) <= 0) {
573                                // We don't do the actually delete while we are
574                                // iterating the BTree since
575                                // iterating would fail.
576                                deletes.add(entry);
577                            }
578                        }
579
580                        // Do the actual deletes.
581                        for (Entry<Long, MessageRecord> entry : deletes) {
582                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
583                            sd.orderIndex.remove(tx,entry.getKey());
584                        }
585                    }
586                }
587            }
588        }
589    }
590
591    private String key(KahaDestination destination) {
592        return destination.getType().getNumber() + ":" + destination.getName();
593    }
594
    // /////////////////////////////////////////////////////////////////
    // Transaction related implementation methods.
    // /////////////////////////////////////////////////////////////////
    // Operations queued per transaction until commit or rollback.
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
    // Transactions moved here from inflightTransactions by processPrepare().
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
600 
601    private ArrayList<Operation> getInflightTx(TransactionId key) {
602        ArrayList<Operation> tx = inflightTransactions.get(key);
603        if (tx == null) {
604            tx = new ArrayList<Operation>();
605            inflightTransactions.put(key, tx);
606        }
607        return tx;
608    }
609
    /** An index update deferred until its messaging transaction commits. */
    abstract class Operation {
        /**
         * Applies the deferred update.
         *
         * @param tx the page file transaction to update within
         * @throws IOException if the index update fails
         */
        abstract public void execute(Transaction tx) throws IOException;
    }
613
614    class AddOpperation extends Operation {
615        final KahaAddMessageCommand command;
616                private final ByteSequence data;
617
618        public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
619            this.command = command;
620                        this.data = location;
621        }
622
623        public void execute(Transaction tx) throws IOException {
624            upadateIndex(tx, command, data);
625        }
626
627        public KahaAddMessageCommand getCommand() {
628            return command;
629        }
630    }
631
    /** Deferred removal (or ack) of a message, replayed against the index on commit. */
    class RemoveOpperation extends Operation {
        final KahaRemoveMessageCommand command;

        /**
         * @param command the remove command to replay
         */
        public RemoveOpperation(KahaRemoveMessageCommand command) {
            this.command = command;
        }

        // Replays the removal through the same index update used for
        // non-transacted removes.
        public void execute(Transaction tx) throws IOException {
            updateIndex(tx, command);
        }

        public KahaRemoveMessageCommand getCommand() {
            return command;
        }
    }
647
648    // /////////////////////////////////////////////////////////////////
649    // Initialization related implementation methods.
650    // /////////////////////////////////////////////////////////////////
651
652    private PageFile createPageFile() {
653        PageFile index = new PageFile(directory, "temp-db");
654        index.setEnableWriteThread(isEnableIndexWriteAsync());
655        index.setWriteBatchSize(getIndexWriteBatchSize());
656        index.setEnableDiskSyncs(false);
657        index.setEnableRecoveryFile(false);
658        return index;
659    }
660
    /**
     * @return the directory the page file is created in; null until set
     */
    public File getDirectory() {
        return directory;
    }
664
    /**
     * Sets the directory passed to the page file when it is created. The page
     * file is created once and reused, so a later change is not picked up by
     * an already-created page file.
     *
     * @param directory the directory to create the page file in
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
668    
669    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
670        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
671    }
672
    /**
     * @return the write batch size applied to the page file when it is created
     */
    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }
676    
    /**
     * Sets the flag forwarded to PageFile.setEnableWriteThread() when the
     * page file is created.
     *
     * @param enableIndexWriteAsync the value to forward
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
680    
    /**
     * @return the flag forwarded to PageFile.setEnableWriteThread() when the
     *         page file is created
     */
    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }
684        
685    public PageFile getPageFile() {
686        if (pageFile == null) {
687            pageFile = createPageFile();
688        }
689                return pageFile;
690        }
691
692}