001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.index.tree;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.activemq.kaha.Marshaller;
024import org.apache.activemq.kaha.StoreEntry;
025import org.apache.activemq.kaha.impl.index.Index;
026import org.apache.activemq.kaha.impl.index.IndexManager;
027import org.apache.activemq.util.DataByteArrayInputStream;
028import org.apache.activemq.util.DataByteArrayOutputStream;
029import org.apache.activemq.util.IOHelper;
030import org.apache.activemq.util.LRUCache;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * BTree implementation
036 * 
037 * 
038 */
039public class TreeIndex implements Index {
040
041    private static final String NAME_PREFIX = "tree-index-";
042    private static final int DEFAULT_PAGE_SIZE;
043    private static final int DEFAULT_KEY_SIZE;
044    private static final Logger LOG = LoggerFactory.getLogger(TreeIndex.class);
045    private final String name;
046    private File directory;
047    private File file;
048    private RandomAccessFile indexFile;
049    private IndexManager indexManager;
050    private int pageSize = DEFAULT_PAGE_SIZE;
051    private int keySize = DEFAULT_KEY_SIZE;
052    private int keysPerPage = pageSize / keySize;
053    private TreePage root;
054    private LRUCache<Long, TreePage> pageCache;
055    private DataByteArrayInputStream dataIn;
056    private DataByteArrayOutputStream dataOut;
057    private byte[] readBuffer;
058    private Marshaller keyMarshaller;
059    private long length;
060    private TreePage firstFree;
061    private TreePage lastFree;
062    private AtomicBoolean loaded = new AtomicBoolean();
063    private boolean enablePageCaching = true;
064    private int pageCacheSize = 10;
065
066    /**
067     * Constructor
068     * 
069     * @param directory
070     * @param name
071     * @param indexManager
072     * @throws IOException
073     */
074    public TreeIndex(File directory, String name, IndexManager indexManager) throws IOException {
075        this.directory = directory;
076        this.name = name;
077        this.indexManager = indexManager;
078        pageCache = new LRUCache<Long, TreePage>(pageCacheSize, pageCacheSize, 0.75f, true);
079        openIndexFile();
080    }
081
082    /**
083     * Set the marshaller for key objects
084     * 
085     * @param marshaller
086     */
087    public void setKeyMarshaller(Marshaller marshaller) {
088        this.keyMarshaller = marshaller;
089    }
090
091    /**
092     * @return the keySize
093     */
094    public int getKeySize() {
095        return this.keySize;
096    }
097
098    /**
099     * @param keySize the keySize to set
100     */
101    public void setKeySize(int keySize) {
102        this.keySize = keySize;
103        if (loaded.get()) {
104            throw new RuntimeException("Pages already loaded - can't reset key size");
105        }
106    }
107
108    /**
109     * @return the pageSize
110     */
111    public int getPageSize() {
112        return this.pageSize;
113    }
114
115    /**
116     * @param pageSize the pageSize to set
117     */
118    public void setPageSize(int pageSize) {
119        if (loaded.get() && pageSize != this.pageSize) {
120            throw new RuntimeException("Pages already loaded - can't reset page size");
121        }
122        this.pageSize = pageSize;
123    }
124
125    public boolean isTransient() {
126        return false;
127    }
128
129    /**
130     * @return the enablePageCaching
131     */
132    public boolean isEnablePageCaching() {
133        return this.enablePageCaching;
134    }
135
136    /**
137     * @param enablePageCaching the enablePageCaching to set
138     */
139    public void setEnablePageCaching(boolean enablePageCaching) {
140        this.enablePageCaching = enablePageCaching;
141    }
142
143    /**
144     * @return the pageCacheSize
145     */
146    public int getPageCacheSize() {
147        return this.pageCacheSize;
148    }
149
150    /**
151     * @param pageCacheSize the pageCacheSize to set
152     */
153    public void setPageCacheSize(int pageCacheSize) {
154        this.pageCacheSize = pageCacheSize;
155        pageCache.setMaxCacheSize(pageCacheSize);
156    }
157
158    public void load() {
159        if (loaded.compareAndSet(false, true)) {
160            keysPerPage = pageSize / keySize;
161            dataIn = new DataByteArrayInputStream();
162            dataOut = new DataByteArrayOutputStream(pageSize);
163            readBuffer = new byte[pageSize];
164            try {
165                openIndexFile();
166                long offset = 0;
167                while ((offset + pageSize) <= indexFile.length()) {
168                    indexFile.seek(offset);
169                    indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
170                    dataIn.restart(readBuffer);
171                    TreePage page = new TreePage(keysPerPage);
172                    page.setTree(this);
173                    page.setId(offset);
174                    page.readHeader(dataIn);
175                    if (!page.isActive()) {
176                        if (lastFree != null) {
177                            lastFree.setNextFreePageId(offset);
178                            indexFile.seek(lastFree.getId());
179                            dataOut.reset();
180                            lastFree.writeHeader(dataOut);
181                            indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
182                            lastFree = page;
183                        } else {
184                            lastFree = page;
185                            firstFree = page;
186                        }
187                    } else if (root == null && page.isRoot()) {
188                        root = getFullPage(offset);
189                    }
190                    offset += pageSize;
191                }
192                length = offset;
193                if (root == null) {
194                    root = createRoot();
195                }
196            } catch (IOException e) {
197                LOG.error("Failed to load index ", e);
198                throw new RuntimeException(e);
199            }
200        }
201    }
202
203    public void unload() throws IOException {
204        if (loaded.compareAndSet(true, false)) {
205            if (indexFile != null) {
206                indexFile.close();
207                indexFile = null;
208                pageCache.clear();
209                root = null;
210                firstFree = null;
211                lastFree = null;
212            }
213        }
214    }
215
216    public void store(Object key, StoreEntry value) throws IOException {
217        TreeEntry entry = new TreeEntry();
218        entry.setKey((Comparable)key);
219        entry.setIndexOffset(value.getOffset());
220        root.put(entry);
221    }
222
223    public StoreEntry get(Object key) throws IOException {
224        TreeEntry entry = new TreeEntry();
225        entry.setKey((Comparable)key);
226        TreeEntry result = root.find(entry);
227        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
228    }
229
230    public StoreEntry remove(Object key) throws IOException {
231        TreeEntry entry = new TreeEntry();
232        entry.setKey((Comparable)key);
233        TreeEntry result = root.remove(entry);
234        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
235    }
236
237    public boolean containsKey(Object key) throws IOException {
238        TreeEntry entry = new TreeEntry();
239        entry.setKey((Comparable)key);
240        return root.find(entry) != null;
241    }
242
243    public void clear() throws IOException {
244        unload();
245        delete();
246        openIndexFile();
247        load();
248    }
249
250    public void delete() throws IOException {
251        unload();
252        if (file.exists()) {
253            boolean result = file.delete();
254        }
255        length = 0;
256    }
257
258    /**
259     * @return the root
260     */
261    TreePage getRoot() {
262        return this.root;
263    }
264
265    TreePage lookupPage(long pageId) throws IOException {
266        TreePage result = null;
267        if (pageId >= 0) {
268            if (root != null && root.getId() == pageId) {
269                result = root;
270            } else {
271                result = getFromCache(pageId);
272            }
273            if (result == null) {
274                result = getFullPage(pageId);
275                if (result != null) {
276                    if (result.isActive()) {
277                        addToCache(result);
278                    } else {
279                        throw new IllegalStateException("Trying to access an inactive page: " + pageId + " root is " + root);
280                    }
281                }
282            }
283        }
284        return result;
285    }
286
287    TreePage createRoot() throws IOException {
288        TreePage result = createPage(-1);
289        root = result;
290        return result;
291    }
292
293    TreePage createPage(long parentId) throws IOException {
294        TreePage result = getNextFreePage();
295        if (result == null) {
296            // allocate one
297            result = new TreePage(keysPerPage);
298            result.setId(length);
299            result.setTree(this);
300            result.setParentId(parentId);
301            writePage(result);
302            length += pageSize;
303            indexFile.seek(length);
304            indexFile.write(TreeEntry.NOT_SET);
305        }
306        addToCache(result);
307        return result;
308    }
309
310    void releasePage(TreePage page) throws IOException {
311        removeFromCache(page);
312        page.reset();
313        page.setActive(false);
314        if (lastFree == null) {
315            firstFree = page;
316            lastFree = page;
317        } else {
318            lastFree.setNextFreePageId(page.getId());
319            writePage(lastFree);
320        }
321        writePage(page);
322    }
323
324    private TreePage getNextFreePage() throws IOException {
325        TreePage result = null;
326        if (firstFree != null) {
327            if (firstFree.equals(lastFree)) {
328                result = firstFree;
329                firstFree = null;
330                lastFree = null;
331            } else {
332                result = firstFree;
333                firstFree = getPage(firstFree.getNextFreePageId());
334                if (firstFree == null) {
335                    lastFree = null;
336                }
337            }
338            result.setActive(true);
339            result.reset();
340            result.saveHeader();
341        }
342        return result;
343    }
344
345    void writeFullPage(TreePage page) throws IOException {
346        dataOut.reset();
347        page.write(keyMarshaller, dataOut);
348        if (dataOut.size() > pageSize) {
349            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
350        }
351        indexFile.seek(page.getId());
352        indexFile.write(dataOut.getData(), 0, dataOut.size());
353    }
354
355    void writePage(TreePage page) throws IOException {
356        dataOut.reset();
357        page.writeHeader(dataOut);
358        indexFile.seek(page.getId());
359        indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
360    }
361
362    TreePage getFullPage(long id) throws IOException {
363        indexFile.seek(id);
364        indexFile.readFully(readBuffer, 0, pageSize);
365        dataIn.restart(readBuffer);
366        TreePage page = new TreePage(keysPerPage);
367        page.setId(id);
368        page.setTree(this);
369        page.read(keyMarshaller, dataIn);
370        return page;
371    }
372
373    TreePage getPage(long id) throws IOException {
374        indexFile.seek(id);
375        indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
376        dataIn.restart(readBuffer);
377        TreePage page = new TreePage(keysPerPage);
378        page.setId(id);
379        page.setTree(this);
380        page.readHeader(dataIn);
381        return page;
382    }
383
384    private TreePage getFromCache(long pageId) {
385        TreePage result = null;
386        if (enablePageCaching) {
387            result = pageCache.get(pageId);
388        }
389        return result;
390    }
391
392    private void addToCache(TreePage page) {
393        if (enablePageCaching) {
394            pageCache.put(page.getId(), page);
395        }
396    }
397
398    private void removeFromCache(TreePage page) {
399        if (enablePageCaching) {
400            pageCache.remove(page.getId());
401        }
402    }
403
404    protected void openIndexFile() throws IOException {
405        if (indexFile == null) {
406            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
407            IOHelper.mkdirs(file.getParentFile());
408            indexFile = new RandomAccessFile(file, "rw");
409        }
410    }
411
412    static {
413        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
414        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
415    }
416
417    public int getSize() {
418        return 0;
419    }
420}