001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.journal;
018
019import java.io.File;
020import java.io.FilenameFilter;
021import java.io.IOException;
022import java.io.UnsupportedEncodingException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.Timer;
032import java.util.TimerTask;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.zip.Adler32;
038import java.util.zip.Checksum;
039import org.apache.kahadb.util.LinkedNode;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.kahadb.util.ByteSequence;
043import org.apache.kahadb.util.DataByteArrayInputStream;
044import org.apache.kahadb.util.DataByteArrayOutputStream;
045import org.apache.kahadb.util.LinkedNodeList;
046import org.apache.kahadb.util.SchedulerTimerTask;
047import org.apache.kahadb.util.Sequence;
048
049/**
050 * Manages DataFiles
051 *
052 *
053 */
054public class Journal {
055    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
056    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
057
058    private static final int MAX_BATCH_SIZE = 32*1024*1024;
059
060    // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
061    public static final int RECORD_HEAD_SPACE = 4 + 1;
062
063    public static final byte USER_RECORD_TYPE = 1;
064    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
065    // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
066    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
067    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
068    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
069
070    private static byte[] createBatchControlRecordHeader() {
071        try {
072            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
073            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
074            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
075            os.write(BATCH_CONTROL_RECORD_MAGIC);
076            ByteSequence sequence = os.toByteSequence();
077            sequence.compact();
078            return sequence.getData();
079        } catch (IOException e) {
080            throw new RuntimeException("Could not create batch control record header.", e);
081        }
082    }
083
084    public static final String DEFAULT_DIRECTORY = ".";
085    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
086    public static final String DEFAULT_FILE_PREFIX = "db-";
087    public static final String DEFAULT_FILE_SUFFIX = ".log";
088    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
089    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
090    public static final int PREFERED_DIFF = 1024 * 512;
091    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
092
093    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
094
095    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
096
097    protected File directory = new File(DEFAULT_DIRECTORY);
098    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
099    protected String filePrefix = DEFAULT_FILE_PREFIX;
100    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
101    protected boolean started;
102
103    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
104    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
105    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
106
107    protected FileAppender appender;
108    protected DataFileAccessorPool accessorPool;
109
110    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
111    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
112    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
113
114    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
115    protected Runnable cleanupTask;
116    protected AtomicLong totalLength = new AtomicLong();
117    protected boolean archiveDataLogs;
118    private ReplicationTarget replicationTarget;
119    protected boolean checksum;
120    protected boolean checkForCorruptionOnStartup;
121    protected boolean enableAsyncDiskSync = true;
122    private Timer timer;
123
124    public synchronized void start() throws IOException {
125        if (started) {
126            return;
127        }
128
129        long start = System.currentTimeMillis();
130        accessorPool = new DataFileAccessorPool(this);
131        started = true;
132        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
133
134        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
135
136        File[] files = directory.listFiles(new FilenameFilter() {
137            public boolean accept(File dir, String n) {
138                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
139            }
140        });
141
142        if (files != null) {
143            for (File file : files) {
144                try {
145                    String n = file.getName();
146                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
147                    int num = Integer.parseInt(numStr);
148                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
149                    fileMap.put(dataFile.getDataFileId(), dataFile);
150                    totalLength.addAndGet(dataFile.getLength());
151                } catch (NumberFormatException e) {
152                    // Ignore file that do not match the pattern.
153                }
154            }
155
156            // Sort the list so that we can link the DataFiles together in the
157            // right order.
158            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
159            Collections.sort(l);
160            for (DataFile df : l) {
161                if (df.getLength() == 0) {
162                    // possibly the result of a previous failed write
163                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
164                    continue;
165                }
166                dataFiles.addLast(df);
167                fileByFileMap.put(df.getFile(), df);
168
169                if( isCheckForCorruptionOnStartup() ) {
170                    lastAppendLocation.set(recoveryCheck(df));
171                }
172            }
173        }
174
175        getCurrentWriteFile();
176
177        if( lastAppendLocation.get()==null ) {
178            DataFile df = dataFiles.getTail();
179            lastAppendLocation.set(recoveryCheck(df));
180        }
181
182        cleanupTask = new Runnable() {
183            public void run() {
184                cleanup();
185            }
186        };
187        this.timer = new Timer("KahaDB Scheduler", true);
188        TimerTask task = new SchedulerTimerTask(cleanupTask);
189        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
190        long end = System.currentTimeMillis();
191        LOG.trace("Startup took: "+(end-start)+" ms");
192    }
193
194    private static byte[] bytes(String string) {
195        try {
196            return string.getBytes("UTF-8");
197        } catch (UnsupportedEncodingException e) {
198            throw new RuntimeException(e);
199        }
200    }
201
202    protected Location recoveryCheck(DataFile dataFile) throws IOException {
203        Location location = new Location();
204        location.setDataFileId(dataFile.getDataFileId());
205        location.setOffset(0);
206
207        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
208        try {
209            while( true ) {
210                int size = checkBatchRecord(reader, location.getOffset());
211                if ( size>=0 ) {
212                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
213                } else {
214
215                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
216                    // may have subsequent valid batch records.
217                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
218                    if( nextOffset >=0 ) {
219                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
220                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
221                        dataFile.corruptedBlocks.add(sequence);
222                        location.setOffset(nextOffset);
223                    } else {
224                        break;
225                    }
226                }
227            }
228
229        } catch (IOException e) {
230        } finally {
231            accessorPool.closeDataFileAccessor(reader);
232        }
233
234        int existingLen = dataFile.getLength();
235        dataFile.setLength(location.getOffset());
236        if (existingLen > dataFile.getLength()) {
237            totalLength.addAndGet(dataFile.getLength() - existingLen);
238        }
239
240        if( !dataFile.corruptedBlocks.isEmpty() ) {
241            // Is the end of the data file corrupted?
242            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
243                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
244            }
245        }
246
247        return location;
248    }
249
250    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
251        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
252        byte data[] = new byte[1024*4];
253        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
254
255        int pos = 0;
256        while( true ) {
257            pos = bs.indexOf(header, pos);
258            if( pos >= 0 ) {
259                return offset+pos;
260            } else {
261                // need to load the next data chunck in..
262                if( bs.length != data.length ) {
263                    // If we had a short read then we were at EOF
264                    return -1;
265                }
266                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
267                bs = new ByteSequence(data, 0, reader.read(offset, data));
268                pos=0;
269            }
270        }
271    }
272
273
274    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
275        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
276        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
277
278        reader.readFully(offset, controlRecord);
279
280        // Assert that it's  a batch record.
281        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
282            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
283                return -1;
284            }
285        }
286
287        int size = controlIs.readInt();
288        if( size > MAX_BATCH_SIZE ) {
289            return -1;
290        }
291
292        if( isChecksum() ) {
293
294            long expectedChecksum = controlIs.readLong();
295            if( expectedChecksum == 0 ) {
296                // Checksuming was not enabled when the record was stored.
297                // we can't validate the record :(
298                return size;
299            }
300
301            byte data[] = new byte[size];
302            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
303
304            Checksum checksum = new Adler32();
305            checksum.update(data, 0, data.length);
306
307            if( expectedChecksum!=checksum.getValue() ) {
308                return -1;
309            }
310
311        }
312        return size;
313    }
314
315
316    void addToTotalLength(int size) {
317        totalLength.addAndGet(size);
318    }
319
320    public long length() {
321        return totalLength.get();
322    }
323
324    synchronized DataFile getCurrentWriteFile() throws IOException {
325        if (dataFiles.isEmpty()) {
326            rotateWriteFile();
327        }
328        return dataFiles.getTail();
329    }
330
331    synchronized DataFile rotateWriteFile() {
332        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
333        File file = getFile(nextNum);
334        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
335        // actually allocate the disk space
336        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
337        fileByFileMap.put(file, nextWriteFile);
338        dataFiles.addLast(nextWriteFile);
339        return nextWriteFile;
340    }
341
342    public File getFile(int nextNum) {
343        String fileName = filePrefix + nextNum + fileSuffix;
344        File file = new File(directory, fileName);
345        return file;
346    }
347
348    synchronized DataFile getDataFile(Location item) throws IOException {
349        Integer key = Integer.valueOf(item.getDataFileId());
350        DataFile dataFile = fileMap.get(key);
351        if (dataFile == null) {
352            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
353            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
354        }
355        return dataFile;
356    }
357
358    synchronized File getFile(Location item) throws IOException {
359        Integer key = Integer.valueOf(item.getDataFileId());
360        DataFile dataFile = fileMap.get(key);
361        if (dataFile == null) {
362            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
363            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
364        }
365        return dataFile.getFile();
366    }
367
368    private DataFile getNextDataFile(DataFile dataFile) {
369        return dataFile.getNext();
370    }
371
372    public synchronized void close() throws IOException {
373        if (!started) {
374            return;
375        }
376        if (this.timer != null) {
377            this.timer.cancel();
378        }
379        accessorPool.close();
380        appender.close();
381        fileMap.clear();
382        fileByFileMap.clear();
383        dataFiles.clear();
384        lastAppendLocation.set(null);
385        started = false;
386    }
387
388    protected synchronized void cleanup() {
389        if (accessorPool != null) {
390            accessorPool.disposeUnused();
391        }
392    }
393
394    public synchronized boolean delete() throws IOException {
395
396        // Close all open file handles...
397        appender.close();
398        accessorPool.close();
399
400        boolean result = true;
401        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
402            DataFile dataFile = i.next();
403            totalLength.addAndGet(-dataFile.getLength());
404            result &= dataFile.delete();
405        }
406        fileMap.clear();
407        fileByFileMap.clear();
408        lastAppendLocation.set(null);
409        dataFiles = new LinkedNodeList<DataFile>();
410
411        // reopen open file handles...
412        accessorPool = new DataFileAccessorPool(this);
413        appender = new DataFileAppender(this);
414        return result;
415    }
416
417    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
418        for (Integer key : files) {
419            // Can't remove the data file (or subsequent files) that is currently being written to.
420            if( key >= lastAppendLocation.get().getDataFileId() ) {
421                continue;
422            }
423            DataFile dataFile = fileMap.get(key);
424            if( dataFile!=null ) {
425                forceRemoveDataFile(dataFile);
426            }
427        }
428    }
429
430    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
431        accessorPool.disposeDataFileAccessors(dataFile);
432        fileByFileMap.remove(dataFile.getFile());
433        fileMap.remove(dataFile.getDataFileId());
434        totalLength.addAndGet(-dataFile.getLength());
435        dataFile.unlink();
436        if (archiveDataLogs) {
437            dataFile.move(getDirectoryArchive());
438            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
439        } else {
440            if ( dataFile.delete() ) {
441                LOG.debug("Discarded data file " + dataFile);
442            } else {
443                LOG.warn("Failed to discard data file " + dataFile.getFile());
444            }
445        }
446    }
447
448    /**
449     * @return the maxFileLength
450     */
451    public int getMaxFileLength() {
452        return maxFileLength;
453    }
454
455    /**
456     * @param maxFileLength the maxFileLength to set
457     */
458    public void setMaxFileLength(int maxFileLength) {
459        this.maxFileLength = maxFileLength;
460    }
461
462    @Override
463    public String toString() {
464        return directory.toString();
465    }
466
467    public synchronized void appendedExternally(Location loc, int length) throws IOException {
468        DataFile dataFile = null;
469        if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
470            // It's an update to the current log file..
471            dataFile = dataFiles.getTail();
472            dataFile.incrementLength(length);
473        } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
474            // It's an update to the next log file.
475            int nextNum = loc.getDataFileId();
476            File file = getFile(nextNum);
477            dataFile = new DataFile(file, nextNum, preferedFileLength);
478            // actually allocate the disk space
479            fileMap.put(dataFile.getDataFileId(), dataFile);
480            fileByFileMap.put(file, dataFile);
481            dataFiles.addLast(dataFile);
482        } else {
483            throw new IOException("Invalid external append.");
484        }
485    }
486
487    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
488
489        Location cur = null;
490        while (true) {
491            if (cur == null) {
492                if (location == null) {
493                    DataFile head = dataFiles.getHead();
494                    if( head == null ) {
495                        return null;
496                    }
497                    cur = new Location();
498                    cur.setDataFileId(head.getDataFileId());
499                    cur.setOffset(0);
500                } else {
501                    // Set to the next offset..
502                    if (location.getSize() == -1) {
503                        cur = new Location(location);
504                    } else {
505                        cur = new Location(location);
506                        cur.setOffset(location.getOffset() + location.getSize());
507                    }
508                }
509            } else {
510                cur.setOffset(cur.getOffset() + cur.getSize());
511            }
512
513            DataFile dataFile = getDataFile(cur);
514
515            // Did it go into the next file??
516            if (dataFile.getLength() <= cur.getOffset()) {
517                dataFile = getNextDataFile(dataFile);
518                if (dataFile == null) {
519                    return null;
520                } else {
521                    cur.setDataFileId(dataFile.getDataFileId().intValue());
522                    cur.setOffset(0);
523                }
524            }
525
526            // Load in location size and type.
527            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
528            try {
529                reader.readLocationDetails(cur);
530            } finally {
531                accessorPool.closeDataFileAccessor(reader);
532            }
533
534            if (cur.getType() == 0) {
535                return null;
536            } else if (cur.getType() == USER_RECORD_TYPE) {
537                // Only return user records.
538                return cur;
539            }
540        }
541    }
542
543    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
544        DataFile df = fileByFileMap.get(file);
545        return getNextLocation(df, lastLocation, thisFileOnly);
546    }
547
548    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
549
550        Location cur = null;
551        while (true) {
552            if (cur == null) {
553                if (lastLocation == null) {
554                    DataFile head = dataFile.getHeadNode();
555                    cur = new Location();
556                    cur.setDataFileId(head.getDataFileId());
557                    cur.setOffset(0);
558                } else {
559                    // Set to the next offset..
560                    cur = new Location(lastLocation);
561                    cur.setOffset(cur.getOffset() + cur.getSize());
562                }
563            } else {
564                cur.setOffset(cur.getOffset() + cur.getSize());
565            }
566
567            // Did it go into the next file??
568            if (dataFile.getLength() <= cur.getOffset()) {
569                if (thisFileOnly) {
570                    return null;
571                } else {
572                    dataFile = getNextDataFile(dataFile);
573                    if (dataFile == null) {
574                        return null;
575                    } else {
576                        cur.setDataFileId(dataFile.getDataFileId().intValue());
577                        cur.setOffset(0);
578                    }
579                }
580            }
581
582            // Load in location size and type.
583            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
584            try {
585                reader.readLocationDetails(cur);
586            } finally {
587                accessorPool.closeDataFileAccessor(reader);
588            }
589
590            if (cur.getType() == 0) {
591                return null;
592            } else if (cur.getType() > 0) {
593                // Only return user records.
594                return cur;
595            }
596        }
597    }
598
599    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
600        DataFile dataFile = getDataFile(location);
601        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
602        ByteSequence rc = null;
603        try {
604            rc = reader.readRecord(location);
605        } finally {
606            accessorPool.closeDataFileAccessor(reader);
607        }
608        return rc;
609    }
610
611    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
612        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
613        return loc;
614    }
615
616    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
617        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
618        return loc;
619    }
620
621    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
622        DataFile dataFile = getDataFile(location);
623        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
624        try {
625            updater.updateRecord(location, data, sync);
626        } finally {
627            accessorPool.closeDataFileAccessor(updater);
628        }
629    }
630
631    public File getDirectory() {
632        return directory;
633    }
634
635    public void setDirectory(File directory) {
636        this.directory = directory;
637    }
638
639    public String getFilePrefix() {
640        return filePrefix;
641    }
642
643    public void setFilePrefix(String filePrefix) {
644        this.filePrefix = filePrefix;
645    }
646
647    public Map<WriteKey, WriteCommand> getInflightWrites() {
648        return inflightWrites;
649    }
650
651    public Location getLastAppendLocation() {
652        return lastAppendLocation.get();
653    }
654
655    public void setLastAppendLocation(Location lastSyncedLocation) {
656        this.lastAppendLocation.set(lastSyncedLocation);
657    }
658
659    public File getDirectoryArchive() {
660        return directoryArchive;
661    }
662
663    public void setDirectoryArchive(File directoryArchive) {
664        this.directoryArchive = directoryArchive;
665    }
666
667    public boolean isArchiveDataLogs() {
668        return archiveDataLogs;
669    }
670
671    public void setArchiveDataLogs(boolean archiveDataLogs) {
672        this.archiveDataLogs = archiveDataLogs;
673    }
674
675    synchronized public Integer getCurrentDataFileId() {
676        if (dataFiles.isEmpty())
677            return null;
678        return dataFiles.getTail().getDataFileId();
679    }
680
681    /**
682     * Get a set of files - only valid after start()
683     *
684     * @return files currently being used
685     */
686    public Set<File> getFiles() {
687        return fileByFileMap.keySet();
688    }
689
690    public synchronized Map<Integer, DataFile> getFileMap() {
691        return new TreeMap<Integer, DataFile>(fileMap);
692    }
693
694    public long getDiskSize() {
695        long tailLength=0;
696        synchronized( this ) {
697            if( !dataFiles.isEmpty() ) {
698                tailLength = dataFiles.getTail().getLength();
699            }
700        }
701
702        long rc = totalLength.get();
703
704        // The last file is actually at a minimum preferedFileLength big.
705        if( tailLength < preferedFileLength ) {
706            rc -= tailLength;
707            rc += preferedFileLength;
708        }
709        return rc;
710    }
711
712    public void setReplicationTarget(ReplicationTarget replicationTarget) {
713        this.replicationTarget = replicationTarget;
714    }
715    public ReplicationTarget getReplicationTarget() {
716        return replicationTarget;
717    }
718
719    public String getFileSuffix() {
720        return fileSuffix;
721    }
722
723    public void setFileSuffix(String fileSuffix) {
724        this.fileSuffix = fileSuffix;
725    }
726
727    public boolean isChecksum() {
728        return checksum;
729    }
730
731    public void setChecksum(boolean checksumWrites) {
732        this.checksum = checksumWrites;
733    }
734
735    public boolean isCheckForCorruptionOnStartup() {
736        return checkForCorruptionOnStartup;
737    }
738
739    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
740        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
741    }
742
743    public void setWriteBatchSize(int writeBatchSize) {
744        this.writeBatchSize = writeBatchSize;
745    }
746
747    public int getWriteBatchSize() {
748        return writeBatchSize;
749    }
750
751    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
752       this.totalLength = storeSizeAccumulator;
753    }
754
755    public void setEnableAsyncDiskSync(boolean val) {
756        this.enableAsyncDiskSync = val;
757    }
758
759    public boolean isEnableAsyncDiskSync() {
760        return enableAsyncDiskSync;
761    }
762
763    public static class WriteCommand extends LinkedNode<WriteCommand> {
764        public final Location location;
765        public final ByteSequence data;
766        final boolean sync;
767        public final Runnable onComplete;
768
769        public WriteCommand(Location location, ByteSequence data, boolean sync) {
770            this.location = location;
771            this.data = data;
772            this.sync = sync;
773            this.onComplete = null;
774        }
775
776        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
777            this.location = location;
778            this.data = data;
779            this.onComplete = onComplete;
780            this.sync = false;
781        }
782    }
783
784    public static class WriteKey {
785        private final int file;
786        private final long offset;
787        private final int hash;
788
789        public WriteKey(Location item) {
790            file = item.getDataFileId();
791            offset = item.getOffset();
792            // TODO: see if we can build a better hash
793            hash = (int)(file ^ offset);
794        }
795
796        public int hashCode() {
797            return hash;
798        }
799
800        public boolean equals(Object obj) {
801            if (obj instanceof WriteKey) {
802                WriteKey di = (WriteKey)obj;
803                return di.file == file && di.offset == offset;
804            }
805            return false;
806        }
807    }
808}