/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
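 * <p/>
 * A minimal usage sketch (illustrative only; the directory and name are examples, and
 * only calls exercised by this class itself are shown — richer page read/write
 * operations live on {@link Transaction}):
 * <pre>
 * PageFile pf = new PageFile(new File("target/example-dir"), "example");
 * pf.load();                                   // creates the files on first use
 * for (Iterator&lt;Page&gt; i = pf.tx().iterator(true); i.hasNext();) {
 *     Page page = i.next();                    // visits every page, including free ones
 * }
 * pf.flush();                                  // push buffered writes to disk
 * pf.unload();                                 // records a clean shutdown
 * </pre>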
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The maximum size that we let the recovery file grow to. It may temporarily exceed
    // this maximum, but the file will be resized back down to it as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // A cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log page writes to the recovery buffer? This avoids partial
    // page writes corrupting the page file..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true.
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;
    /**
     * Used to keep track of updated pages which have not yet been committed.
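     * A write holds its payload either in memory (the current/diskBound byte arrays)
     * or, for long transactions, as an offset and length into a temporary file
     * (currentLocation/diskBoundLocation plus tmpFile).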
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    file.readFully(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

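        /**
         * Moves the pending ("current") data into the disk-bound slot so that the page
         * can be updated again without blocking while this version is written to disk.
         */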
        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null && currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

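    /**
     * Creates a new Transaction against this PageFile.  Page reads and writes are
     * performed through the returned transaction.
     *
     * @throws IllegalStateException if the PageFile is not loaded.
     */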
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file the file to delete
     * @throws IOException if the file exists but could not be deleted.
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is a bad version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw");
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting since it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting since it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to find the free ones.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded.
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

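    // Maps a logical page id to its byte offset in the main data file: pages are laid
    // out contiguously after the 4k header, so with the default 4k page size, page 3
    // starts at 4096 + 3 * 4096 = 16384.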
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

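    // The metadata is stored as two copies, one in each half of the 4k file header, so
    // that a crash during a header update leaves at least one intact copy; see the
    // matching dual write in storeMetaData().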
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first copy since the second could be a partial write..
        } else {
            metaData = v2; // use the second copy since the first is probably a partial write.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it out twice...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(getFreeFile()))) {
            SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        }
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        try (DataInputStream dis = new DataInputStream(new FileInputStream(getFreeFile()))) {
            freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double-buffer page writes?  Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double-buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded.
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and tx ids for the whole block up front....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Batches are written out immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // It would be nice to figure out how to auto-tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
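    // Note that lookups consult the pending-write map before the page cache, so
    // readers always observe the most recent (possibly not yet flushed) version
    // of a page.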
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////
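    // Recovery file layout: a 4k header holding the next tx id (long), an Adler32
    // checksum of the batch (long) and the page count (int), followed by one
    // record per page of (long pageId, pageSize bytes of page data).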

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                // Move the current write to the diskBound write; this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound != null || write.diskBoundLocation != -1) {
                    batch.add(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        // First land the writes in the recovery file.
        if (enableRecoveryFile) {
            Checksum checksum = new Adler32();

            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

            for (PageWrite w : batch) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            // Can we shrink the recovery buffer?
            if (recoveryPageCount > recoveryFileMaxPageCount) {
                int t = Math.max(recoveryFileMinPageCount, batch.size());
                recoveryFile.setLength(recoveryFileSizeForPages(t));
            }

            // Record the page writes in the recovery buffer.
            recoveryFile.seek(0);
            // Store the next tx id...
            recoveryFile.writeLong(nextTxid.get());
            // Store the checksum for the write batch so that on recovery we
            // know if we have a consistent write batch on disk.
            recoveryFile.writeLong(checksum.getValue());
            // Write the # of pages that will follow.
            recoveryFile.writeInt(batch.size());

            if (enableDiskSyncs) {
                recoveryFile.sync();
            }
        }

        try {
            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

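    // Each recovery record is the 8-byte page id followed by the page payload, hence
    // (pageSize + 8) bytes per page, plus the fixed 4k recovery header.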
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long offset = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a
                    // partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(offset, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out
            // correctly.. so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}