001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.activemq.broker.BrokerService;
028import org.apache.activemq.broker.BrokerServiceAware;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ActiveMQTopic;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.ProducerId;
036import org.apache.activemq.kaha.CommandMarshaller;
037import org.apache.activemq.kaha.ContainerId;
038import org.apache.activemq.kaha.ListContainer;
039import org.apache.activemq.kaha.MapContainer;
040import org.apache.activemq.kaha.Marshaller;
041import org.apache.activemq.kaha.MessageIdMarshaller;
042import org.apache.activemq.kaha.MessageMarshaller;
043import org.apache.activemq.kaha.Store;
044import org.apache.activemq.kaha.StoreFactory;
045import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
046import org.apache.activemq.openwire.OpenWireFormat;
047import org.apache.activemq.store.MessageStore;
048import org.apache.activemq.store.PersistenceAdapter;
049import org.apache.activemq.store.TopicMessageStore;
050import org.apache.activemq.store.TransactionStore;
051import org.apache.activemq.usage.SystemUsage;
052import org.apache.activemq.util.IOHelper;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
/**
 * A {@link PersistenceAdapter} that keeps its message and transaction data in
 * a Kaha {@link Store}.
 *
 * @org.apache.xbean.XBean
 */
060public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
061
062    private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
063    private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
064    private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
065
066    protected OpenWireFormat wireFormat = new OpenWireFormat();
067    protected KahaTransactionStore transactionStore;
068    protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
069    protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
070    protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
071
072    private long maxDataFileLength = 32 * 1024 * 1024;
073    private File directory;
074    private String brokerName;
075    private Store theStore;
076    private boolean initialized;
077    private final AtomicLong storeSize;
078    private boolean persistentIndex = true;
079    private BrokerService brokerService;
080
081    
082    public KahaPersistenceAdapter(AtomicLong size) {
083        this.storeSize=size;
084    }
085    
086    public KahaPersistenceAdapter() {
087        this(new AtomicLong());
088    }
089    
090    public Set<ActiveMQDestination> getDestinations() {
091        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
092        try {
093            Store store = getStore();
094            for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
095                ContainerId id = (ContainerId)i.next();
096                Object obj = id.getKey();
097                if (obj instanceof ActiveMQDestination) {
098                    rc.add((ActiveMQDestination)obj);
099                }
100            }
101        } catch (IOException e) {
102            LOG.error("Failed to get destinations ", e);
103        }
104        return rc;
105    }
106
107    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
108        MessageStore rc = queues.get(destination);
109        if (rc == null) {
110            rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
111            messageStores.put(destination, rc);
112            if (transactionStore != null) {
113                rc = transactionStore.proxy(rc);
114            }
115            queues.put(destination, rc);
116        }
117        return rc;
118    }
119
120    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
121        throws IOException {
122        TopicMessageStore rc = topics.get(destination);
123        if (rc == null) {
124            Store store = getStore();
125            MapContainer messageContainer = getMapContainer(destination, "topic-data");
126            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
127                                                             "topic-subs");
128            ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
129                                                                             "topic-acks");
130            ackContainer.setMarshaller(new TopicSubAckMarshaller());
131            rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
132            messageStores.put(destination, rc);
133            if (transactionStore != null) {
134                rc = transactionStore.proxy(rc);
135            }
136            topics.put(destination, rc);
137        }
138        return rc;
139    }
140
141    /**
142     * Cleanup method to remove any state associated with the given destination
143     *
144     * @param destination Destination to forget
145     */
146    public void removeQueueMessageStore(ActiveMQQueue destination) {
147        queues.remove(destination);
148        try{
149                if(theStore!=null){
150                        theStore.deleteMapContainer(destination,"queue-data");
151                }
152        }catch(IOException e ){
153                LOG.error("Failed to remove store map container for queue:"+destination, e);
154        }
155    }
156
157    /**
158     * Cleanup method to remove any state associated with the given destination
159     *
160     * @param destination Destination to forget
161     */
162    public void removeTopicMessageStore(ActiveMQTopic destination) {
163        topics.remove(destination);
164    }
165
166    protected MessageStore retrieveMessageStore(Object id) {
167        MessageStore result = messageStores.get(id);
168        return result;
169    }
170
171    public TransactionStore createTransactionStore() throws IOException {
172        if (transactionStore == null) {
173            while (true) {
174                try {
175                    Store store = getStore();
176                    MapContainer container = store
177                        .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
178                    container.setKeyMarshaller(new CommandMarshaller(wireFormat));
179                    container.setValueMarshaller(new TransactionMarshaller(wireFormat));
180                    container.load();
181                    transactionStore = new KahaTransactionStore(this, container);
182                    transactionStore.setBrokerService(brokerService);
183                    break;
184                } catch (StoreLockedExcpetion e) {
185                    LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
186                             + " seconds for the Store to be unlocked.");
187                    try {
188                        Thread.sleep(STORE_LOCKED_WAIT_DELAY);
189                    } catch (InterruptedException e1) {
190                    }
191                }
192            }
193        }
194        return transactionStore;
195    }
196
197    public void beginTransaction(ConnectionContext context) {
198    }
199
200    public void commitTransaction(ConnectionContext context) throws IOException {
201        if (theStore != null) {
202            theStore.force();
203        }
204    }
205
206    public void rollbackTransaction(ConnectionContext context) {
207    }
208
209    public void start() throws Exception {
210        initialize();
211    }
212
213    public void stop() throws Exception {
214        if (theStore != null) {
215            theStore.close();
216        }
217    }
218
219    public long getLastMessageBrokerSequenceId() throws IOException {
220        return 0;
221    }
222
223    public void deleteAllMessages() throws IOException {
224        if (theStore != null) {
225            if (theStore.isInitialized()) {
226                theStore.clear();
227            } else {
228                theStore.delete();
229            }
230        } else {
231            StoreFactory.delete(getStoreDirectory());
232        }
233    }
234
235    protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
236        throws IOException {
237        Store store = getStore();
238        MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
239        container.setKeyMarshaller(new MessageIdMarshaller());
240        container.setValueMarshaller(new MessageMarshaller(wireFormat));
241        container.load();
242        return container;
243    }
244
245    protected MapContainer getSubsMapContainer(Object id, String containerName)
246        throws IOException {
247        Store store = getStore();
248        MapContainer container = store.getMapContainer(id, containerName);
249        container.setKeyMarshaller(Store.STRING_MARSHALLER);
250        container.setValueMarshaller(createMessageMarshaller());
251        container.load();
252        return container;
253    }
254
255    protected Marshaller<Object> createMessageMarshaller() {
256        return new CommandMarshaller(wireFormat);
257    }
258
259    protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
260        Store store = getStore();
261        ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
262        container.setMarshaller(createMessageMarshaller());
263        container.load();
264        return container;
265    }
266
267    /**
268     * @param usageManager The UsageManager that is controlling the broker's
269     *                memory usage.
270     */
271    public void setUsageManager(SystemUsage usageManager) {
272    }
273
274    /**
275     * @return the maxDataFileLength
276     */
277    public long getMaxDataFileLength() {
278        return maxDataFileLength;
279    }
280    
281    public boolean isPersistentIndex() {
282                return persistentIndex;
283        }
284
285        public void setPersistentIndex(boolean persistentIndex) {
286                this.persistentIndex = persistentIndex;
287        }
288
289    /**
290     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
291     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
292     */
293    public void setMaxDataFileLength(long maxDataFileLength) {
294        this.maxDataFileLength = maxDataFileLength;
295    }
296
297    protected final synchronized Store getStore() throws IOException {
298        if (theStore == null) {
299            theStore = createStore();
300        }
301        return theStore;
302    }
303    
304    protected final Store createStore() throws IOException {
305        Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
306        result.setMaxDataFileLength(maxDataFileLength);
307        result.setPersistentIndex(isPersistentIndex());
308        result.setDefaultContainerName("container-roots");
309        return result;
310    }
311
312    private String getStoreName() {
313        initialize();
314        return directory.getAbsolutePath();
315    }
316
317    private File getStoreDirectory() {
318        initialize();
319        return directory;
320    }
321
322    public String toString() {
323        return "KahaPersistenceAdapter(" + getStoreName() + ")";
324    }
325
326    public void setBrokerName(String brokerName) {
327        this.brokerName = brokerName;
328    }
329
330    public String getBrokerName() {
331        return brokerName;
332    }
333
334    public File getDirectory() {
335        return this.directory;
336    }
337
338    public void setDirectory(File directory) {
339        this.directory = directory;
340    }
341
342    public void checkpoint(boolean sync) throws IOException {
343        if (sync) {
344            getStore().force();
345        }
346    }
347   
348    public long size(){
349       return storeSize.get();
350    }
351
352    private void initialize() {
353        if (!initialized) {
354            initialized = true;
355            if (this.directory == null) {
356                File file = new File(IOHelper.getDefaultDataDirectory());
357                file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore");
358                setDirectory(file);
359            }
360            try {
361                IOHelper.mkdirs(this.directory);
362            } catch (IOException e) {
363                throw new RuntimeException(e);
364            }
365            wireFormat.setCacheEnabled(false);
366            wireFormat.setTightEncodingEnabled(true);
367        }
368    }
369
370        public void setBrokerService(BrokerService brokerService) {
371                this.brokerService = brokerService;
372        }
373
374    public long getLastProducerSequenceId(ProducerId id) {
375        // reference store send has adequate duplicate suppression
376        return -1;
377    }
378  
379
380}