001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.File;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.Set;
030
031import org.apache.activemq.broker.BrokerService;
032import org.apache.activemq.broker.BrokerServiceAware;
033import org.apache.activemq.thread.Scheduler;
034import org.apache.activemq.util.IOHelper;
035import org.apache.activemq.util.ServiceStopper;
036import org.apache.activemq.util.ServiceSupport;
037import org.apache.kahadb.index.BTreeIndex;
038import org.apache.kahadb.journal.Journal;
039import org.apache.kahadb.journal.Location;
040import org.apache.kahadb.page.Page;
041import org.apache.kahadb.page.PageFile;
042import org.apache.kahadb.page.Transaction;
043import org.apache.kahadb.util.ByteSequence;
044import org.apache.kahadb.util.LockFile;
045import org.apache.kahadb.util.StringMarshaller;
046import org.apache.kahadb.util.VariableMarshaller;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * @org.apache.xbean.XBean
052 */
053public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
054    static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
055    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
056
057    static final int CLOSED_STATE = 1;
058    static final int OPEN_STATE = 2;
059
060    private File directory;
061    PageFile pageFile;
062    private Journal journal;
063    private LockFile lockFile;
064    private boolean failIfDatabaseIsLocked;
065    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
066    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
067    private boolean enableIndexWriteAsync = false;
068    private boolean initialized = false;
069    private boolean lazyInit = true;
070    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
071    MetaData metaData = new MetaData(this);
072    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
073    Map<String, PList> persistentLists = new HashMap<String, PList>();
074    final Object indexLock = new Object();
075    private Scheduler scheduler;
076    private long cleanupInterval = 30000;
077
078    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
079    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
080    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
081    private boolean indexEnablePageCaching = true;
082
083    public Object getIndexLock() {
084        return indexLock;
085    }
086
087    @Override
088    public void setBrokerService(BrokerService brokerService) {
089        this.scheduler = brokerService.getScheduler();
090    }
091
092    public int getIndexPageSize() {
093        return indexPageSize;
094    }
095
096    public int getIndexCacheSize() {
097        return indexCacheSize;
098    }
099
100    public int getIndexWriteBatchSize() {
101        return indexWriteBatchSize;
102    }
103
104    public void setIndexPageSize(int indexPageSize) {
105        this.indexPageSize = indexPageSize;
106    }
107
108    public void setIndexCacheSize(int indexCacheSize) {
109        this.indexCacheSize = indexCacheSize;
110    }
111
112    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
113        this.indexWriteBatchSize = indexWriteBatchSize;
114    }
115
116    public boolean getIndexEnablePageCaching() {
117        return indexEnablePageCaching;
118    }
119
120    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
121        this.indexEnablePageCaching = indexEnablePageCaching;
122    }
123
124    protected class MetaData {
125        protected MetaData(PListStore store) {
126            this.store = store;
127        }
128
129        private final PListStore store;
130        Page<MetaData> page;
131        BTreeIndex<String, PList> lists;
132
133        void createIndexes(Transaction tx) throws IOException {
134            this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
135        }
136
137        void load(Transaction tx) throws IOException {
138            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
139            this.lists.setValueMarshaller(new PListMarshaller(this.store));
140            this.lists.load(tx);
141        }
142
143        void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
144            for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
145                Entry<String, PList> entry = i.next();
146                entry.getValue().load(tx);
147                lists.put(entry.getKey(), entry.getValue());
148            }
149        }
150
151        public void read(DataInput is) throws IOException {
152            this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
153            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
154            this.lists.setValueMarshaller(new PListMarshaller(this.store));
155        }
156
157        public void write(DataOutput os) throws IOException {
158            os.writeLong(this.lists.getPageId());
159        }
160    }
161
162    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
163        private final PListStore store;
164
165        MetaDataMarshaller(PListStore store) {
166            this.store = store;
167        }
168        public MetaData readPayload(DataInput dataIn) throws IOException {
169            MetaData rc = new MetaData(this.store);
170            rc.read(dataIn);
171            return rc;
172        }
173
174        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
175            object.write(dataOut);
176        }
177    }
178
179    class PListMarshaller extends VariableMarshaller<PList> {
180        private final PListStore store;
181        PListMarshaller(PListStore store) {
182            this.store = store;
183        }
184        public PList readPayload(DataInput dataIn) throws IOException {
185            PList result = new PList(this.store);
186            result.read(dataIn);
187            return result;
188        }
189
190        public void writePayload(PList list, DataOutput dataOut) throws IOException {
191            list.write(dataOut);
192        }
193    }
194
195    public Journal getJournal() {
196        return this.journal;
197    }
198
199    public File getDirectory() {
200        return directory;
201    }
202
203    public void setDirectory(File directory) {
204        this.directory = directory;
205    }
206
207    public long size() {
208        synchronized (this) {
209            if (!initialized) {
210                return 0;
211            }
212        }
213        try {
214            return journal.getDiskSize() + pageFile.getDiskSize();
215        } catch (IOException e) {
216            throw new RuntimeException(e);
217        }
218    }
219
220    public PList getPList(final String name) throws Exception {
221        if (!isStarted()) {
222            throw new IllegalStateException("Not started");
223        }
224        intialize();
225        synchronized (indexLock) {
226            synchronized (this) {
227                PList result = this.persistentLists.get(name);
228                if (result == null) {
229                    final PList pl = new PList(this);
230                    pl.setName(name);
231                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
232                        public void execute(Transaction tx) throws IOException {
233                            pl.setHeadPageId(tx.allocate().getPageId());
234                            pl.load(tx);
235                            metaData.lists.put(tx, name, pl);
236                        }
237                    });
238                    result = pl;
239                    this.persistentLists.put(name, pl);
240                }
241                final PList toLoad = result;
242                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
243                    public void execute(Transaction tx) throws IOException {
244                        toLoad.load(tx);
245                    }
246                });
247
248                return result;
249            }
250        }
251    }
252
253    public boolean removePList(final String name) throws Exception {
254        boolean result = false;
255        synchronized (indexLock) {
256            synchronized (this) {
257                final PList pl = this.persistentLists.remove(name);
258                result = pl != null;
259                if (result) {
260                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
261                        public void execute(Transaction tx) throws IOException {
262                            metaData.lists.remove(tx, name);
263                            pl.destroy();
264                        }
265                    });
266                }
267            }
268        }
269        return result;
270    }
271
272    protected synchronized void intialize() throws Exception {
273        if (isStarted()) {
274            if (this.initialized == false) {
275                if (this.directory == null) {
276                    this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
277                }
278                IOHelper.mkdirs(this.directory);
279                lock();
280                this.journal = new Journal();
281                this.journal.setDirectory(directory);
282                this.journal.setMaxFileLength(getJournalMaxFileLength());
283                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
284                this.journal.start();
285                this.pageFile = new PageFile(directory, "tmpDB");
286                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
287                this.pageFile.setPageSize(getIndexPageSize());
288                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
289                this.pageFile.setPageCacheSize(getIndexCacheSize());
290                this.pageFile.load();
291
292                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
293                    public void execute(Transaction tx) throws IOException {
294                        if (pageFile.getPageCount() == 0) {
295                            Page<MetaData> page = tx.allocate();
296                            assert page.getPageId() == 0;
297                            page.set(metaData);
298                            metaData.page = page;
299                            metaData.createIndexes(tx);
300                            tx.store(metaData.page, metaDataMarshaller, true);
301
302                        } else {
303                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
304                            metaData = page.get();
305                            metaData.page = page;
306                        }
307                        metaData.load(tx);
308                        metaData.loadLists(tx, persistentLists);
309                    }
310                });
311                this.pageFile.flush();
312
313                if (cleanupInterval > 0) {
314                    if (scheduler == null) {
315                        scheduler = new Scheduler(PListStore.class.getSimpleName());
316                        scheduler.start();
317                    }
318                    scheduler.executePeriodically(this, cleanupInterval);
319                }
320                this.initialized = true;
321                LOG.info(this + " initialized");
322            }
323        }
324    }
325
326    @Override
327    protected synchronized void doStart() throws Exception {
328        if (!lazyInit) {
329            intialize();
330        }
331        LOG.info(this + " started");
332    }
333
334    @Override
335    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
336        if (scheduler != null) {
337            if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
338                scheduler.stop();
339                scheduler = null;
340            }
341        }
342        for (PList pl : this.persistentLists.values()) {
343            pl.unload(null);
344        }
345        if (this.pageFile != null) {
346            this.pageFile.unload();
347        }
348        if (this.journal != null) {
349            journal.close();
350        }
351        if (this.lockFile != null) {
352            this.lockFile.unlock();
353        }
354        this.lockFile = null;
355        this.initialized = false;
356        LOG.info(this + " stopped");
357
358    }
359
360    public void run() {
361        try {
362            if (isStopping()) {
363                return;
364            }
365            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
366            final Set<Integer> candidates = journal.getFileMap().keySet();
367            LOG.trace("Full gc candidate set:" + candidates);
368            if (candidates.size() > 1) {
369                // prune current write
370                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
371                    if (iterator.next() >= lastJournalFileId) {
372                        iterator.remove();
373                    }
374                }
375                List<PList> plists = null;
376                synchronized (indexLock) {
377                    synchronized (this) {
378                        plists = new ArrayList<PList>(persistentLists.values());
379                    }
380                }
381                for (PList list : plists) {
382                    list.claimFileLocations(candidates);
383                    if (isStopping()) {
384                        return;
385                    }
386                    LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
387                }
388                LOG.trace("GC Candidate set:" + candidates);
389                this.journal.removeDataFiles(candidates);
390            }
391        } catch (IOException e) {
392            LOG.error("Exception on periodic cleanup: " + e, e);
393        }
394    }
395
396    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
397        ByteSequence result = null;
398        result = this.journal.read(location);
399        return result;
400    }
401
402    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
403        return this.journal.write(payload, sync);
404    }
405
406    private void lock() throws IOException {
407        if (lockFile == null) {
408            File lockFileName = new File(directory, "lock");
409            lockFile = new LockFile(lockFileName, true);
410            if (failIfDatabaseIsLocked) {
411                lockFile.lock();
412            } else {
413                while (true) {
414                    try {
415                        lockFile.lock();
416                        break;
417                    } catch (IOException e) {
418                        LOG.info("Database " + lockFileName + " is locked... waiting "
419                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
420                                + " seconds for the database to be unlocked. Reason: " + e);
421                        try {
422                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
423                        } catch (InterruptedException e1) {
424                        }
425                    }
426                }
427            }
428        }
429    }
430
431    PageFile getPageFile() {
432        this.pageFile.isLoaded();
433        return this.pageFile;
434    }
435
436    public boolean isFailIfDatabaseIsLocked() {
437        return failIfDatabaseIsLocked;
438    }
439
440    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
441        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
442    }
443
444    public int getJournalMaxFileLength() {
445        return journalMaxFileLength;
446    }
447
448    public void setJournalMaxFileLength(int journalMaxFileLength) {
449        this.journalMaxFileLength = journalMaxFileLength;
450    }
451
452    public int getJournalMaxWriteBatchSize() {
453        return journalMaxWriteBatchSize;
454    }
455
456    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
457        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
458    }
459
460    public boolean isEnableIndexWriteAsync() {
461        return enableIndexWriteAsync;
462    }
463
464    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
465        this.enableIndexWriteAsync = enableIndexWriteAsync;
466    }
467
468    public long getCleanupInterval() {
469        return cleanupInterval;
470    }
471
472    public void setCleanupInterval(long cleanupInterval) {
473        this.cleanupInterval = cleanupInterval;
474    }
475
476    public boolean isLazyInit() {
477        return lazyInit;
478    }
479
480    public void setLazyInit(boolean lazyInit) {
481        this.lazyInit = lazyInit;
482    }
483
484    @Override
485    public String toString() {
486        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
487        return "PListStore:[" + path + " ]";
488    }
489}