001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.kaha.impl.index.hash; 018 019import java.io.File; 020import java.io.IOException; 021import java.io.RandomAccessFile; 022import java.util.LinkedList; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.activemq.kaha.Marshaller; 026import org.apache.activemq.kaha.StoreEntry; 027import org.apache.activemq.kaha.impl.index.Index; 028import org.apache.activemq.kaha.impl.index.IndexManager; 029import org.apache.activemq.util.DataByteArrayInputStream; 030import org.apache.activemq.util.DataByteArrayOutputStream; 031import org.apache.activemq.util.IOHelper; 032import org.apache.activemq.util.LRUCache; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * BTree implementation 038 * 039 * 040 */ 041public class HashIndex implements Index, HashIndexMBean { 042 public static final int DEFAULT_PAGE_SIZE; 043 public static final int DEFAULT_KEY_SIZE; 044 public static final int DEFAULT_BIN_SIZE; 045 public static final int MAXIMUM_CAPACITY; 046 public static final int DEFAULT_LOAD_FACTOR; 047 private static final int LOW_WATER_MARK=1024*16; 048 private static final String NAME_PREFIX = "hash-index-"; 049 private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class); 050 private final String name; 051 private File directory; 052 private File file; 053 private RandomAccessFile indexFile; 054 private IndexManager indexManager; 055 private int pageSize = DEFAULT_PAGE_SIZE; 056 private int keySize = DEFAULT_KEY_SIZE; 057 private int numberOfBins = DEFAULT_BIN_SIZE; 058 private int keysPerPage = this.pageSize /this.keySize; 059 private DataByteArrayInputStream dataIn; 060 private DataByteArrayOutputStream dataOut; 061 private byte[] readBuffer; 062 private HashBin[] bins; 063 private Marshaller keyMarshaller; 064 private long length; 065 private LinkedList<HashPage> freeList = new LinkedList<HashPage>(); 066 private AtomicBoolean loaded = new AtomicBoolean(); 067 private LRUCache<Long, HashPage> pageCache; 068 private boolean enablePageCaching=false;//this is off by default - see AMQ-1667 069 private int pageCacheSize = 10; 070 private int size; 071 private int highestSize=0; 072 private int activeBins; 073 private int threshold; 074 private int maximumCapacity=MAXIMUM_CAPACITY; 075 private int loadFactor=DEFAULT_LOAD_FACTOR; 076 077 078 /** 079 * Constructor 080 * 081 * @param directory 082 * @param name 083 * @param indexManager 084 * @throws IOException 085 */ 086 public HashIndex(File directory, String name, IndexManager indexManager) throws IOException { 087 this.directory = directory; 088 this.name = name; 089 this.indexManager = indexManager; 090 openIndexFile(); 091 pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true); 092 } 093 094 /** 095 * Set the marshaller for key objects 096 * 097 * @param marshaller 098 */ 099 public synchronized void setKeyMarshaller(Marshaller marshaller) { 100 this.keyMarshaller = marshaller; 101 } 102 103 /** 104 * @return the keySize 105 */ 106 public synchronized int getKeySize() { 107 return this.keySize; 108 } 109 110 /** 111 * @param keySize the keySize to set 112 */ 113 public synchronized void setKeySize(int keySize) { 114 this.keySize = keySize; 115 if (loaded.get()) { 116 throw new RuntimeException("Pages already loaded - can't reset key size"); 117 } 118 } 119 120 /** 121 * @return the pageSize 122 */ 123 public synchronized int getPageSize() { 124 return this.pageSize; 125 } 126 127 /** 128 * @param pageSize the pageSize to set 129 */ 130 public synchronized void setPageSize(int pageSize) { 131 if (loaded.get() && pageSize != this.pageSize) { 132 throw new RuntimeException("Pages already loaded - can't reset page size"); 133 } 134 this.pageSize = pageSize; 135 } 136 137 /** 138 * @return number of bins 139 */ 140 public int getNumberOfBins() { 141 return this.numberOfBins; 142 } 143 144 /** 145 * @param numberOfBins 146 */ 147 public void setNumberOfBins(int numberOfBins) { 148 if (loaded.get() && numberOfBins != this.numberOfBins) { 149 throw new RuntimeException("Pages already loaded - can't reset bin size"); 150 } 151 this.numberOfBins = numberOfBins; 152 } 153 154 /** 155 * @return the enablePageCaching 156 */ 157 public synchronized boolean isEnablePageCaching() { 158 return this.enablePageCaching; 159 } 160 161 /** 162 * @param enablePageCaching the enablePageCaching to set 163 */ 164 public synchronized void setEnablePageCaching(boolean enablePageCaching) { 165 this.enablePageCaching = enablePageCaching; 166 } 167 168 /** 169 * @return the pageCacheSize 170 */ 171 public synchronized int getPageCacheSize() { 172 return this.pageCacheSize; 173 } 174 175 /** 176 * @param pageCacheSize the pageCacheSize to set 177 */ 178 public synchronized void setPageCacheSize(int pageCacheSize) { 179 this.pageCacheSize = pageCacheSize; 180 pageCache.setMaxCacheSize(pageCacheSize); 181 } 182 183 public synchronized boolean isTransient() { 184 return false; 185 } 186 187 /** 188 * @return the threshold 189 */ 190 public int getThreshold() { 191 return threshold; 192 } 193 194 /** 195 * @param threshold the threshold to set 196 */ 197 public void setThreshold(int threshold) { 198 this.threshold = threshold; 199 } 200 201 /** 202 * @return the loadFactor 203 */ 204 public int getLoadFactor() { 205 return loadFactor; 206 } 207 208 /** 209 * @param loadFactor the loadFactor to set 210 */ 211 public void setLoadFactor(int loadFactor) { 212 this.loadFactor = loadFactor; 213 } 214 215 /** 216 * @return the maximumCapacity 217 */ 218 public int getMaximumCapacity() { 219 return maximumCapacity; 220 } 221 222 /** 223 * @param maximumCapacity the maximumCapacity to set 224 */ 225 public void setMaximumCapacity(int maximumCapacity) { 226 this.maximumCapacity = maximumCapacity; 227 } 228 229 public synchronized int getSize() { 230 return size; 231 } 232 233 public synchronized int getActiveBins(){ 234 return activeBins; 235 } 236 237 public synchronized void load() { 238 if (loaded.compareAndSet(false, true)) { 239 int capacity = 1; 240 while (capacity < numberOfBins) { 241 capacity <<= 1; 242 } 243 this.bins = new HashBin[capacity]; 244 this.numberOfBins=capacity; 245 threshold = calculateThreashold(); 246 keysPerPage = pageSize / keySize; 247 dataIn = new DataByteArrayInputStream(); 248 dataOut = new DataByteArrayOutputStream(pageSize); 249 readBuffer = new byte[pageSize]; 250 try { 251 openIndexFile(); 252 if (indexFile.length() > 0) { 253 doCompress(); 254 } 255 } catch (IOException e) { 256 LOG.error("Failed to load index ", e); 257 throw new RuntimeException(e); 258 } 259 } 260 } 261 262 public synchronized void unload() throws IOException { 263 if (loaded.compareAndSet(true, false)) { 264 if (indexFile != null) { 265 indexFile.close(); 266 indexFile = null; 267 freeList.clear(); 268 pageCache.clear(); 269 bins = new HashBin[bins.length]; 270 } 271 } 272 } 273 274 public synchronized void store(Object key, StoreEntry value) throws IOException { 275 load(); 276 HashEntry entry = new HashEntry(); 277 entry.setKey((Comparable)key); 278 entry.setIndexOffset(value.getOffset()); 279 if (!getBin(key).put(entry)) { 280 this.size++; 281 } 282 if (this.size >= this.threshold) { 283 resize(2*bins.length); 284 } 285 if(this.size > this.highestSize) { 286 this.highestSize=this.size; 287 } 288 } 289 290 public synchronized StoreEntry get(Object key) throws IOException { 291 load(); 292 HashEntry entry = new HashEntry(); 293 entry.setKey((Comparable)key); 294 HashEntry result = getBin(key).find(entry); 295 return result != null ? indexManager.getIndex(result.getIndexOffset()) : null; 296 } 297 298 public synchronized StoreEntry remove(Object key) throws IOException { 299 load(); 300 StoreEntry result = null; 301 HashEntry entry = new HashEntry(); 302 entry.setKey((Comparable)key); 303 HashEntry he = getBin(key).remove(entry); 304 if (he != null) { 305 this.size--; 306 result = this.indexManager.getIndex(he.getIndexOffset()); 307 } 308 if (this.highestSize > LOW_WATER_MARK && this.highestSize > (this.size *2)) { 309 int newSize = this.size/this.keysPerPage; 310 newSize = Math.max(128, newSize); 311 this.highestSize=0; 312 resize(newSize); 313 314 } 315 return result; 316 } 317 318 public synchronized boolean containsKey(Object key) throws IOException { 319 return get(key) != null; 320 } 321 322 public synchronized void clear() throws IOException { 323 unload(); 324 delete(); 325 openIndexFile(); 326 load(); 327 } 328 329 public synchronized void delete() throws IOException { 330 unload(); 331 if (file.exists()) { 332 file.delete(); 333 } 334 length = 0; 335 } 336 337 HashPage lookupPage(long pageId) throws IOException { 338 HashPage result = null; 339 if (pageId >= 0) { 340 result = getFromCache(pageId); 341 if (result == null) { 342 result = getFullPage(pageId); 343 if (result != null) { 344 if (result.isActive()) { 345 addToCache(result); 346 } else { 347 throw new IllegalStateException("Trying to access an inactive page: " + pageId); 348 } 349 } 350 } 351 } 352 return result; 353 } 354 355 HashPage createPage(int binId) throws IOException { 356 HashPage result = getNextFreePage(); 357 if (result == null) { 358 // allocate one 359 result = new HashPage(keysPerPage); 360 result.setId(length); 361 result.setBinId(binId); 362 writePageHeader(result); 363 length += pageSize; 364 indexFile.seek(length); 365 indexFile.write(HashEntry.NOT_SET); 366 } 367 addToCache(result); 368 return result; 369 } 370 371 void releasePage(HashPage page) throws IOException { 372 removeFromCache(page); 373 page.reset(); 374 page.setActive(false); 375 writePageHeader(page); 376 freeList.add(page); 377 } 378 379 private HashPage getNextFreePage() throws IOException { 380 HashPage result = null; 381 if(!freeList.isEmpty()) { 382 result = freeList.removeFirst(); 383 result.setActive(true); 384 result.reset(); 385 writePageHeader(result); 386 } 387 return result; 388 } 389 390 void writeFullPage(HashPage page) throws IOException { 391 dataOut.reset(); 392 page.write(keyMarshaller, dataOut); 393 if (dataOut.size() > pageSize) { 394 throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size()); 395 } 396 indexFile.seek(page.getId()); 397 indexFile.write(dataOut.getData(), 0, dataOut.size()); 398 } 399 400 void writePageHeader(HashPage page) throws IOException { 401 dataOut.reset(); 402 page.writeHeader(dataOut); 403 indexFile.seek(page.getId()); 404 indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE); 405 } 406 407 HashPage getFullPage(long id) throws IOException { 408 indexFile.seek(id); 409 indexFile.readFully(readBuffer, 0, pageSize); 410 dataIn.restart(readBuffer); 411 HashPage page = new HashPage(keysPerPage); 412 page.setId(id); 413 page.read(keyMarshaller, dataIn); 414 return page; 415 } 416 417 HashPage getPageHeader(long id) throws IOException { 418 indexFile.seek(id); 419 indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE); 420 dataIn.restart(readBuffer); 421 HashPage page = new HashPage(keysPerPage); 422 page.setId(id); 423 page.readHeader(dataIn); 424 return page; 425 } 426 427 void addToBin(HashPage page) throws IOException { 428 int index = page.getBinId(); 429 if (index >= this.bins.length) { 430 resize(index+1); 431 } 432 HashBin bin = getBin(index); 433 bin.addHashPageInfo(page.getId(), page.getPersistedSize()); 434 } 435 436 private HashBin getBin(int index) { 437 438 HashBin result = bins[index]; 439 if (result == null) { 440 result = new HashBin(this, index, pageSize / keySize); 441 bins[index] = result; 442 activeBins++; 443 } 444 return result; 445 } 446 447 private void openIndexFile() throws IOException { 448 if (indexFile == null) { 449 file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name)); 450 IOHelper.mkdirs(file.getParentFile()); 451 indexFile = new RandomAccessFile(file, "rw"); 452 } 453 } 454 455 private HashBin getBin(Object key) { 456 int hash = hash(key); 457 int i = indexFor(hash, bins.length); 458 return getBin(i); 459 } 460 461 private HashPage getFromCache(long pageId) { 462 HashPage result = null; 463 if (enablePageCaching) { 464 result = pageCache.get(pageId); 465 } 466 return result; 467 } 468 469 private void addToCache(HashPage page) { 470 if (enablePageCaching) { 471 pageCache.put(page.getId(), page); 472 } 473 } 474 475 private void removeFromCache(HashPage page) { 476 if (enablePageCaching) { 477 pageCache.remove(page.getId()); 478 } 479 } 480 481 private void doLoad() throws IOException { 482 long offset = 0; 483 if (loaded.compareAndSet(false, true)) { 484 while ((offset + pageSize) <= indexFile.length()) { 485 indexFile.seek(offset); 486 indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE); 487 dataIn.restart(readBuffer); 488 HashPage page = new HashPage(keysPerPage); 489 page.setId(offset); 490 page.readHeader(dataIn); 491 if (!page.isActive()) { 492 page.reset(); 493 freeList.add(page); 494 } else { 495 addToBin(page); 496 size+=page.size(); 497 } 498 offset += pageSize; 499 } 500 length=offset; 501 } 502 } 503 504 private void doCompress() throws IOException { 505 String backFileName = name + "-COMPRESS"; 506 HashIndex backIndex = new HashIndex(directory,backFileName,indexManager); 507 backIndex.setKeyMarshaller(keyMarshaller); 508 backIndex.setKeySize(getKeySize()); 509 backIndex.setNumberOfBins(getNumberOfBins()); 510 backIndex.setPageSize(getPageSize()); 511 backIndex.load(); 512 File backFile = backIndex.file; 513 long offset = 0; 514 while ((offset + pageSize) <= indexFile.length()) { 515 indexFile.seek(offset); 516 HashPage page = getFullPage(offset); 517 if (page.isActive()) { 518 for (HashEntry entry : page.getEntries()) { 519 backIndex.getBin(entry.getKey()).put(entry); 520 backIndex.size++; 521 } 522 } 523 page=null; 524 offset += pageSize; 525 } 526 backIndex.unload(); 527 528 unload(); 529 IOHelper.deleteFile(file); 530 IOHelper.copyFile(backFile, file); 531 IOHelper.deleteFile(backFile); 532 openIndexFile(); 533 doLoad(); 534 } 535 536 private void resize(int newCapacity) throws IOException { 537 if (bins.length < getMaximumCapacity()) { 538 if (newCapacity != numberOfBins) { 539 int capacity = 1; 540 while (capacity < newCapacity) { 541 capacity <<= 1; 542 } 543 newCapacity=capacity; 544 if (newCapacity != numberOfBins) { 545 LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity); 546 String backFileName = name + "-REISZE"; 547 HashIndex backIndex = new HashIndex(directory,backFileName,indexManager); 548 backIndex.setKeyMarshaller(keyMarshaller); 549 backIndex.setKeySize(getKeySize()); 550 backIndex.setNumberOfBins(newCapacity); 551 backIndex.setPageSize(getPageSize()); 552 backIndex.load(); 553 File backFile = backIndex.file; 554 long offset = 0; 555 while ((offset + pageSize) <= indexFile.length()) { 556 indexFile.seek(offset); 557 HashPage page = getFullPage(offset); 558 if (page.isActive()) { 559 for (HashEntry entry : page.getEntries()) { 560 backIndex.getBin(entry.getKey()).put(entry); 561 backIndex.size++; 562 } 563 } 564 page=null; 565 offset += pageSize; 566 } 567 backIndex.unload(); 568 569 unload(); 570 IOHelper.deleteFile(file); 571 IOHelper.copyFile(backFile, file); 572 IOHelper.deleteFile(backFile); 573 setNumberOfBins(newCapacity); 574 bins = new HashBin[newCapacity]; 575 threshold = calculateThreashold(); 576 openIndexFile(); 577 doLoad(); 578 } 579 } 580 }else { 581 threshold = Integer.MAX_VALUE; 582 return; 583 } 584 } 585 586 private int calculateThreashold() { 587 return (int)(bins.length * loadFactor); 588 } 589 590 591 public String toString() { 592 String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName(); 593 return str; 594 } 595 596 597 static int hash(Object x) { 598 int h = x.hashCode(); 599 h += ~(h << 9); 600 h ^= h >>> 14; 601 h += h << 4; 602 h ^= h >>> 10; 603 return h; 604 } 605 606 static int indexFor(int h, int length) { 607 return h & (length - 1); 608 } 609 610 static { 611 DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024")); 612 DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96")); 613 DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024")); 614 MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384")); 615 DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50")); 616 } 617}