001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.data;
018
019import java.io.IOException;
020import java.io.RandomAccessFile;
021
022import org.apache.activemq.kaha.Marshaller;
023import org.apache.activemq.util.DataByteArrayOutputStream;
024
025/**
026 * Optimized Store writer. Synchronously marshalls and writes to the data file.
027 * Simple but may introduce a bit of contention when put under load.
028 * 
029 * 
030 */
031public final class SyncDataFileWriter {
032
033    private DataByteArrayOutputStream buffer;
034    private DataManagerImpl dataManager;
035
036    /**
037     * Construct a Store writer
038     * 
039     * @param fileId
040     */
041    SyncDataFileWriter(DataManagerImpl fileManager) {
042        this.dataManager = fileManager;
043        this.buffer = new DataByteArrayOutputStream();
044    }
045
046    /*
047     * (non-Javadoc)
048     * 
049     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
050     *      java.lang.Object, byte)
051     */
052    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
053        throws IOException {
054
055        // Write the packet our internal buffer.
056        buffer.reset();
057        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
058        marshaller.writePayload(payload, buffer);
059        int size = buffer.size();
060        int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
061        buffer.reset();
062        buffer.writeByte(type);
063        buffer.writeInt(payloadSize);
064
065        // Find the position where this item will land at.
066        DataItem item = new DataItem();
067        item.setSize(payloadSize);
068        DataFile dataFile = dataManager.findSpaceForData(item);
069
070        // Now splat the buffer to the file.
071        dataFile.getRandomAccessFile().seek(item.getOffset());
072        dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
073        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
074
075        dataManager.addInterestInFile(dataFile);
076        return item;
077    }
078
079    /*
080     * (non-Javadoc)
081     * 
082     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
083     *      org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
084     */
085    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
086        throws IOException {
087        // Write the packet our internal buffer.
088        buffer.reset();
089        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
090        marshaller.writePayload(payload, buffer);
091        int size = buffer.size();
092        int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
093        buffer.reset();
094        buffer.writeByte(type);
095        buffer.writeInt(payloadSize);
096        item.setSize(payloadSize);
097        DataFile dataFile = dataManager.getDataFile(item);
098        RandomAccessFile file = dataFile.getRandomAccessFile();
099        file.seek(item.getOffset());
100        file.write(buffer.getData(), 0, size);
101        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
102    }
103
104    public synchronized void force(DataFile dataFile) throws IOException {
105        // If our dirty marker was set.. then we need to sync
106        if (dataFile.getWriterData() != null && dataFile.isDirty()) {
107            dataFile.getRandomAccessFile().getFD().sync();
108            dataFile.setWriterData(null);
109            dataFile.setDirty(false);
110        }
111    }
112
113    public void close() throws IOException {
114    }
115}