001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.async;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.File;
024import java.io.FilenameFilter;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.LinkedHashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.atomic.AtomicLong;
037import java.util.concurrent.atomic.AtomicReference;
038import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
039import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
040import org.apache.activemq.thread.Scheduler;
041import org.apache.activemq.util.ByteSequence;
042import org.apache.activemq.util.IOHelper;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046
047
048/**
049 * Manages DataFiles
050 * 
051 * 
052 */
053public class AsyncDataManager {
054
055    public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
056    public static final int ITEM_HEAD_RESERVED_SPACE = 21;
057    // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
058    public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
059    public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
060    public static final int ITEM_FOOT_SPACE = 3; // EOR
061
062    public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
063
064    public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // 
065    public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // 
066
067    public static final byte DATA_ITEM_TYPE = 1;
068    public static final byte REDO_ITEM_TYPE = 2;
069    public static final String DEFAULT_DIRECTORY = "data";
070    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
071    public static final String DEFAULT_FILE_PREFIX = "data-";
072    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
073    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
074    public static final int PREFERED_DIFF = 1024 * 512;
075
076    private static final Logger LOG = LoggerFactory.getLogger(AsyncDataManager.class);
077    protected Scheduler scheduler;
078
079    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
080
081    protected File directory = new File(DEFAULT_DIRECTORY);
082    protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
083    protected String filePrefix = DEFAULT_FILE_PREFIX;
084    protected ControlFile controlFile;
085    protected boolean started;
086    protected boolean useNio = true;
087
088    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
089    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
090
091    protected DataFileAppender appender;
092    protected DataFileAccessorPool accessorPool;
093
094    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
095    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
096    protected DataFile currentWriteFile;
097
098    protected Location mark;
099    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
100    protected Runnable cleanupTask;
101    protected final AtomicLong storeSize;
102    protected boolean archiveDataLogs;
103    
104    public AsyncDataManager(AtomicLong storeSize) {
105        this.storeSize=storeSize;
106    }
107    
108    public AsyncDataManager() {
109        this(new AtomicLong());
110    }
111
112    @SuppressWarnings("unchecked")
113    public synchronized void start() throws IOException {
114        if (started) {
115            return;
116        }
117
118        started = true;
119        preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
120        lock();
121
122        accessorPool = new DataFileAccessorPool(this);
123        ByteSequence sequence = controlFile.load();
124        if (sequence != null && sequence.getLength() > 0) {
125            unmarshallState(sequence);
126        }
127        if (useNio) {
128            appender = new NIODataFileAppender(this);
129        } else {
130            appender = new DataFileAppender(this);
131        }
132
133        File[] files = directory.listFiles(new FilenameFilter() {
134            public boolean accept(File dir, String n) {
135                return dir.equals(directory) && n.startsWith(filePrefix);
136            }
137        });
138       
139        if (files != null) {
140            for (int i = 0; i < files.length; i++) {
141                try {
142                    File file = files[i];
143                    String n = file.getName();
144                    String numStr = n.substring(filePrefix.length(), n.length());
145                    int num = Integer.parseInt(numStr);
146                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
147                    fileMap.put(dataFile.getDataFileId(), dataFile);
148                    storeSize.addAndGet(dataFile.getLength());
149                } catch (NumberFormatException e) {
150                    // Ignore file that do not match the pattern.
151                }
152            }
153
154            // Sort the list so that we can link the DataFiles together in the
155            // right order.
156            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
157            Collections.sort(l);
158            currentWriteFile = null;
159            for (DataFile df : l) {
160                if (currentWriteFile != null) {
161                    currentWriteFile.linkAfter(df);
162                }
163                currentWriteFile = df;
164                fileByFileMap.put(df.getFile(), df);
165            }
166        }
167
168        // Need to check the current Write File to see if there was a partial
169        // write to it.
170        if (currentWriteFile != null) {
171
172            // See if the lastSyncedLocation is valid..
173            Location l = lastAppendLocation.get();
174            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
175                l = null;
176            }
177
178            // If we know the last location that was ok.. then we can skip lots
179            // of checking
180            try{
181            l = recoveryCheck(currentWriteFile, l);
182            lastAppendLocation.set(l);
183            }catch(IOException e){
184                LOG.warn("recovery check failed", e);
185            }
186        }
187
188        storeState(false);
189
190        cleanupTask = new Runnable() {
191            public void run() {
192                cleanup();
193            }
194        };
195        this.scheduler = new Scheduler("AsyncDataManager Scheduler");
196        try {
197            this.scheduler.start();
198        } catch (Exception e) {
199            IOException ioe =  new IOException("scheduler start: " + e);
200            ioe.initCause(e);
201            throw ioe;
202        }
203        this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
204    }
205
206    public void lock() throws IOException {
207        synchronized (this) {
208            if (controlFile == null || controlFile.isDisposed()) {
209                IOHelper.mkdirs(directory);
210                controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
211            }
212            controlFile.lock();
213        }
214    }
215
216    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
217        if (location == null) {
218            location = new Location();
219            location.setDataFileId(dataFile.getDataFileId());
220            location.setOffset(0);
221        }
222        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
223        try {
224            reader.readLocationDetails(location);
225            while (reader.readLocationDetailsAndValidate(location)) {
226                location.setOffset(location.getOffset() + location.getSize());
227            }
228        } finally {
229            accessorPool.closeDataFileAccessor(reader);
230        }
231        dataFile.setLength(location.getOffset());
232        return location;
233    }
234
235    protected void unmarshallState(ByteSequence sequence) throws IOException {
236        ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
237        DataInputStream dis = new DataInputStream(bais);
238        if (dis.readBoolean()) {
239            mark = new Location();
240            mark.readExternal(dis);
241        } else {
242            mark = null;
243        }
244        if (dis.readBoolean()) {
245            Location l = new Location();
246            l.readExternal(dis);
247            lastAppendLocation.set(l);
248        } else {
249            lastAppendLocation.set(null);
250        }
251    }
252
253    private synchronized ByteSequence marshallState() throws IOException {
254        ByteArrayOutputStream baos = new ByteArrayOutputStream();
255        DataOutputStream dos = new DataOutputStream(baos);
256
257        if (mark != null) {
258            dos.writeBoolean(true);
259            mark.writeExternal(dos);
260        } else {
261            dos.writeBoolean(false);
262        }
263        Location l = lastAppendLocation.get();
264        if (l != null) {
265            dos.writeBoolean(true);
266            l.writeExternal(dos);
267        } else {
268            dos.writeBoolean(false);
269        }
270
271        byte[] bs = baos.toByteArray();
272        return new ByteSequence(bs, 0, bs.length);
273    }
274
275    synchronized DataFile allocateLocation(Location location) throws IOException {
276        if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
277            int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
278
279            String fileName = filePrefix + nextNum;
280            File file = new File(directory, fileName);
281            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
282            //actually allocate the disk space
283            nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
284            fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
285            fileByFileMap.put(file, nextWriteFile);
286            if (currentWriteFile != null) {
287                currentWriteFile.linkAfter(nextWriteFile);
288                if (currentWriteFile.isUnused()) {
289                    removeDataFile(currentWriteFile);
290                }
291            }
292            currentWriteFile = nextWriteFile;
293
294        }
295        location.setOffset(currentWriteFile.getLength());
296        location.setDataFileId(currentWriteFile.getDataFileId().intValue());
297        int size = location.getSize();
298        currentWriteFile.incrementLength(size);
299        currentWriteFile.increment();
300        storeSize.addAndGet(size);
301        return currentWriteFile;
302    }
303    
304    public synchronized void removeLocation(Location location) throws IOException{
305       
306        DataFile dataFile = getDataFile(location);
307        dataFile.decrement();
308    }
309
310    synchronized DataFile getDataFile(Location item) throws IOException {
311        Integer key = Integer.valueOf(item.getDataFileId());
312        DataFile dataFile = fileMap.get(key);
313        if (dataFile == null) {
314            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
315            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
316        }
317        return dataFile;
318    }
319    
320    synchronized File getFile(Location item) throws IOException {
321        Integer key = Integer.valueOf(item.getDataFileId());
322        DataFile dataFile = fileMap.get(key);
323        if (dataFile == null) {
324            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
325            throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
326        }
327        return dataFile.getFile();
328    }
329
330    private DataFile getNextDataFile(DataFile dataFile) {
331        return (DataFile)dataFile.getNext();
332    }
333
334    public synchronized void close() throws IOException {
335        if (!started) {
336            return;
337        }
338        this.scheduler.cancel(cleanupTask);
339        try {
340            this.scheduler.stop();
341        } catch (Exception e) {
342            IOException ioe = new IOException("scheduler stop: " + e);
343            ioe.initCause(e);
344            throw ioe;
345        }
346        accessorPool.close();
347        storeState(false);
348        appender.close();
349        fileMap.clear();
350        fileByFileMap.clear();
351        controlFile.unlock();
352        controlFile.dispose();
353        started = false;
354    }
355
356    synchronized void cleanup() {
357        if (accessorPool != null) {
358            accessorPool.disposeUnused();
359        }
360    }
361
362    public synchronized boolean delete() throws IOException {
363
364        // Close all open file handles...
365        appender.close();
366        accessorPool.close();
367
368        boolean result = true;
369        for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
370            DataFile dataFile = (DataFile)i.next();
371            storeSize.addAndGet(-dataFile.getLength());
372            result &= dataFile.delete();
373        }
374        fileMap.clear();
375        fileByFileMap.clear();
376        lastAppendLocation.set(null);
377        mark = null;
378        currentWriteFile = null;
379
380        // reopen open file handles...
381        accessorPool = new DataFileAccessorPool(this);
382        if (useNio) {
383            appender = new NIODataFileAppender(this);
384        } else {
385            appender = new DataFileAppender(this);
386        }
387        return result;
388    }
389
390    public synchronized void addInterestInFile(int file) throws IOException {
391        if (file >= 0) {
392            Integer key = Integer.valueOf(file);
393            DataFile dataFile = fileMap.get(key);
394            if (dataFile == null) {
395                throw new IOException("That data file does not exist");
396            }
397            addInterestInFile(dataFile);
398        }
399    }
400
401    synchronized void addInterestInFile(DataFile dataFile) {
402        if (dataFile != null) {
403            dataFile.increment();
404        }
405    }
406
407    public synchronized void removeInterestInFile(int file) throws IOException {
408        if (file >= 0) {
409            Integer key = Integer.valueOf(file);
410            DataFile dataFile = fileMap.get(key);
411            removeInterestInFile(dataFile);
412        }
413       
414    }
415
416    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
417        if (dataFile != null) {
418            if (dataFile.decrement() <= 0) {
419                removeDataFile(dataFile);
420            }
421        }
422    }
423
424    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
425        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
426        unUsed.removeAll(inUse);
427        unUsed.removeAll(inProgress);
428                
429        List<DataFile> purgeList = new ArrayList<DataFile>();
430        for (Integer key : unUsed) {
431            DataFile dataFile = fileMap.get(key);
432            purgeList.add(dataFile);
433        }
434        for (DataFile dataFile : purgeList) {
435            if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
436                forceRemoveDataFile(dataFile);
437            }
438        }
439    }
440
441    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
442        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
443        unUsed.removeAll(inUse);
444                
445        List<DataFile> purgeList = new ArrayList<DataFile>();
446        for (Integer key : unUsed) {
447                // Only add files less than the lastFile..
448                if( key.intValue() < lastFile.intValue() ) {
449                DataFile dataFile = fileMap.get(key);
450                purgeList.add(dataFile);
451                }
452        }
453        if (LOG.isDebugEnabled()) {
454            LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
455        }
456        for (DataFile dataFile : purgeList) {
457            forceRemoveDataFile(dataFile);
458        }
459        }
460
461    public synchronized void consolidateDataFiles() throws IOException {
462        List<DataFile> purgeList = new ArrayList<DataFile>();
463        for (DataFile dataFile : fileMap.values()) {
464            if (dataFile.isUnused()) {
465                purgeList.add(dataFile);
466            }
467        }
468        for (DataFile dataFile : purgeList) {
469            removeDataFile(dataFile);
470        }
471    }
472
473    private synchronized void removeDataFile(DataFile dataFile) throws IOException {
474
475        // Make sure we don't delete too much data.
476        if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
477            LOG.debug("Won't remove DataFile" + dataFile);
478                return;
479        }
480        forceRemoveDataFile(dataFile);
481    }
482    
483    private synchronized void forceRemoveDataFile(DataFile dataFile)
484            throws IOException {
485        accessorPool.disposeDataFileAccessors(dataFile);
486        fileByFileMap.remove(dataFile.getFile());
487        fileMap.remove(dataFile.getDataFileId());
488        storeSize.addAndGet(-dataFile.getLength());
489        dataFile.unlink();
490        if (archiveDataLogs) {
491            dataFile.move(getDirectoryArchive());
492            LOG.debug("moved data file " + dataFile + " to "
493                    + getDirectoryArchive());
494        } else {
495            boolean result = dataFile.delete();
496            if (!result) {
497                LOG.info("Failed to discard data file " + dataFile);
498            }
499        }
500    }
501
502    /**
503     * @return the maxFileLength
504     */
505    public int getMaxFileLength() {
506        return maxFileLength;
507    }
508
509    /**
510     * @param maxFileLength the maxFileLength to set
511     */
512    public void setMaxFileLength(int maxFileLength) {
513        this.maxFileLength = maxFileLength;
514    }
515
516    @Override
517    public String toString() {
518        return "DataManager:(" + filePrefix + ")";
519    }
520
521    public synchronized Location getMark() throws IllegalStateException {
522        return mark;
523    }
524
525    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
526
527        Location cur = null;
528        while (true) {
529            if (cur == null) {
530                if (location == null) {
531                    DataFile head = (DataFile)currentWriteFile.getHeadNode();
532                    cur = new Location();
533                    cur.setDataFileId(head.getDataFileId());
534                    cur.setOffset(0);
535                } else {
536                    // Set to the next offset..
537                        if( location.getSize() == -1 ) {
538                                cur = new Location(location);
539                        }  else {
540                                cur = new Location(location);
541                                cur.setOffset(location.getOffset()+location.getSize());
542                        }
543                }
544            } else {
545                cur.setOffset(cur.getOffset() + cur.getSize());
546            }
547
548            DataFile dataFile = getDataFile(cur);
549
550            // Did it go into the next file??
551            if (dataFile.getLength() <= cur.getOffset()) {
552                dataFile = getNextDataFile(dataFile);
553                if (dataFile == null) {
554                    return null;
555                } else {
556                    cur.setDataFileId(dataFile.getDataFileId().intValue());
557                    cur.setOffset(0);
558                }
559            }
560
561            // Load in location size and type.
562            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
563            try {
564                reader.readLocationDetails(cur);
565            } finally {
566                accessorPool.closeDataFileAccessor(reader);
567            }
568
569            if (cur.getType() == 0) {
570                return null;
571            } else if (cur.getType() > 0) {
572                // Only return user records.
573                return cur;
574            }
575        }
576    }
577    
578    public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
579        DataFile df = fileByFileMap.get(file);
580        return getNextLocation(df, lastLocation,thisFileOnly);
581    }
582    
583    public synchronized Location getNextLocation(DataFile dataFile,
584            Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
585
586        Location cur = null;
587        while (true) {
588            if (cur == null) {
589                if (lastLocation == null) {
590                    DataFile head = (DataFile)dataFile.getHeadNode();
591                    cur = new Location();
592                    cur.setDataFileId(head.getDataFileId());
593                    cur.setOffset(0);
594                } else {
595                    // Set to the next offset..
596                    cur = new Location(lastLocation);
597                    cur.setOffset(cur.getOffset() + cur.getSize());
598                }
599            } else {
600                cur.setOffset(cur.getOffset() + cur.getSize());
601            }
602
603            
604            // Did it go into the next file??
605            if (dataFile.getLength() <= cur.getOffset()) {
606                if (thisFileOnly) {
607                    return null;
608                }else {
609                dataFile = getNextDataFile(dataFile);
610                if (dataFile == null) {
611                    return null;
612                } else {
613                    cur.setDataFileId(dataFile.getDataFileId().intValue());
614                    cur.setOffset(0);
615                }
616                }
617            }
618
619            // Load in location size and type.
620            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
621            try {
622                reader.readLocationDetails(cur);
623            } finally {
624                accessorPool.closeDataFileAccessor(reader);
625            }
626
627            if (cur.getType() == 0) {
628                return null;
629            } else if (cur.getType() > 0) {
630                // Only return user records.
631                return cur;
632            }
633        }
634    }
635
636    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
637        DataFile dataFile = getDataFile(location);
638        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
639        ByteSequence rc = null;
640        try {
641            rc = reader.readRecord(location);
642        } finally {
643            accessorPool.closeDataFileAccessor(reader);
644        }
645        return rc;
646    }
647
648    public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
649        synchronized (this) {
650            mark = location;
651        }
652        storeState(sync);
653    }
654
655    protected synchronized void storeState(boolean sync) throws IOException {
656        ByteSequence state = marshallState();
657        appender.storeItem(state, Location.MARK_TYPE, sync);
658        controlFile.store(state, sync);
659    }
660
661    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
662        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
663        return loc;
664    }
665    
666    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
667        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
668        return loc;
669    }
670
671    public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
672        return appender.storeItem(data, type, sync);
673    }
674
675    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
676        DataFile dataFile = getDataFile(location);
677        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
678        try {
679            updater.updateRecord(location, data, sync);
680        } finally {
681            accessorPool.closeDataFileAccessor(updater);
682        }
683    }
684
685    public File getDirectory() {
686        return directory;
687    }
688
689    public void setDirectory(File directory) {
690        this.directory = directory;
691    }
692
693    public String getFilePrefix() {
694        return filePrefix;
695    }
696
697    public void setFilePrefix(String filePrefix) {
698        this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix);
699    }
700
701    public Map<WriteKey, WriteCommand> getInflightWrites() {
702        return inflightWrites;
703    }
704
705    public Location getLastAppendLocation() {
706        return lastAppendLocation.get();
707    }
708
709    public void setLastAppendLocation(Location lastSyncedLocation) {
710        this.lastAppendLocation.set(lastSyncedLocation);
711    }
712
713        public boolean isUseNio() {
714                return useNio;
715        }
716
717        public void setUseNio(boolean useNio) {
718                this.useNio = useNio;
719        }
720        
721        public File getDirectoryArchive() {
722        return directoryArchive;
723    }
724
725    public void setDirectoryArchive(File directoryArchive) {
726        this.directoryArchive = directoryArchive;
727    }
728    
729    public boolean isArchiveDataLogs() {
730        return archiveDataLogs;
731    }
732
733    public void setArchiveDataLogs(boolean archiveDataLogs) {
734        this.archiveDataLogs = archiveDataLogs;
735    }
736
737    synchronized public Integer getCurrentDataFileId() {
738        if( currentWriteFile==null )
739            return null;
740        return currentWriteFile.getDataFileId();
741    }
742    
743    /**
744     * Get a set of files - only valid after start()
745     * @return files currently being used
746     */
747    public Set<File> getFiles(){
748        return fileByFileMap.keySet();
749    }
750
751        synchronized public long getDiskSize() {
752                long rc=0;
753        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
754        while( cur !=null ) {
755                rc += cur.getLength();
756                cur = (DataFile) cur.getNext();
757        }
758                return rc;
759        }
760
761        synchronized public long getDiskSizeUntil(Location startPosition) {
762                long rc=0;
763        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
764        while( cur !=null ) {
765                if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
766                        return rc + startPosition.getOffset();
767                }
768                rc += cur.getLength();
769                cur = (DataFile) cur.getNext();
770        }
771                return rc;
772        }
773
774}