/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.kaha.impl.index.hash;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.util.LinkedList;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.kaha.Marshaller;
026import org.apache.activemq.kaha.StoreEntry;
027import org.apache.activemq.kaha.impl.index.Index;
028import org.apache.activemq.kaha.impl.index.IndexManager;
029import org.apache.activemq.util.DataByteArrayInputStream;
030import org.apache.activemq.util.DataByteArrayOutputStream;
031import org.apache.activemq.util.IOHelper;
032import org.apache.activemq.util.LRUCache;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * BTree implementation
038 * 
039 * 
040 */
041public class HashIndex implements Index, HashIndexMBean {
042    public static final int DEFAULT_PAGE_SIZE;
043    public static final int DEFAULT_KEY_SIZE;
044    public static final int DEFAULT_BIN_SIZE;
045    public static final int MAXIMUM_CAPACITY;
046    public static final int DEFAULT_LOAD_FACTOR;
047    private static final int LOW_WATER_MARK=1024*16;
048    private static final String NAME_PREFIX = "hash-index-";
049    private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
050    private final String name;
051    private File directory;
052    private File file;
053    private RandomAccessFile indexFile;
054    private IndexManager indexManager;
055    private int pageSize = DEFAULT_PAGE_SIZE;
056    private int keySize = DEFAULT_KEY_SIZE;
057    private int numberOfBins = DEFAULT_BIN_SIZE;
058    private int keysPerPage = this.pageSize /this.keySize;
059    private DataByteArrayInputStream dataIn;
060    private DataByteArrayOutputStream dataOut;
061    private byte[] readBuffer;
062    private HashBin[] bins;
063    private Marshaller keyMarshaller;
064    private long length;
065    private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
066    private AtomicBoolean loaded = new AtomicBoolean();
067    private LRUCache<Long, HashPage> pageCache;
068    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
069    private int pageCacheSize = 10;
070    private int size;
071    private int highestSize=0;
072    private int activeBins;
073    private int threshold;
074    private int maximumCapacity=MAXIMUM_CAPACITY;
075    private int loadFactor=DEFAULT_LOAD_FACTOR;
076    
077    
078    /**
079     * Constructor
080     * 
081     * @param directory
082     * @param name
083     * @param indexManager
084     * @throws IOException
085     */
086    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
087        this.directory = directory;
088        this.name = name;
089        this.indexManager = indexManager;
090        openIndexFile();
091        pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
092    }
093
094    /**
095     * Set the marshaller for key objects
096     * 
097     * @param marshaller
098     */
099    public synchronized void setKeyMarshaller(Marshaller marshaller) {
100        this.keyMarshaller = marshaller;
101    }
102
103    /**
104     * @return the keySize
105     */
106    public synchronized int getKeySize() {
107        return this.keySize;
108    }
109
110    /**
111     * @param keySize the keySize to set
112     */
113    public synchronized void setKeySize(int keySize) {
114        this.keySize = keySize;
115        if (loaded.get()) {
116            throw new RuntimeException("Pages already loaded - can't reset key size");
117        }
118    }
119
120    /**
121     * @return the pageSize
122     */
123    public synchronized int getPageSize() {
124        return this.pageSize;
125    }
126
127    /**
128     * @param pageSize the pageSize to set
129     */
130    public synchronized void setPageSize(int pageSize) {
131        if (loaded.get() && pageSize != this.pageSize) {
132            throw new RuntimeException("Pages already loaded - can't reset page size");
133        }
134        this.pageSize = pageSize;
135    }
136    
137    /**
138     * @return number of bins
139     */
140    public int getNumberOfBins() {
141        return this.numberOfBins;
142    }
143
144    /**
145     * @param numberOfBins
146     */
147    public void setNumberOfBins(int numberOfBins) {
148        if (loaded.get() && numberOfBins != this.numberOfBins) {
149            throw new RuntimeException("Pages already loaded - can't reset bin size");
150        }
151        this.numberOfBins = numberOfBins;
152    }
153
154    /**
155     * @return the enablePageCaching
156     */
157    public synchronized boolean isEnablePageCaching() {
158        return this.enablePageCaching;
159    }
160
161    /**
162     * @param enablePageCaching the enablePageCaching to set
163     */
164    public synchronized void setEnablePageCaching(boolean enablePageCaching) {
165        this.enablePageCaching = enablePageCaching;
166    }
167
168    /**
169     * @return the pageCacheSize
170     */
171    public synchronized int getPageCacheSize() {
172        return this.pageCacheSize;
173    }
174
175    /**
176     * @param pageCacheSize the pageCacheSize to set
177     */
178    public synchronized void setPageCacheSize(int pageCacheSize) {
179        this.pageCacheSize = pageCacheSize;
180        pageCache.setMaxCacheSize(pageCacheSize);
181    }
182
183    public synchronized boolean isTransient() {
184        return false;
185    }
186    
187    /**
188     * @return the threshold
189     */
190    public int getThreshold() {
191        return threshold;
192    }
193
194    /**
195     * @param threshold the threshold to set
196     */
197    public void setThreshold(int threshold) {
198        this.threshold = threshold;
199    }
200
201    /**
202     * @return the loadFactor
203     */
204    public int getLoadFactor() {
205        return loadFactor;
206    }
207
208    /**
209     * @param loadFactor the loadFactor to set
210     */
211    public void setLoadFactor(int loadFactor) {
212        this.loadFactor = loadFactor;
213    }
214    
215    /**
216     * @return the maximumCapacity
217     */
218    public int getMaximumCapacity() {
219        return maximumCapacity;
220    }
221
222    /**
223     * @param maximumCapacity the maximumCapacity to set
224     */
225    public void setMaximumCapacity(int maximumCapacity) {
226        this.maximumCapacity = maximumCapacity;
227    }
228    
229    public synchronized int getSize() {
230        return size;
231    }
232    
233    public synchronized int getActiveBins(){
234        return activeBins;
235    }
236
237    public synchronized void load() {
238        if (loaded.compareAndSet(false, true)) {
239            int capacity = 1;
240            while (capacity < numberOfBins) {
241                capacity <<= 1;
242            }
243            this.bins = new HashBin[capacity];
244            this.numberOfBins=capacity;
245            threshold = calculateThreashold();
246            keysPerPage = pageSize / keySize;
247            dataIn = new DataByteArrayInputStream();
248            dataOut = new DataByteArrayOutputStream(pageSize);
249            readBuffer = new byte[pageSize];
250            try {
251                openIndexFile();
252                if (indexFile.length() > 0) {
253                    doCompress();
254                }
255            } catch (IOException e) {
256                LOG.error("Failed to load index ", e);
257                throw new RuntimeException(e);
258            }
259        }
260    }    
261
262    public synchronized void unload() throws IOException {
263        if (loaded.compareAndSet(true, false)) {
264            if (indexFile != null) {
265                indexFile.close();
266                indexFile = null;
267                freeList.clear();
268                pageCache.clear();
269                bins = new HashBin[bins.length];
270            }
271        }
272    }
273
274    public synchronized void store(Object key, StoreEntry value) throws IOException {
275        load();
276        HashEntry entry = new HashEntry();
277        entry.setKey((Comparable)key);
278        entry.setIndexOffset(value.getOffset());
279        if (!getBin(key).put(entry)) {
280            this.size++;
281        }
282        if (this.size >= this.threshold) {
283            resize(2*bins.length);
284        }
285        if(this.size > this.highestSize) {
286            this.highestSize=this.size;
287        }
288    }
289
290    public synchronized StoreEntry get(Object key) throws IOException {
291        load();
292        HashEntry entry = new HashEntry();
293        entry.setKey((Comparable)key);
294        HashEntry result = getBin(key).find(entry);
295        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
296    }
297
298    public synchronized StoreEntry remove(Object key) throws IOException {
299        load();
300        StoreEntry result = null;
301        HashEntry entry = new HashEntry();
302        entry.setKey((Comparable)key);
303        HashEntry he = getBin(key).remove(entry);
304        if (he != null) {
305            this.size--;
306            result = this.indexManager.getIndex(he.getIndexOffset());
307        }
308        if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size *2)) {
309            int newSize = this.size/this.keysPerPage;
310            newSize = Math.max(128, newSize);
311            this.highestSize=0;
312            resize(newSize);
313            
314        }
315        return result;
316    }
317
318    public synchronized boolean containsKey(Object key) throws IOException {
319        return get(key) != null;
320    }
321
322    public synchronized void clear() throws IOException {
323        unload();
324        delete();
325        openIndexFile();
326        load();
327    }
328
329    public synchronized void delete() throws IOException {
330        unload();
331        if (file.exists()) {
332            file.delete();
333        }
334        length = 0;
335    }
336
337    HashPage lookupPage(long pageId) throws IOException {
338        HashPage result = null;
339        if (pageId >= 0) {
340            result = getFromCache(pageId);
341            if (result == null) {
342                result = getFullPage(pageId);
343                if (result != null) {
344                    if (result.isActive()) {
345                        addToCache(result);
346                    } else {
347                        throw new IllegalStateException("Trying to access an inactive page: " + pageId);
348                    }
349                }
350            }
351        }
352        return result;
353    }
354
355    HashPage createPage(int binId) throws IOException {
356        HashPage result = getNextFreePage();
357        if (result == null) {  
358            // allocate one
359            result = new HashPage(keysPerPage);
360            result.setId(length);
361            result.setBinId(binId);
362            writePageHeader(result);
363            length += pageSize;
364            indexFile.seek(length);
365            indexFile.write(HashEntry.NOT_SET);
366        }
367        addToCache(result);
368        return result;
369    }
370
371    void releasePage(HashPage page) throws IOException {
372        removeFromCache(page);
373        page.reset();
374        page.setActive(false);
375        writePageHeader(page);
376        freeList.add(page);
377    }
378
379    private HashPage getNextFreePage() throws IOException {
380        HashPage result = null;
381        if(!freeList.isEmpty()) {
382            result = freeList.removeFirst();
383            result.setActive(true);
384            result.reset();
385            writePageHeader(result);
386        }
387        return result;
388    }
389
390    void writeFullPage(HashPage page) throws IOException {
391        dataOut.reset();
392        page.write(keyMarshaller, dataOut);
393        if (dataOut.size() > pageSize) {
394            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
395        }
396        indexFile.seek(page.getId());
397        indexFile.write(dataOut.getData(), 0, dataOut.size());
398    }
399
400    void writePageHeader(HashPage page) throws IOException {
401        dataOut.reset();
402        page.writeHeader(dataOut);
403        indexFile.seek(page.getId());
404        indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
405    }
406
407    HashPage getFullPage(long id) throws IOException {
408        indexFile.seek(id);
409        indexFile.readFully(readBuffer, 0, pageSize);
410        dataIn.restart(readBuffer);
411        HashPage page = new HashPage(keysPerPage);
412        page.setId(id);
413        page.read(keyMarshaller, dataIn);
414        return page;
415    }
416
417    HashPage getPageHeader(long id) throws IOException {
418        indexFile.seek(id);
419        indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
420        dataIn.restart(readBuffer);
421        HashPage page = new HashPage(keysPerPage);
422        page.setId(id);
423        page.readHeader(dataIn);
424        return page;
425    }
426
427    void addToBin(HashPage page) throws IOException {
428        int index = page.getBinId();
429        if (index >= this.bins.length) {
430            resize(index+1);
431        }
432        HashBin bin = getBin(index);
433        bin.addHashPageInfo(page.getId(), page.getPersistedSize());
434    }
435
436    private HashBin getBin(int index) {
437        
438        HashBin result = bins[index];
439        if (result == null) {
440            result = new HashBin(this, index, pageSize / keySize);
441            bins[index] = result;
442            activeBins++;
443        }
444        return result;
445    }
446
447    private void openIndexFile() throws IOException {
448        if (indexFile == null) {
449            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
450            IOHelper.mkdirs(file.getParentFile());
451            indexFile = new RandomAccessFile(file, "rw");
452        }
453    }
454    
455    private HashBin getBin(Object key) {
456        int hash = hash(key);
457        int i = indexFor(hash, bins.length);
458        return getBin(i);
459    }
460
461    private HashPage getFromCache(long pageId) {
462        HashPage result = null;
463        if (enablePageCaching) {
464            result = pageCache.get(pageId);
465        }
466        return result;
467    }
468
469    private void addToCache(HashPage page) {
470        if (enablePageCaching) {
471            pageCache.put(page.getId(), page);
472        }
473    }
474
475    private void removeFromCache(HashPage page) {
476        if (enablePageCaching) {
477            pageCache.remove(page.getId());
478        }
479    }
480    
481    private void doLoad() throws IOException {
482        long offset = 0;
483        if (loaded.compareAndSet(false, true)) {
484            while ((offset + pageSize) <= indexFile.length()) {
485                indexFile.seek(offset);
486                indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
487                dataIn.restart(readBuffer);
488                HashPage page = new HashPage(keysPerPage);
489                page.setId(offset);
490                page.readHeader(dataIn);
491                if (!page.isActive()) {
492                    page.reset();
493                    freeList.add(page);
494                } else {
495                    addToBin(page);
496                    size+=page.size();
497                }
498                offset += pageSize;
499            }
500            length=offset;
501        }
502    }
503    
504    private void doCompress() throws IOException {
505        String backFileName = name + "-COMPRESS";
506        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
507        backIndex.setKeyMarshaller(keyMarshaller);
508        backIndex.setKeySize(getKeySize());
509        backIndex.setNumberOfBins(getNumberOfBins());
510        backIndex.setPageSize(getPageSize());
511        backIndex.load();
512        File backFile = backIndex.file;
513        long offset = 0;
514        while ((offset + pageSize) <= indexFile.length()) {
515            indexFile.seek(offset);
516            HashPage page = getFullPage(offset);
517            if (page.isActive()) {
518                for (HashEntry entry : page.getEntries()) {
519                    backIndex.getBin(entry.getKey()).put(entry);
520                    backIndex.size++;
521                }
522            }
523            page=null;
524            offset += pageSize;
525        }
526        backIndex.unload();
527      
528        unload();
529        IOHelper.deleteFile(file);
530        IOHelper.copyFile(backFile, file);
531        IOHelper.deleteFile(backFile);
532        openIndexFile();
533        doLoad();
534    }
535    
536    private void resize(int newCapacity) throws IOException {
537        if (bins.length < getMaximumCapacity()) {
538            if (newCapacity != numberOfBins) {
539                int capacity = 1;
540                while (capacity < newCapacity) {
541                    capacity <<= 1;
542                }
543                newCapacity=capacity;
544                if (newCapacity != numberOfBins) {
545                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
546                    String backFileName = name + "-REISZE";
547                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
548                    backIndex.setKeyMarshaller(keyMarshaller);
549                    backIndex.setKeySize(getKeySize());
550                    backIndex.setNumberOfBins(newCapacity);
551                    backIndex.setPageSize(getPageSize());
552                    backIndex.load();
553                    File backFile = backIndex.file;
554                    long offset = 0;
555                    while ((offset + pageSize) <= indexFile.length()) {
556                        indexFile.seek(offset);
557                        HashPage page = getFullPage(offset);
558                        if (page.isActive()) {
559                            for (HashEntry entry : page.getEntries()) {
560                                backIndex.getBin(entry.getKey()).put(entry);
561                                backIndex.size++;
562                            }
563                        }
564                        page=null;
565                        offset += pageSize;
566                    }
567                    backIndex.unload();
568                  
569                    unload();
570                    IOHelper.deleteFile(file);
571                    IOHelper.copyFile(backFile, file);
572                    IOHelper.deleteFile(backFile);
573                    setNumberOfBins(newCapacity);
574                    bins = new HashBin[newCapacity];
575                    threshold = calculateThreashold();
576                    openIndexFile();
577                    doLoad();
578                }
579            }
580        }else {
581            threshold = Integer.MAX_VALUE;
582            return;
583        }
584    }
585    
586    private int calculateThreashold() {
587        return (int)(bins.length * loadFactor);
588    }
589    
590    
591    public String toString() {
592        String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
593        return str;
594    }
595      
596
597    static int hash(Object x) {
598        int h = x.hashCode();
599        h += ~(h << 9);
600        h ^= h >>> 14;
601        h += h << 4;
602        h ^= h >>> 10;
603        return h;
604    }
605
606    static int indexFor(int h, int length) {
607        return h & (length - 1);
608    }
609
610    static {
611        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
612        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
613        DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
614        MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
615        DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
616    }
617}