001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.page;
018
019import org.apache.kahadb.page.PageFile.PageWrite;
020import org.apache.kahadb.util.*;
021
022import java.io.*;
023import java.util.Iterator;
024import java.util.NoSuchElementException;
025import java.util.TreeMap;
026
027/**
028 * The class used to read/update a PageFile object.  Using a transaction allows you to
029 * do multiple update operations in a single unit of work.
030 */
031public class Transaction implements Iterable<Page> {
032
033    private RandomAccessFile tmpFile;
034    private File txFile;
035    private long nextLocation = 0;
036
037    /**
038     * The PageOverflowIOException occurs when a page write is requested
039     * and it's data is larger than what would fit into a single page.
040     */
041    public class PageOverflowIOException extends IOException {
042        private static final long serialVersionUID = 1L;
043
044        public PageOverflowIOException(String message) {
045            super(message);
046        }
047    }
048
049    /**
050     * The InvalidPageIOException is thrown if try to load/store a a page
051     * with an invalid page id.
052     */
053    public class InvalidPageIOException extends IOException {
054        private static final long serialVersionUID = 1L;
055
056        private final long page;
057
058        public InvalidPageIOException(String message, long page) {
059            super(message);
060            this.page = page;
061        }
062
063        public long getPage() {
064            return page;
065        }
066    }
067
068    /**
069     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
070     *
071     * @param <T> The type of exceptions that operation will throw.
072     */
073    public interface Closure <T extends Throwable> {
074        public void execute(Transaction tx) throws T;
075    }
076
077    /**
078     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
079     *
080     * @param <R> The type of result that the closure produces.
081     * @param <T> The type of exceptions that operation will throw.
082     */
083    public interface CallableClosure<R, T extends Throwable> {
084        public R execute(Transaction tx) throws T;
085    }
086
087
088    // The page file that this Transaction operates against.
089    private final PageFile pageFile;
090    // If this transaction is updating stuff.. this is the tx of
091    private long writeTransactionId=-1;
092    // List of pages that this transaction has modified.
093    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
094    // List of pages allocated in this transaction
095    private final SequenceSet allocateList = new SequenceSet();
096    // List of pages freed in this transaction
097    private final SequenceSet freeList = new SequenceSet();
098
099    private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
100
101    private long size = 0;
102
103    Transaction(PageFile pageFile) {
104        this.pageFile = pageFile;
105    }
106
107    /**
108     * @return the page file that created this Transaction
109     */
110    public PageFile getPageFile() {
111        return this.pageFile;
112    }
113
114    /**
115     * Allocates a free page that you can write data to.
116     *
117     * @return a newly allocated page.
118     * @throws IOException
119     *         If an disk error occurred.
120     * @throws IllegalStateException
121     *         if the PageFile is not loaded
122     */
123    public <T> Page<T> allocate() throws IOException {
124        return allocate(1);
125    }
126
127    /**
128     * Allocates a block of free pages that you can write data to.
129     *
130     * @param count the number of sequential pages to allocate
131     * @return the first page of the sequential set.
132     * @throws IOException
133     *         If an disk error occurred.
134     * @throws IllegalStateException
135     *         if the PageFile is not loaded
136     */
137    public <T> Page<T> allocate(int count) throws IOException {
138        Page<T> rc = pageFile.allocate(count);
139        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
140        return rc;
141    }
142
143    /**
144     * Frees up a previously allocated page so that it can be re-allocated again.
145     *
146     * @param pageId the page to free up
147     * @throws IOException
148     *         If an disk error occurred.
149     * @throws IllegalStateException
150     *         if the PageFile is not loaded
151     */
152    public void free(long pageId) throws IOException {
153        free(load(pageId, null));
154    }
155
156    /**
157     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
158     *
159     * @param pageId the initial page of the sequence that will be getting freed
160     * @param count the number of pages in the sequence
161     *
162     * @throws IOException
163     *         If an disk error occurred.
164     * @throws IllegalStateException
165     *         if the PageFile is not loaded
166     */
167    public void free(long pageId, int count) throws IOException {
168        free(load(pageId, null), count);
169    }
170
171    /**
172     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
173     *
174     * @param page the initial page of the sequence that will be getting freed
175     * @param count the number of pages in the sequence
176     *
177     * @throws IOException
178     *         If an disk error occurred.
179     * @throws IllegalStateException
180     *         if the PageFile is not loaded
181     */
182    public <T> void free(Page<T> page, int count) throws IOException {
183        pageFile.assertLoaded();
184        long offsetPage = page.getPageId();
185        while (count-- > 0) {
186            if (page == null) {
187                page = load(offsetPage, null);
188            }
189            free(page);
190            page = null;
191            // Increment the offsetPage value since using it depends on the current count.
192            offsetPage++;
193        }
194    }
195
196    /**
197     * Frees up a previously allocated page so that it can be re-allocated again.
198     *
199     * @param page the page to free up
200     * @throws IOException
201     *         If an disk error occurred.
202     * @throws IllegalStateException
203     *         if the PageFile is not loaded
204     */
205    public <T> void free(Page<T> page) throws IOException {
206        pageFile.assertLoaded();
207
208        // We may need loop to free up a page chain.
209        while (page != null) {
210
211            // Is it already free??
212            if (page.getType() == Page.PAGE_FREE_TYPE) {
213                return;
214            }
215
216            Page<T> next = null;
217            if (page.getType() == Page.PAGE_PART_TYPE) {
218                next = load(page.getNext(), null);
219            }
220
221            page.makeFree(getWriteTransactionId());
222            // ensure free page is visible while write is pending
223            pageFile.addToCache(page.copy());
224
225            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
226            page.write(out);
227            write(page, out.getData());
228
229            freeList.add(page.getPageId());
230            page = next;
231        }
232    }
233
234    /**
235     *
236     * @param page
237     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
238     * @param marshaller
239     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
240     * @param overflow
241     *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
242     *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
243     *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
244     * @throws IOException
245     *         If an disk error occurred.
246     * @throws PageOverflowIOException
247     *         If the page data marshalls to size larger than maximum page size and overflow was false.
248     * @throws IllegalStateException
249     *         if the PageFile is not loaded
250     */
251    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
252        DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
253        if (marshaller != null) {
254            marshaller.writePayload(page.get(), out);
255        }
256        out.close();
257    }
258
259    /**
260     * @throws IOException
261     */
262    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
263        pageFile.assertLoaded();
264
265        // Copy to protect against the end user changing
266        // the page instance while we are doing a write.
267        final Page copy = page.copy();
268        pageFile.addToCache(copy);
269
270        //
271        // To support writing VERY large data, we override the output stream so
272        // that we
273        // we do the page writes incrementally while the data is being
274        // marshalled.
275        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
276            Page current = copy;
277
278            @SuppressWarnings("unchecked")
279            @Override
280            protected void onWrite() throws IOException {
281
282                // Are we at an overflow condition?
283                final int pageSize = pageFile.getPageSize();
284                if (pos >= pageSize) {
285                    // If overflow is allowed
286                    if (overflow) {
287
288                        do {
289                            Page next;
290                            if (current.getType() == Page.PAGE_PART_TYPE) {
291                                next = load(current.getNext(), null);
292                            } else {
293                                next = allocate();
294                            }
295
296                            next.txId = current.txId;
297
298                            // Write the page header
299                            int oldPos = pos;
300                            pos = 0;
301
302                            current.makePagePart(next.getPageId(), getWriteTransactionId());
303                            current.write(this);
304
305                            // Do the page write..
306                            byte[] data = new byte[pageSize];
307                            System.arraycopy(buf, 0, data, 0, pageSize);
308                            Transaction.this.write(current, data);
309
310                            // Reset for the next page chunk
311                            pos = 0;
312                            // The page header marshalled after the data is written.
313                            skip(Page.PAGE_HEADER_SIZE);
314                            // Move the overflow data after the header.
315                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
316                            pos += oldPos - pageSize;
317                            current = next;
318
319                        } while (pos > pageSize);
320                    } else {
321                        throw new PageOverflowIOException("Page overflow.");
322                    }
323                }
324
325            }
326
327            @Override
328            public void close() throws IOException {
329                super.close();
330
331                // We need to free up the rest of the page chain..
332                if (current.getType() == Page.PAGE_PART_TYPE) {
333                    free(current.getNext());
334                }
335
336                current.makePageEnd(pos, getWriteTransactionId());
337
338                // Write the header..
339                pos = 0;
340                current.write(this);
341
342                Transaction.this.write(current, buf);
343            }
344        };
345
346        // The page header marshaled after the data is written.
347        out.skip(Page.PAGE_HEADER_SIZE);
348        return out;
349    }
350
351    /**
352     * Loads a page from disk.
353     *
354     * @param pageId
355     *        the id of the page to load
356     * @param marshaller
357     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
358     * @return The page with the given id
359     * @throws IOException
360     *         If an disk error occurred.
361     * @throws IllegalStateException
362     *         if the PageFile is not loaded
363     */
364    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
365        pageFile.assertLoaded();
366        Page<T> page = new Page<T>(pageId);
367        load(page, marshaller);
368        return page;
369    }
370
371    /**
372     * Loads a page from disk.
373     *
374     * @param page - The pageId field must be properly set
375     * @param marshaller
376     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
377     * @throws IOException
378     *         If an disk error occurred.
379     * @throws InvalidPageIOException
380     *         If the page is is not valid.
381     * @throws IllegalStateException
382     *         if the PageFile is not loaded
383     */
384    @SuppressWarnings("unchecked")
385    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
386        pageFile.assertLoaded();
387
388        // Can't load invalid offsets...
389        long pageId = page.getPageId();
390        if (pageId < 0) {
391            throw new InvalidPageIOException("Page id is not valid", pageId);
392        }
393
394        // It might be a page this transaction has modified...
395        PageWrite update = writes.get(pageId);
396        if (update != null) {
397            page.copy(update.getPage());
398            return;
399        }
400
401        // We may be able to get it from the cache...
402        Page<T> t = pageFile.getFromCache(pageId);
403        if (t != null) {
404            page.copy(t);
405            return;
406        }
407
408        if (marshaller != null) {
409            // Full page read..
410            InputStream is = openInputStream(page);
411            DataInputStream dataIn = new DataInputStream(is);
412            page.set(marshaller.readPayload(dataIn));
413            is.close();
414        } else {
415            // Page header read.
416            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
417            pageFile.readPage(pageId, in.getRawData());
418            page.read(in);
419            page.set(null);
420        }
421
422        // Cache it.
423        if (marshaller != null) {
424            pageFile.addToCache(page);
425        }
426    }
427
428    /**
429     * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
430     *      org.apache.kahadb.util.Marshaller)
431     */
432    public InputStream openInputStream(final Page p) throws IOException {
433
434        return new InputStream() {
435
436            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
437            private Page page = readPage(p);
438            private int pageCount = 1;
439
440            private Page markPage;
441            private ByteSequence markChunk;
442
443            private Page readPage(Page page) throws IOException {
444                // Read the page data
445
446                pageFile.readPage(page.getPageId(), chunk.getData());
447
448                chunk.setOffset(0);
449                chunk.setLength(pageFile.getPageSize());
450
451                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
452                page.read(in);
453
454                chunk.setOffset(Page.PAGE_HEADER_SIZE);
455                if (page.getType() == Page.PAGE_END_TYPE) {
456                    chunk.setLength((int)(page.getNext()));
457                }
458
459                if (page.getType() == Page.PAGE_FREE_TYPE) {
460                    throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
461                }
462
463                return page;
464            }
465
466            public int read() throws IOException {
467                if (!atEOF()) {
468                    return chunk.data[chunk.offset++] & 0xff;
469                } else {
470                    return -1;
471                }
472            }
473
474            private boolean atEOF() throws IOException {
475                if (chunk.offset < chunk.length) {
476                    return false;
477                }
478                if (page.getType() == Page.PAGE_END_TYPE) {
479                    return true;
480                }
481                fill();
482                return chunk.offset >= chunk.length;
483            }
484
485            private void fill() throws IOException {
486                page = readPage(new Page(page.getNext()));
487                pageCount++;
488            }
489
490            public int read(byte[] b) throws IOException {
491                return read(b, 0, b.length);
492            }
493
494            public int read(byte b[], int off, int len) throws IOException {
495                if (!atEOF()) {
496                    int rc = 0;
497                    while (!atEOF() && rc < len) {
498                        len = Math.min(len, chunk.length - chunk.offset);
499                        if (len > 0) {
500                            System.arraycopy(chunk.data, chunk.offset, b, off, len);
501                            chunk.offset += len;
502                        }
503                        rc += len;
504                    }
505                    return rc;
506                } else {
507                    return -1;
508                }
509            }
510
511            public long skip(long len) throws IOException {
512                if (atEOF()) {
513                    int rc = 0;
514                    while (!atEOF() && rc < len) {
515                        len = Math.min(len, chunk.length - chunk.offset);
516                        if (len > 0) {
517                            chunk.offset += len;
518                        }
519                        rc += len;
520                    }
521                    return rc;
522                } else {
523                    return -1;
524                }
525            }
526
527            public int available() {
528                return chunk.length - chunk.offset;
529            }
530
531            public boolean markSupported() {
532                return true;
533            }
534
535            public void mark(int markpos) {
536                markPage = page;
537                byte data[] = new byte[pageFile.getPageSize()];
538                System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
539                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
540            }
541
542            public void reset() {
543                page = markPage;
544                chunk = markChunk;
545            }
546
547        };
548    }
549
550    /**
551     * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are
552     * not included in this iteration.
553     *
554     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
555     *
556     * @throws IllegalStateException
557     *         if the PageFile is not loaded
558     */
559    public Iterator<Page> iterator() {
560        return (Iterator<Page>)iterator(false);
561    }
562
563    /**
564     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
565     * iterated.
566     *
567     * @param includeFreePages - if true, free pages are included in the iteration
568     * @throws IllegalStateException
569     *         if the PageFile is not loaded
570     */
571    public Iterator<Page> iterator(final boolean includeFreePages) {
572
573        pageFile.assertLoaded();
574
575        return new Iterator<Page>() {
576
577            long nextId;
578            Page nextPage;
579            Page lastPage;
580
581            private void findNextPage() {
582                if (!pageFile.isLoaded()) {
583                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
584                }
585
586                if (nextPage != null) {
587                    return;
588                }
589
590                try {
591                    while (nextId < pageFile.getPageCount()) {
592
593                        Page page = load(nextId, null);
594
595                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
596                            nextPage = page;
597                            return;
598                        } else {
599                            nextId++;
600                        }
601                    }
602                } catch (IOException e) {
603                }
604            }
605
606            public boolean hasNext() {
607                findNextPage();
608                return nextPage != null;
609            }
610
611            public Page next() {
612                findNextPage();
613                if (nextPage != null) {
614                    lastPage = nextPage;
615                    nextPage = null;
616                    nextId++;
617                    return lastPage;
618                } else {
619                    throw new NoSuchElementException();
620                }
621            }
622
623            @SuppressWarnings("unchecked")
624            public void remove() {
625                if (lastPage == null) {
626                    throw new IllegalStateException();
627                }
628                try {
629                    free(lastPage);
630                    lastPage = null;
631                } catch (IOException e) {
632                    throw new RuntimeException(e);
633                }
634            }
635        };
636    }
637
638    ///////////////////////////////////////////////////////////////////
639    // Commit / Rollback related methods..
640    ///////////////////////////////////////////////////////////////////
641
642    /**
643     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
644     * with the transaction are written to disk or none will.
645     */
646    public void commit() throws IOException {
647        if( writeTransactionId!=-1 ) {
648            if (tmpFile != null) {
649                tmpFile.close();
650                pageFile.removeTmpFile(getTempFile());
651                tmpFile = null;
652                txFile = null;
653            }
654            // Actually do the page writes...
655            pageFile.write(writes.entrySet());
656            // Release the pages that were freed up in the transaction..
657            freePages(freeList);
658
659            freeList.clear();
660            allocateList.clear();
661            writes.clear();
662            writeTransactionId = -1;
663        }
664        size = 0;
665    }
666
667    /**
668     * Rolls back the transaction.
669     */
670    public void rollback() throws IOException {
671        if( writeTransactionId!=-1 ) {
672            if (tmpFile != null) {
673                tmpFile.close();
674                pageFile.removeTmpFile(getTempFile());
675                tmpFile = null;
676                txFile = null;
677            }
678            // Release the pages that were allocated in the transaction...
679            freePages(allocateList);
680
681            freeList.clear();
682            allocateList.clear();
683            writes.clear();
684            writeTransactionId = -1;
685        }
686        size = 0;
687    }
688
689    private long getWriteTransactionId() {
690        if( writeTransactionId==-1 ) {
691            writeTransactionId = pageFile.getNextWriteTransactionId();
692        }
693        return writeTransactionId;
694    }
695
696
697    protected File getTempFile() {
698        if (txFile == null) {
699            txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-"+ Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
700        }
701       return txFile;
702    }
703
704    /**
705     * Queues up a page write that should get done when commit() gets called.
706     */
707    private void write(final Page page, byte[] data) throws IOException {
708        Long key = page.getPageId();
709
710        // how much pages we have for this transaction
711        size = writes.size() * pageFile.getPageSize();
712
713        PageWrite write;
714
715        if (size > maxTransactionSize) {
716            if (tmpFile == null) {
717                tmpFile = new RandomAccessFile(getTempFile(), "rw");
718            }
719            long location = nextLocation;
720            tmpFile.seek(nextLocation);
721            tmpFile.write(data);
722            nextLocation = location + data.length;
723            write = new PageWrite(page, location, data.length, getTempFile());
724        } else {
725            write = new PageWrite(page, data);
726        }
727        writes.put(key, write);
728    }
729
730    /**
731     * @param list
732     * @throws RuntimeException
733     */
734    private void freePages(SequenceSet list) throws RuntimeException {
735        Sequence seq = list.getHead();
736        while( seq!=null ) {
737            seq.each(new Sequence.Closure<RuntimeException>(){
738                public void execute(long value) {
739                    pageFile.freePage(value);
740                }
741            });
742            seq = seq.getNext();
743        }
744    }
745
746    /**
747     * @return true if there are no uncommitted page file updates associated with this transaction.
748     */
749    public boolean isReadOnly() {
750        return writeTransactionId==-1;
751    }
752
753    ///////////////////////////////////////////////////////////////////
754    // Transaction closure helpers...
755    ///////////////////////////////////////////////////////////////////
756
757    /**
758     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
759     * If the closure throws an Exception, then the transaction is rolled back.
760     *
761     * @param <T>
762     * @param closure - the work to get exectued.
763     * @throws T if the closure throws it
764     * @throws IOException If the commit fails.
765     */
766    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
767        boolean success = false;
768        try {
769            closure.execute(this);
770            success = true;
771        } finally {
772            if (success) {
773                commit();
774            } else {
775                rollback();
776            }
777        }
778    }
779
780    /**
781     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
782     * If the closure throws an Exception, then the transaction is rolled back.
783     *
784     * @param <T>
785     * @param closure - the work to get exectued.
786     * @throws T if the closure throws it
787     * @throws IOException If the commit fails.
788     */
789    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
790        boolean success = false;
791        try {
792            R rc = closure.execute(this);
793            success = true;
794            return rc;
795        } finally {
796            if (success) {
797                commit();
798            } else {
799                rollback();
800            }
801        }
802    }
803}