001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.async;
018
019import java.io.IOException;
020
021import org.apache.activemq.kaha.Marshaller;
022import org.apache.activemq.kaha.StoreLocation;
023import org.apache.activemq.kaha.impl.data.RedoListener;
024import org.apache.activemq.util.ByteSequence;
025import org.apache.activemq.util.DataByteArrayInputStream;
026import org.apache.activemq.util.DataByteArrayOutputStream;
027
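/**
 * Adapts an {@link AsyncDataManager} to the Kaha
 * {@link org.apache.activemq.kaha.impl.DataManager} interface, so that code
 * written against the Kaha data manager API can be backed by the
 * asynchronous data manager.
 */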
033public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
034
035    private static final ByteSequence FORCE_COMMAND = new ByteSequence(new byte[] {'F', 'O', 'R', 'C', 'E'});
036
037    private AsyncDataManager dataManager;
038    private final String name;
039    private Marshaller redoMarshaller;
040
041    private static class StoreLocationFacade implements StoreLocation {
042        private final Location location;
043
044        public StoreLocationFacade(Location location) {
045            this.location = location;
046        }
047
048        public int getFile() {
049            return location.getDataFileId();
050        }
051
052        public long getOffset() {
053            return location.getOffset();
054        }
055
056        public int getSize() {
057            return location.getSize();
058        }
059
060        public Location getLocation() {
061            return location;
062        }
063    }
064
065    public DataManagerFacade(AsyncDataManager dataManager, String name) {
066        this.dataManager = dataManager;
067        this.name = name;
068    }
069
070    private static StoreLocation convertToStoreLocation(Location location) {
071        if (location == null) {
072            return null;
073        }
074        return new StoreLocationFacade(location);
075    }
076
077    private static Location convertFromStoreLocation(StoreLocation location) {
078
079        if (location == null) {
080            return null;
081        }
082
083        if (location.getClass() == StoreLocationFacade.class) {
084            return ((StoreLocationFacade)location).getLocation();
085        }
086
087        Location l = new Location();
088        l.setOffset((int)location.getOffset());
089        l.setSize(location.getSize());
090        l.setDataFileId(location.getFile());
091        return l;
092    }
093
094
095    public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
096        ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
097        DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
098        return marshaller.readPayload(dataIn);
099    }
100
101    public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
102        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
103        marshaller.writePayload(payload, buffer);
104        ByteSequence data = buffer.toByteSequence();
105        return convertToStoreLocation(dataManager.write(data, (byte)1, false));
106    }
107
108    public void force() throws IOException {
109        dataManager.write(FORCE_COMMAND, (byte)2, true);
110    }
111
112    public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
113        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
114        marshaller.writePayload(payload, buffer);
115        ByteSequence data = buffer.toByteSequence();
116        dataManager.update(convertFromStoreLocation(location), data, false);
117    }
118
119    public void close() throws IOException {
120        dataManager.close();
121    }
122
123    public void consolidateDataFiles() throws IOException {
124        dataManager.consolidateDataFiles();
125    }
126
127    public boolean delete() throws IOException {
128        return dataManager.delete();
129    }
130
131    public void addInterestInFile(int file) throws IOException {
132        dataManager.addInterestInFile(file);
133    }
134
135    public void removeInterestInFile(int file) throws IOException {
136        dataManager.removeInterestInFile(file);
137    }
138
139    public void recoverRedoItems(RedoListener listener) throws IOException {
140        throw new RuntimeException("Not Implemented..");
141    }
142
143    public StoreLocation storeRedoItem(Object payload) throws IOException {
144        throw new RuntimeException("Not Implemented..");
145    }
146
147    public Marshaller getRedoMarshaller() {
148        return redoMarshaller;
149    }
150
151    public void setRedoMarshaller(Marshaller redoMarshaller) {
152        this.redoMarshaller = redoMarshaller;
153    }
154
155    public String getName() {
156        return name;
157    }
158
159}