/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.data;

import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.DataByteArrayOutputStream;

/**
 * Optimized Store writer. Synchronously marshals and writes to the data file.
 * Simple but may introduce a bit of contention when put under load.
 * <p>
 * {@link #storeItem}, {@link #updateItem} and {@link #force} are
 * {@code synchronized} on this writer because they share a single reusable
 * marshalling buffer.
 */
public final class SyncDataFileWriter {

    /** Scratch buffer reused for every item; only touched while holding this writer's monitor. */
    private final DataByteArrayOutputStream buffer;
    private final DataManagerImpl dataManager;

    /**
     * Construct a Store writer.
     *
     * @param fileManager the data manager used to locate data files and to
     *            allocate space for new items
     */
    SyncDataFileWriter(DataManagerImpl fileManager) {
        this.dataManager = fileManager;
        this.buffer = new DataByteArrayOutputStream();
    }

    /**
     * Marshals the payload, asks the data manager for space, and writes the
     * item (header plus payload) at the allocated offset.
     *
     * @param marshaller serializes {@code payload} into the internal buffer
     * @param payload the object to store
     * @param type the item type byte written at the start of the item header
     * @return the new item, sized to the payload and positioned by
     *         {@code DataManagerImpl.findSpaceForData}
     * @throws IOException if marshalling or the file write fails
     */
    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
        throws IOException {
        // Write the packet to our internal buffer.
        int size = marshalItem(marshaller, payload, type);

        // Find the position where this item will land at.
        DataItem item = new DataItem();
        item.setSize(size - DataManagerImpl.ITEM_HEAD_SIZE);
        DataFile dataFile = dataManager.findSpaceForData(item);

        // Now splat the buffer to the file.
        RandomAccessFile file = dataFile.getRandomAccessFile();
        file.seek(item.getOffset());
        file.write(buffer.getData(), 0, size);
        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..

        dataManager.addInterestInFile(dataFile);
        return item;
    }

    /**
     * Re-marshals the payload and overwrites the item in place at its
     * existing offset. The item's size is updated to the new payload size.
     * <p>
     * NOTE(review): nothing here checks that the new payload fits in the
     * space originally allocated for {@code item} - presumably callers
     * guarantee that; confirm.
     *
     * @param item the previously stored item to overwrite
     * @param marshaller serializes {@code payload} into the internal buffer
     * @param payload the replacement object
     * @param type the item type byte written at the start of the item header
     * @throws IOException if marshalling or the file write fails
     */
    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
        throws IOException {
        // Write the packet to our internal buffer.
        int size = marshalItem(marshaller, payload, type);
        item.setSize(size - DataManagerImpl.ITEM_HEAD_SIZE);

        DataFile dataFile = dataManager.getDataFile(item);
        RandomAccessFile file = dataFile.getRandomAccessFile();
        file.seek(item.getOffset());
        file.write(buffer.getData(), 0, size);
        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
    }

    /**
     * Syncs the data file's descriptor to disk if this writer has marked it
     * (non-null writer data) and the file reports itself dirty, then clears
     * both markers.
     * <p>
     * NOTE(review): this class only sets the writer-data marker; the
     * {@code isDirty()} flag must be raised elsewhere - confirm that it is
     * also raised for in-place updates, otherwise those are never synced.
     *
     * @param dataFile the file to sync
     * @throws IOException if the sync fails
     */
    public synchronized void force(DataFile dataFile) throws IOException {
        // If our dirty marker was set.. then we need to sync
        if (dataFile.getWriterData() != null && dataFile.isDirty()) {
            dataFile.getRandomAccessFile().getFD().sync();
            dataFile.setWriterData(null);
            dataFile.setDirty(false);
        }
    }

    /**
     * Does nothing; the body is intentionally empty.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
    }

    /**
     * Fills the internal buffer with one complete item: the header (type
     * byte followed by the int payload size) and then the marshalled payload.
     *
     * @return the total number of bytes to write from the buffer (header
     *         plus payload)
     */
    private int marshalItem(Marshaller marshaller, Object payload, byte type) throws IOException {
        // Marshal the payload first, leaving room for the header, because the
        // payload size is only known once marshalling has finished.
        buffer.reset();
        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
        marshaller.writePayload(payload, buffer);
        int size = buffer.size();

        // Rewind and fill in the header. Callers write `size` bytes from
        // getData(), so this presumably leaves the payload bytes in the
        // backing array untouched - confirm against DataByteArrayOutputStream.
        buffer.reset();
        buffer.writeByte(type);
        buffer.writeInt(size - DataManagerImpl.ITEM_HEAD_SIZE);
        return size;
    }
}