001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.page;
018    
019    import java.io.*;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Collection;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.LinkedHashMap;
026    import java.util.Map;
027    import java.util.Map.Entry;
028    import java.util.Properties;
029    import java.util.TreeMap;
030    import java.util.concurrent.CountDownLatch;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicLong;
033    import java.util.zip.Adler32;
034    import java.util.zip.Checksum;
035    
036    import org.apache.kahadb.util.DataByteArrayOutputStream;
037    import org.apache.kahadb.util.IOExceptionSupport;
038    import org.apache.kahadb.util.IOHelper;
039    import org.apache.kahadb.util.IntrospectionSupport;
040    import org.apache.kahadb.util.LFUCache;
041    import org.apache.kahadb.util.LRUCache;
042    import org.apache.kahadb.util.Sequence;
043    import org.apache.kahadb.util.SequenceSet;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
 * A PageFile provides random access to fixed-sized disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * <ul>
 * <li>Metadata Space: 4k. A reserved metadata area used to store persistent configuration for the file.</li>
 * <li>Recovery Buffer Space: page size * 1000. A redo log used to prevent partial page writes from making the file inconsistent.</li>
 * <li>Page Space: the pages in the page file.</li>
 * </ul>
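 * <p/>
 * A minimal usage sketch (the directory and file name below are illustrative):
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "example");
 * pageFile.load();
 * try {
 *     Transaction tx = pageFile.tx();
 *     // ... allocate, load and store pages through the transaction ...
 *     pageFile.flush();
 * } finally {
 *     pageFile.unload();
 * }
 * </pre>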
055     */
056    public class PageFile {
057    
058        private static final String PAGEFILE_SUFFIX = ".data";
059        private static final String RECOVERY_FILE_SUFFIX = ".redo";
060        private static final String FREE_FILE_SUFFIX = ".free";
061    
062        // 4k Default page size.
063        public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
064        public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);
066    
067        private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
068        private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
069    
    // Each record in the recovery buffer is prefixed by the id of the page it updates (a long).
071        private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
072    
073        // A PageFile will use a couple of files in this directory
074        private File directory;
075        // And the file names in that directory will be based on this name.
076        private final String name;
077    
078        // File handle used for reading pages..
079        private RandomAccessFile readFile;
080        // File handle used for writing pages..
081        private RandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
083        private RandomAccessFile recoveryFile;
084    
085        // The size of pages
086        private int pageSize = DEFAULT_PAGE_SIZE;
087    
    // The minimum amount of space to allocate to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to.. it may temporarily exceed the max, but the file will be resized
    // back down to this max size as soon as possible.
092        private int recoveryFileMaxPageCount = 10000;
093        // The number of pages in the current recovery buffer
094        private int recoveryPageCount;
095    
096        private AtomicBoolean loaded = new AtomicBoolean();
097        // The number of pages we are aiming to write every time we
098        // write to disk.
099        int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
100    
    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
104        private boolean enablePageCaching = true;
105        // How many pages will we keep in the cache?
106        private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
107    
    // Should we first log page writes to the recovery buffer? Avoids partial
    // page writes corrupting the page file..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
112        private boolean enableDiskSyncs = true;
113        // Will writes be done in an async thread?
114        private boolean enabledWriteThread = false;
115    
    // These are used if enabledWriteThread==true
117        private AtomicBoolean stopWriter = new AtomicBoolean();
118        private Thread writerThread;
119        private CountDownLatch checkpointLatch;
120    
121        // Keeps track of writes that are being written to disk.
122        private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
123    
124        // Keeps track of free pages.
125        private final AtomicLong nextFreePageId = new AtomicLong();
126        private SequenceSet freeList = new SequenceSet();
127    
128        private AtomicLong nextTxid = new AtomicLong();
129    
130        // Persistent settings stored in the page file.
131        private MetaData metaData;
132    
133        private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
134    
135        private boolean useLFRUEviction = false;
136        private float LFUEvictionFactor = 0.2f;
137    
    /**
     * Used to keep track of updated pages which have not yet been committed.
     * Holds a 'current' buffer that may still be updated and a 'diskBound'
     * buffer that is in the process of being written out.
     */
141        static class PageWrite {
142            Page page;
143            byte[] current;
144            byte[] diskBound;
145            long currentLocation = -1;
146            long diskBoundLocation = -1;
147            File tmpFile;
148            int length;
149    
150            public PageWrite(Page page, byte[] data) {
151                this.page = page;
152                current = data;
153            }
154    
155            public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
156                this.page = page;
157                this.currentLocation = currentLocation;
158                this.tmpFile = tmpFile;
159                this.length = length;
160            }
161    
162            public void setCurrent(Page page, byte[] data) {
163                this.page = page;
164                current = data;
165                currentLocation = -1;
166                diskBoundLocation = -1;
167            }
168    
169            public void setCurrentLocation(Page page, long location, int length) {
170                this.page = page;
171                this.currentLocation = location;
172                this.length = length;
173                this.current = null;
174            }
175    
176            @Override
177            public String toString() {
178                return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
179            }
180    
181            @SuppressWarnings("unchecked")
182            public Page getPage() {
183                return page;
184            }
185    
        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
                try {
                    file.seek(diskBoundLocation);
                    // readFully guards against a short read leaving the buffer partially filled.
                    file.readFully(diskBound);
                } finally {
                    file.close();
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }
197    
198            void begin() {
199                if (currentLocation != -1) {
200                    diskBoundLocation = currentLocation;
201                } else {
202                    diskBound = current;
203                }
204                current = null;
205                currentLocation = -1;
206            }
207    
208            /**
         * @return true if there are no pending writes to do.
210             */
211            boolean done() {
212                diskBoundLocation = -1;
213                diskBound = null;
214                return current == null || currentLocation == -1;
215            }
216    
217            boolean isDone() {
218                return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
219            }
220        }
221    
222        /**
     * The MetaData object holds the persistent data associated with a PageFile object.
224         */
225        public static class MetaData {
226    
227            String fileType;
228            String fileTypeVersion;
229    
230            long metaDataTxId = -1;
231            int pageSize;
232            boolean cleanShutdown;
233            long lastTxId;
234            long freePages;
235    
236            public String getFileType() {
237                return fileType;
238            }
239    
240            public void setFileType(String fileType) {
241                this.fileType = fileType;
242            }
243    
244            public String getFileTypeVersion() {
245                return fileTypeVersion;
246            }
247    
248            public void setFileTypeVersion(String version) {
249                this.fileTypeVersion = version;
250            }
251    
252            public long getMetaDataTxId() {
253                return metaDataTxId;
254            }
255    
256            public void setMetaDataTxId(long metaDataTxId) {
257                this.metaDataTxId = metaDataTxId;
258            }
259    
260            public int getPageSize() {
261                return pageSize;
262            }
263    
264            public void setPageSize(int pageSize) {
265                this.pageSize = pageSize;
266            }
267    
268            public boolean isCleanShutdown() {
269                return cleanShutdown;
270            }
271    
272            public void setCleanShutdown(boolean cleanShutdown) {
273                this.cleanShutdown = cleanShutdown;
274            }
275    
276            public long getLastTxId() {
277                return lastTxId;
278            }
279    
280            public void setLastTxId(long lastTxId) {
281                this.lastTxId = lastTxId;
282            }
283    
284            public long getFreePages() {
285                return freePages;
286            }
287    
288            public void setFreePages(long value) {
289                this.freePages = value;
290            }
291        }
292    
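    /**
     * Starts a new transaction against this page file.  A sketch of typical use:
     * <pre>
     * Transaction tx = pageFile.tx();
     * Page page = tx.allocate();
     * // ... update the page through the transaction ...
     * tx.commit();
     * </pre>
     *
     * @throws IllegalStateException if the page file is not loaded.
     */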
293        public Transaction tx() {
294            assertLoaded();
295            return new Transaction(this);
296        }
297    
298        /**
     * Creates a PageFile in the specified directory whose data files are named by name.
300         */
301        public PageFile(File directory, String name) {
302            this.directory = directory;
303            this.name = name;
304        }
305    
306        /**
307         * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
308         *
309         * @throws IOException           if the files cannot be deleted.
310         * @throws IllegalStateException if this PageFile is loaded
311         */
312        public void delete() throws IOException {
313            if (loaded.get()) {
314                throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
315            }
316            delete(getMainPageFile());
317            delete(getFreeFile());
318            delete(getRecoveryFile());
319        }
320    
321        public void archive() throws IOException {
322            if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
324            }
325            long timestamp = System.currentTimeMillis();
326            archive(getMainPageFile(), String.valueOf(timestamp));
327            archive(getFreeFile(), String.valueOf(timestamp));
328            archive(getRecoveryFile(), String.valueOf(timestamp));
329        }
330    
    /**
     * Deletes the given file if it exists.
     *
     * @throws IOException if the file exists but cannot be deleted.
     */
335        private void delete(File file) throws IOException {
336            if (file.exists() && !file.delete()) {
337                throw new IOException("Could not delete: " + file.getPath());
338            }
339        }
340    
341        private void archive(File file, String suffix) throws IOException {
342            if (file.exists()) {
343                File archive = new File(file.getPath() + "-" + suffix);
344                if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
346                }
347            }
348        }
349    
350        /**
351         * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
352         * first time the page file is loaded, then this creates the page file in the file system.
353         *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is a bad version, or a disk error occurred.
356         * @throws IllegalStateException If the page file was already loaded.
357         */
358        public void load() throws IOException, IllegalStateException {
359            if (loaded.compareAndSet(false, true)) {
360    
361                if (enablePageCaching) {
362                    if (isUseLFRUEviction()) {
363                        pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
364                    } else {
365                        pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
366                    }
367                }
368    
369                File file = getMainPageFile();
370                IOHelper.mkdirs(file.getParentFile());
371                writeFile = new RandomAccessFile(file, "rw");
372                readFile = new RandomAccessFile(file, "r");
373    
374                if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
376                    loadMetaData();
377                    pageSize = metaData.getPageSize();
378                } else {
                // Store the page size setting because it can't change once the file is created.
380                    metaData = new MetaData();
381                    metaData.setFileType(PageFile.class.getName());
382                    metaData.setFileTypeVersion("1");
383                    metaData.setPageSize(getPageSize());
384                    metaData.setCleanShutdown(true);
385                    metaData.setFreePages(-1);
386                    metaData.setLastTxId(0);
387                    storeMetaData();
388                }
389    
390                if (enableRecoveryFile) {
391                    recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
392                }
393    
394                if (metaData.isCleanShutdown()) {
395                    nextTxid.set(metaData.getLastTxId() + 1);
396                    if (metaData.getFreePages() > 0) {
397                        loadFreeList();
398                    }
399                } else {
400                    LOG.debug(toString() + ", Recovering page file...");
401                    nextTxid.set(redoRecoveryUpdates());
402    
403                    // Scan all to find the free pages.
404                    freeList = new SequenceSet();
405                    for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
406                        Page page = i.next();
407                        if (page.getType() == Page.PAGE_FREE_TYPE) {
408                            freeList.add(page.getPageId());
409                        }
410                    }
411                }
412    
413                metaData.setCleanShutdown(false);
414                storeMetaData();
415                getFreeFile().delete();
416    
417                if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
418                    writeFile.setLength(PAGE_FILE_HEADER_SIZE);
419                }
420                nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
421                startWriter();
422    
423            } else {
424                throw new IllegalStateException("Cannot load the page file when it is already loaded.");
425            }
426        }
427    
428    
429        /**
430         * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
432         *
     * @throws IOException           if a disk error occurred while closing down the page file.
434         * @throws IllegalStateException if the PageFile is not loaded
435         */
436        public void unload() throws IOException {
437            if (loaded.compareAndSet(true, false)) {
438                flush();
439                try {
440                    stopWriter();
441                } catch (InterruptedException e) {
442                    throw new InterruptedIOException();
443                }
444    
445                if (freeList.isEmpty()) {
446                    metaData.setFreePages(0);
447                } else {
448                    storeFreeList();
449                    metaData.setFreePages(freeList.size());
450                }
451    
452                metaData.setLastTxId(nextTxid.get() - 1);
453                metaData.setCleanShutdown(true);
454                storeMetaData();
455    
456                if (readFile != null) {
457                    readFile.close();
458                    readFile = null;
459                    writeFile.close();
460                    writeFile = null;
461                    if (enableRecoveryFile) {
462                        recoveryFile.close();
463                        recoveryFile = null;
464                    }
465                    freeList.clear();
466                    if (pageCache != null) {
467                        pageCache = null;
468                    }
469                    synchronized (writes) {
470                        writes.clear();
471                    }
472                }
473            } else {
474                throw new IllegalStateException("Cannot unload the page file when it is not loaded");
475            }
476        }
477    
478        public boolean isLoaded() {
479            return loaded.get();
480        }
481    
482        /**
483         * Flush and sync all write buffers to disk.
484         *
     * @throws IOException If a disk error occurred.
486         */
487        public void flush() throws IOException {
488    
489            if (enabledWriteThread && stopWriter.get()) {
490                throw new IOException("Page file already stopped: checkpointing is not allowed");
491            }
492    
        // Setup a latch that gets notified when all buffered writes hit the disk.
494            CountDownLatch checkpointLatch;
495            synchronized (writes) {
496                if (writes.isEmpty()) {
497                    return;
498                }
499                if (enabledWriteThread) {
500                    if (this.checkpointLatch == null) {
501                        this.checkpointLatch = new CountDownLatch(1);
502                    }
503                    checkpointLatch = this.checkpointLatch;
504                    writes.notify();
505                } else {
506                    writeBatch();
507                    return;
508                }
509            }
510            try {
511                checkpointLatch.await();
512            } catch (InterruptedException e) {
513                InterruptedIOException ioe = new InterruptedIOException();
514                ioe.initCause(e);
515                throw ioe;
516            }
517        }
518    
519    
    @Override
    public String toString() {
521            return "Page File: " + getMainPageFile();
522        }
523    
524        ///////////////////////////////////////////////////////////////////
525        // Private Implementation Methods
526        ///////////////////////////////////////////////////////////////////
527        private File getMainPageFile() {
528            return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
529        }
530    
531        public File getFreeFile() {
532            return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
533        }
534    
535        public File getRecoveryFile() {
536            return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
537        }
538    
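    /**
     * Maps a page id to its byte offset in the main page file.  Pages are laid out
     * sequentially after the 4k header: with the default 4k page size, page 0 starts
     * at offset 4096 and page 2 at offset 4096 + 2 * 4096 = 12288.
     */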
539        public long toOffset(long pageId) {
540            return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
541        }
542    
543        private void loadMetaData() throws IOException {
544    
545            ByteArrayInputStream is;
546            MetaData v1 = new MetaData();
547            MetaData v2 = new MetaData();
548            try {
549                Properties p = new Properties();
550                byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
551                readFile.seek(0);
552                readFile.readFully(d);
553                is = new ByteArrayInputStream(d);
554                p.load(is);
555                IntrospectionSupport.setProperties(v1, p);
556            } catch (IOException e) {
557                v1 = null;
558            }
559    
560            try {
561                Properties p = new Properties();
562                byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
563                readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
564                readFile.readFully(d);
565                is = new ByteArrayInputStream(d);
566                p.load(is);
567                IntrospectionSupport.setProperties(v2, p);
568            } catch (IOException e) {
569                v2 = null;
570            }
571    
572            if (v1 == null && v2 == null) {
573                throw new IOException("Could not load page file meta data");
574            }
575    
        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
584            }
585        }
586    
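    /**
     * Stores the metadata redundantly: the same properties block is written to both
     * halves of the 4k file header, each followed by a sync, so a crash part way
     * through leaves at least one intact copy for loadMetaData() to fall back on.
     */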
587        private void storeMetaData() throws IOException {
588            // Convert the metadata into a property format
589            metaData.metaDataTxId++;
590            Properties p = new Properties();
591            IntrospectionSupport.getProperties(metaData, p, null);
592    
593            ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
594            p.store(os, "");
595            if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
597            }
598            // Fill the rest with space...
599            byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
600            Arrays.fill(filler, (byte) ' ');
601            os.write(filler);
602            os.flush();
603    
604            byte[] d = os.toByteArray();
605    
        // So we don't lose it.. write it 2 times...
607            writeFile.seek(0);
608            writeFile.write(d);
609            writeFile.getFD().sync();
610            writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
611            writeFile.write(d);
612            writeFile.getFD().sync();
613        }
614    
615        private void storeFreeList() throws IOException {
616            FileOutputStream os = new FileOutputStream(getFreeFile());
617            DataOutputStream dos = new DataOutputStream(os);
618            SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
619            dos.close();
620        }
621    
622        private void loadFreeList() throws IOException {
623            freeList.clear();
624            FileInputStream is = new FileInputStream(getFreeFile());
625            DataInputStream dis = new DataInputStream(is);
626            freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
627            dis.close();
628        }
629    
630        ///////////////////////////////////////////////////////////////////
631        // Property Accessors
632        ///////////////////////////////////////////////////////////////////
633    
634        /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
636         *
637         * @return is the recovery buffer enabled.
638         */
639        public boolean isEnableRecoveryFile() {
640            return enableRecoveryFile;
641        }
642    
643        /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
645         * may potentially cause partial page writes which can lead to page file corruption.
646         */
647        public void setEnableRecoveryFile(boolean doubleBuffer) {
648            assertNotLoaded();
649            this.enableRecoveryFile = doubleBuffer;
650        }
651    
652        /**
653         * @return Are page writes synced to disk?
654         */
655        public boolean isEnableDiskSyncs() {
656            return enableDiskSyncs;
657        }
658    
659        /**
     * Allows you to enable syncing writes to disk.
661         */
662        public void setEnableDiskSyncs(boolean syncWrites) {
663            assertNotLoaded();
664            this.enableDiskSyncs = syncWrites;
665        }
666    
667        /**
668         * @return the page size
669         */
670        public int getPageSize() {
671            return this.pageSize;
672        }
673    
674        /**
675         * @return the amount of content data that a page can hold.
676         */
677        public int getPageContentSize() {
678            return this.pageSize - Page.PAGE_HEADER_SIZE;
679        }
680    
681        /**
682         * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
683         * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
684         * can no longer be changed.
685         *
686         * @param pageSize the pageSize to set
687         * @throws IllegalStateException once the page file is loaded.
688         */
689        public void setPageSize(int pageSize) throws IllegalStateException {
690            assertNotLoaded();
691            this.pageSize = pageSize;
692        }
693    
694        /**
695         * @return true if read page caching is enabled
696         */
697        public boolean isEnablePageCaching() {
698            return this.enablePageCaching;
699        }
700    
701        /**
702         * @param enablePageCaching allows you to enable read page caching
703         */
704        public void setEnablePageCaching(boolean enablePageCaching) {
705            assertNotLoaded();
706            this.enablePageCaching = enablePageCaching;
707        }
708    
709        /**
710         * @return the maximum number of pages that will get stored in the read page cache.
711         */
712        public int getPageCacheSize() {
713            return this.pageCacheSize;
714        }
715    
716        /**
717         * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
718         */
719        public void setPageCacheSize(int pageCacheSize) {
720            assertNotLoaded();
721            this.pageCacheSize = pageCacheSize;
722        }
723    
724        public boolean isEnabledWriteThread() {
725            return enabledWriteThread;
726        }
727    
728        public void setEnableWriteThread(boolean enableAsyncWrites) {
729            assertNotLoaded();
730            this.enabledWriteThread = enableAsyncWrites;
731        }
732    
733        public long getDiskSize() throws IOException {
734            return toOffset(nextFreePageId.get());
735        }
736    
737        /**
738         * @return the number of pages allocated in the PageFile
739         */
740        public long getPageCount() {
741            return nextFreePageId.get();
742        }
743    
744        public int getRecoveryFileMinPageCount() {
745            return recoveryFileMinPageCount;
746        }
747    
748        public long getFreePageCount() {
749            assertLoaded();
750            return freeList.rangeSize();
751        }
752    
753        public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
754            assertNotLoaded();
755            this.recoveryFileMinPageCount = recoveryFileMinPageCount;
756        }
757    
758        public int getRecoveryFileMaxPageCount() {
759            return recoveryFileMaxPageCount;
760        }
761    
762        public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
763            assertNotLoaded();
764            this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
765        }
766    
767        public int getWriteBatchSize() {
768            return writeBatchSize;
769        }
770    
771        public void setWriteBatchSize(int writeBatchSize) {
772            this.writeBatchSize = writeBatchSize;
773        }
774    
775        public float getLFUEvictionFactor() {
776            return LFUEvictionFactor;
777        }
778    
779        public void setLFUEvictionFactor(float LFUEvictionFactor) {
780            this.LFUEvictionFactor = LFUEvictionFactor;
781        }
782    
783        public boolean isUseLFRUEviction() {
784            return useLFRUEviction;
785        }
786    
787        public void setUseLFRUEviction(boolean useLFRUEviction) {
788            this.useLFRUEviction = useLFRUEviction;
789        }
790    
791        ///////////////////////////////////////////////////////////////////
792        // Package Protected Methods exposed to Transaction
793        ///////////////////////////////////////////////////////////////////
794    
795        /**
796         * @throws IllegalStateException if the page file is not loaded.
797         */
798        void assertLoaded() throws IllegalStateException {
799            if (!loaded.get()) {
800                throw new IllegalStateException("PageFile is not loaded");
801            }
802        }
803    
804        void assertNotLoaded() throws IllegalStateException {
805            if (loaded.get()) {
806                throw new IllegalStateException("PageFile is loaded");
807            }
808        }
809    
810        /**
811         * Allocates a block of free pages that you can write data to.
812         *
813         * @param count the number of sequential pages to allocate
814         * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
816         * @throws IllegalStateException if the PageFile is not loaded
817         */
818        <T> Page<T> allocate(int count) throws IOException {
819            assertLoaded();
820            if (count <= 0) {
821                throw new IllegalArgumentException("The allocation count must be larger than zero");
822            }
823    
824            Sequence seq = freeList.removeFirstSequence(count);
825    
826            // We may need to create new free pages...
827            if (seq == null) {
828    
829                Page<T> first = null;
830                int c = count;
831    
            // Reserve the page ids and write txn ids only once, up front....
833                long pageId = nextFreePageId.getAndAdd(count);
834                long writeTxnId = nextTxid.getAndAdd(count);
835    
836                while (c-- > 0) {
837                    Page<T> page = new Page<T>(pageId++);
838                    page.makeFree(writeTxnId++);
839    
840                    if (first == null) {
841                        first = page;
842                    }
843    
844                    addToCache(page);
845                    DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
846                    page.write(out);
847                    write(page, out.getData());
848    
849                    // LOG.debug("allocate writing: "+page.getPageId());
850                }
851    
852                return first;
853            }
854    
855            Page<T> page = new Page<T>(seq.getFirst());
856            page.makeFree(0);
857            // LOG.debug("allocated: "+page.getPageId());
858            return page;
859        }
860    
861        long getNextWriteTransactionId() {
862            return nextTxid.incrementAndGet();
863        }
864    
865        void readPage(long pageId, byte[] data) throws IOException {
866            readFile.seek(toOffset(pageId));
867            readFile.readFully(data);
868        }
869    
870        public void freePage(long pageId) {
871            freeList.add(pageId);
872            removeFromCache(pageId);
873        }
874    
875        @SuppressWarnings("unchecked")
876        private <T> void write(Page<T> page, byte[] data) throws IOException {
877            final PageWrite write = new PageWrite(page, data);
878            Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
879                public Long getKey() {
880                    return write.getPage().getPageId();
881                }
882    
883                public PageWrite getValue() {
884                    return write;
885                }
886    
887                public PageWrite setValue(PageWrite value) {
888                    return null;
889                }
890            };
891            Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
892            write(Arrays.asList(entries));
893        }
894    
895        void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
896            synchronized (writes) {
897                if (enabledWriteThread) {
898                    while (writes.size() >= writeBatchSize && !stopWriter.get()) {
899                        try {
900                            writes.wait();
901                        } catch (InterruptedException e) {
902                            Thread.currentThread().interrupt();
903                            throw new InterruptedIOException();
904                        }
905                    }
906                }
907    
908                boolean longTx = false;
909    
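            // Coalesce updates: a later write to a page that is already queued replaces
            // that page's pending buffer, so only the latest content for each page id
            // reaches disk.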
910                for (Map.Entry<Long, PageWrite> entry : updates) {
911                    Long key = entry.getKey();
912                    PageWrite value = entry.getValue();
913                    PageWrite write = writes.get(key);
914                    if (write == null) {
915                        writes.put(key, value);
916                    } else {
917                        if (value.currentLocation != -1) {
918                            write.setCurrentLocation(value.page, value.currentLocation, value.length);
919                            write.tmpFile = value.tmpFile;
920                            longTx = true;
921                        } else {
922                            write.setCurrent(value.page, value.current);
923                        }
924                    }
925                }
926    
927                // Once we start approaching capacity, notify the writer to start writing
928                // sync immediately for long txs
929                if (longTx || canStartWriteBatch()) {
930    
931                    if (enabledWriteThread) {
932                        writes.notify();
933                    } else {
934                        writeBatch();
935                    }
936                }
937            }
938        }
939    
940        private boolean canStartWriteBatch() {
941            int capacityUsed = ((writes.size() * 100) / writeBatchSize);
942            if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we are locking the write mutex too often doing writes
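            // e.g. with the default writeBatchSize of 1000 the async writer starts
            // flushing once ~100 writes (10%) are buffered.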
946                return capacityUsed >= 10 || checkpointLatch != null;
947            } else {
948                return capacityUsed >= 80 || checkpointLatch != null;
949            }
950        }
951    
952        ///////////////////////////////////////////////////////////////////
953        // Cache Related operations
954        ///////////////////////////////////////////////////////////////////
955        @SuppressWarnings("unchecked")
956        <T> Page<T> getFromCache(long pageId) {
957            synchronized (writes) {
958                PageWrite pageWrite = writes.get(pageId);
959                if (pageWrite != null) {
960                    return pageWrite.page;
961                }
962            }
963    
964            Page<T> result = null;
965            if (enablePageCaching) {
966                result = pageCache.get(pageId);
967            }
968            return result;
969        }
970    
971        void addToCache(Page page) {
972            if (enablePageCaching) {
973                pageCache.put(page.getPageId(), page);
974            }
975        }
976    
977        void removeFromCache(long pageId) {
978            if (enablePageCaching) {
979                pageCache.remove(pageId);
980            }
981        }
982    
983        ///////////////////////////////////////////////////////////////////
984        // Internal Double write implementation follows...
985        ///////////////////////////////////////////////////////////////////
986    
987        private void pollWrites() {
988            try {
989                while (!stopWriter.get()) {
990                    // Wait for a notification...
991                    synchronized (writes) {
992                        writes.notifyAll();
993    
994                        // If there is not enough to write, wait for a notification...
995                        while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
996                            writes.wait(100);
997                        }
998    
999                        if (writes.isEmpty()) {
1000                            releaseCheckpointWaiter();
1001                        }
1002                    }
1003                    writeBatch();
1004                }
1005            } catch (Throwable e) {
1006                LOG.info("An exception was raised while performing poll writes", e);
1007            } finally {
1008                releaseCheckpointWaiter();
1009            }
1010        }
1011    
1012        private void writeBatch() throws IOException {
1013    
1014            CountDownLatch checkpointLatch;
1015            ArrayList<PageWrite> batch;
1016            synchronized (writes) {
1018    
1019                batch = new ArrayList<PageWrite>(writes.size());
1020                // build a write batch from the current write cache.
1021                for (PageWrite write : writes.values()) {
1022                    batch.add(write);
1023                    // Move the current write to the diskBound write, this lets folks update the
1024                    // page again without blocking for this write.
1025                    write.begin();
1026                    if (write.diskBound == null && write.diskBoundLocation == -1) {
1027                        batch.remove(write);
1028                    }
1029                }
1030    
            // Grab on to the existing checkpoint latch because once we do this write we can
1032                // release the folks that were waiting for those writes to hit disk.
1033                checkpointLatch = this.checkpointLatch;
1034                this.checkpointLatch = null;
1035            }
1036    
1037            Checksum checksum = new Adler32();
1038            if (enableRecoveryFile) {
1039                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1040            }
1041            for (PageWrite w : batch) {
1042                if (enableRecoveryFile) {
1043                    try {
1044                        checksum.update(w.getDiskBound(), 0, pageSize);
1045                    } catch (Throwable t) {
1046                        throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
1047                    }
1048                    recoveryFile.writeLong(w.page.getPageId());
1049                    recoveryFile.write(w.getDiskBound(), 0, pageSize);
1050                }
1051    
1052                writeFile.seek(toOffset(w.page.getPageId()));
1053                writeFile.write(w.getDiskBound(), 0, pageSize);
1054                w.done();
1055            }
1056    
1057            try {
1058                if (enableRecoveryFile) {
1059                    // Can we shrink the recovery buffer??
1060                    if (recoveryPageCount > recoveryFileMaxPageCount) {
1061                        int t = Math.max(recoveryFileMinPageCount, batch.size());
1062                        recoveryFile.setLength(recoveryFileSizeForPages(t));
1063                    }
1064    
1065                    // Record the page writes in the recovery buffer.
1066                    recoveryFile.seek(0);
1067                    // Store the next tx id...
1068                    recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
1072                    recoveryFile.writeLong(checksum.getValue());
1073                    // Write the # of pages that will follow
1074                    recoveryFile.writeInt(batch.size());
1075                }
1076    
1077                if (enableDiskSyncs) {
1078                    // Sync to make sure recovery buffer writes land on disk..
1079                    if (enableRecoveryFile) {
1080                        recoveryFile.getFD().sync();
1081                    }
1082                    writeFile.getFD().sync();
1083                }
1084            } finally {
1085                synchronized (writes) {
1086                    for (PageWrite w : batch) {
1087                        // If there are no more pending writes, then remove it from
1088                        // the write cache.
1089                        if (w.isDone()) {
1090                            writes.remove(w.page.getPageId());
1091                            if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
1092                                if (!w.tmpFile.delete()) {
1093                                    throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
1094                                }
1095                                tmpFilesForRemoval.remove(w.tmpFile);
1096                            }
1097                        }
1098                    }
1099                }
1100    
1101                if (checkpointLatch != null) {
1102                    checkpointLatch.countDown();
1103                }
1104            }
1105        }
1106    
1107        public void removeTmpFile(File file) {
1108            tmpFilesForRemoval.add(file);
1109        }
1110    
1111        private long recoveryFileSizeForPages(int pageCount) {
1112            return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
1113        }
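    /*
     * Recovery buffer layout, as written by writeBatch() and replayed by
     * redoRecoveryUpdates():
     *
     *   Header (RECOVERY_FILE_HEADER_SIZE = 4k):
     *     long nextTxId   - the next transaction id at the time the batch was written
     *     long checksum   - Adler32 checksum over all the page data that follows
     *     int  batchSize  - the number of page records in the batch
     *   Body (batchSize records):
     *     long   pageId   - the id of the page being written
     *     byte[] data     - pageSize bytes of page content
     *
     * recoveryFileSizeForPages() sizes the file accordingly: the header plus
     * (8 + pageSize) bytes per page.
     */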
1114    
1115        private void releaseCheckpointWaiter() {
1116            if (checkpointLatch != null) {
1117                checkpointLatch.countDown();
1118                checkpointLatch = null;
1119            }
1120        }
1121    
1122        /**
1123         * Inspects the recovery buffer and re-applies any
1124         * partially applied page writes.
1125         *
1126         * @return the next transaction id that can be used.
1127         */
1128        private long redoRecoveryUpdates() throws IOException {
1129            if (!enableRecoveryFile) {
1130                return 0;
1131            }
1132            recoveryPageCount = 0;
1133    
1134            // Are we initializing the recovery file?
1135            if (recoveryFile.length() == 0) {
1136                // Write an empty header..
1137                recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
1139                recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
1140                return 0;
1141            }
1142    
1143            // How many recovery pages do we have in the recovery buffer?
1144            recoveryFile.seek(0);
1145            long nextTxId = recoveryFile.readLong();
1146            long expectedChecksum = recoveryFile.readLong();
1147            int pageCounter = recoveryFile.readInt();
1148    
1149            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1150            Checksum checksum = new Adler32();
1151            LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
1152            try {
1153                for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
1162                }
1163            } catch (Exception e) {
            // If an error occurred, it was because the redo buffer was not fully written
            // out correctly.. so don't redo it, as the pages should still be consistent.
1166                LOG.debug("Redo buffer was not fully intact: ", e);
1167                return nextTxId;
1168            }
1169    
1170            recoveryPageCount = pageCounter;
1171    
1172            // If the checksum is not valid then the recovery buffer was partially written to disk.
1173            if (checksum.getValue() != expectedChecksum) {
1174                return nextTxId;
1175            }
1176    
1177            // Re-apply all the writes in the recovery buffer.
1178            for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
1179                writeFile.seek(toOffset(e.getKey()));
1180                writeFile.write(e.getValue());
1181            }
1182    
1183            // And sync it to disk
1184            writeFile.getFD().sync();
1185            return nextTxId;
1186        }
1187    
1188        private void startWriter() {
1189            synchronized (writes) {
1190                if (enabledWriteThread) {
1191                    stopWriter.set(false);
1192                    writerThread = new Thread("KahaDB Page Writer") {
1193                        @Override
1194                        public void run() {
1195                            pollWrites();
1196                        }
1197                    };
1198                    writerThread.setPriority(Thread.MAX_PRIORITY);
1199                    writerThread.setDaemon(true);
1200                    writerThread.start();
1201                }
1202            }
1203        }
1204    
1205        private void stopWriter() throws InterruptedException {
1206            if (enabledWriteThread) {
1207                stopWriter.set(true);
1208                writerThread.join();
1209            }
1210        }
1211    
1212        public File getFile() {
1213            return getMainPageFile();
1214        }
1215    
1216        public File getDirectory() {
1217            return directory;
1218        }
1219    }