/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LFUCache;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and
 * therefore access to it should be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k: Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000: This is a redo log used to prevent partial page writes
 * from making the file inconsistent.
 * Page Space: The pages in the page file.
 */
public class PageFile {
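
    // A minimal usage sketch (hypothetical caller code; the directory and name below are
    // illustrative). PageFile is not thread safe, so a real caller would synchronize access
    // externally:
    //
    //   PageFile pf = new PageFile(new File("data"), "example");
    //   pf.load();                  // allocates OS resources, recovers the file if needed
    //   Transaction tx = pf.tx();   // page reads and writes go through a transaction
    //   ... read and write pages via tx ...
    //   pf.flush();                 // push buffered writes (and recovery data) to disk
    //   pf.unload();                // stores free list / metadata, closes file handles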

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024 * 4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Each record in the recovery buffer is prefixed with the page id (a long).
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery buffer..
    private RandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. Writes may exceed the max, but the
    // file will be resized back down to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // A cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread == true.
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
                try {
                    file.seek(diskBoundLocation);
                    file.readFully(diskBound);
                } finally {
                    file.close();
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no more pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null && currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }
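
    // How a PageWrite moves through the double-buffering states (a summary of the methods
    // above, not additional behavior):
    //
    //   setCurrent()/setCurrentLocation() -- a new version of the page is pending
    //   begin()                           -- the pending version becomes "disk bound";
    //                                        callers may queue a newer version meanwhile
    //   done()                            -- the disk-bound version has hit the disk;
    //                                        returns true when nothing newer is pending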

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * Deletes the given file if it exists.
     *
     * @throws IOException if the file exists but cannot be deleted.
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }
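
    // For reference, with name = "example" (after IOHelper.toFileSystemSafeName is applied)
    // the files involved are, derived from the suffix constants above; the timestamp suffix
    // is added by archive():
    //
    //   example.data              -- metadata header + pages
    //   example.redo              -- recovery (redo) buffer
    //   example.free              -- free page list, written on clean shutdown
    //   example.data-1234567890   -- an archived copy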

    /**
     * Loads the page file so that it can be accessed for read/write purposes. This allocates OS
     * resources. If this is the first time the page file is loaded, then this creates the page
     * file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file
     *                               is corrupt, is a bad version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to find the free ones.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets released once all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////

    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

    // Converts a page id to the byte offset of that page in the main page file.
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }
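
        // Which copy wins (reasoning for the selection below): the header stores the same
        // properties twice, in the two halves of the 4k metadata space, and storeMetaData()
        // bumps metaDataTxId and syncs after writing each half. If the txIds match, both
        // copies are intact and either can be used. If they differ, the first half was being
        // rewritten when the crash happened, so the second copy is the one that was fully
        // written and is the safe choice.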

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it.. write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes? Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes. Enabled by default.
     * Disabling this may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }
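
    // Worked example with the defaults above (the 4096 figures are illustrative; the real
    // header size comes from Page.PAGE_HEADER_SIZE):
    //
    //   pageSize         = 4096 bytes
    //   content per page = 4096 - Page.PAGE_HEADER_SIZE
    //   offset of page N = PAGE_FILE_HEADER_SIZE + N * 4096  (see toOffset()),
    //                      so page 0 starts at byte 4096, page 1 at 8192, ...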

    /**
     * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }
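
    // Allocation strategy used by allocate(int) below: first try to carve a run of 'count'
    // consecutive pages out of the free list; only if no such run exists are brand new pages
    // appended to the end of the file (extending nextFreePageId and writing free-page markers
    // so they can be rediscovered by the free-list scan after an unclean shutdown).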

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write transaction ids only once....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            public Long getKey() {
                return write.getPage().getPageId();
            }

            public PageWrite getValue() {
                return write;
            }

            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }
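
    // With the defaults (writeBatchSize = 1000), canStartWriteBatch() below kicks in once
    // capacityUsed = writes.size() * 100 / 1000 crosses the threshold: at 100 buffered pages
    // (10%) when the async writer thread is enabled, or at 800 buffered pages (80%) when
    // writes happen on the caller's thread.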

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value. Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////

    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }
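
    // Double-write protocol implemented by writeBatch() (a summary of the code below, not
    // extra behavior): each batch is written to the recovery file and to the main page file,
    // then the recovery file header (next txid, Adler32 checksum of the batch, page count)
    // is updated, and finally, when disk syncs are enabled, the recovery file is synced
    // before the main file. On restart, redoRecoveryUpdates() replays the batch only if its
    // checksum matches, so a torn redo record is simply ignored.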

    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        Checksum checksum = new Adler32();
        if (enableRecoveryFile) {
            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        }
        for (PageWrite w : batch) {
            if (enableRecoveryFile) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            writeFile.seek(toOffset(w.page.getPageId()));
            writeFile.write(w.getDiskBound(), 0, pageSize);
            w.done();
        }

        try {
            if (enableRecoveryFile) {
                // Can we shrink the recovery buffer?
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow.
                recoveryFile.writeInt(batch.size());
            }

            if (enableDiskSyncs) {
                // Sync to make sure recovery buffer writes land on disk..
                if (enableRecoveryFile) {
                    recoveryFile.getFD().sync();
                }
                writeFile.getFD().sync();
            }
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file: " + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }
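
        // On-disk layout of the recovery file read below (matching what writeBatch() wrote;
        // sizes follow from the header constants above):
        //
        //   bytes 0-7            : next transaction id (long)
        //   bytes 8-15           : Adler32 checksum of the batch's page data (long)
        //   bytes 16-19          : number of page records in the batch (int)
        //   ... up to RECOVERY_FILE_HEADER_SIZE (4k) of reserved header ...
        //   then per page record : page id (long) followed by pageSize bytes of data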

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a
                    // partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out
            // correctly.. so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was only partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}