001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.atomic.AtomicLong;
029
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQQueue;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.kaha.CommandMarshaller;
038import org.apache.activemq.kaha.ListContainer;
039import org.apache.activemq.kaha.MapContainer;
040import org.apache.activemq.kaha.MessageIdMarshaller;
041import org.apache.activemq.kaha.Store;
042import org.apache.activemq.kaha.StoreFactory;
043import org.apache.activemq.kaha.impl.index.hash.HashIndex;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.ReferenceStore;
046import org.apache.activemq.store.ReferenceStoreAdapter;
047import org.apache.activemq.store.TopicMessageStore;
048import org.apache.activemq.store.TopicReferenceStore;
049import org.apache.activemq.store.amq.AMQTx;
050import org.apache.activemq.util.IOHelper;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
055
056    
057
058    private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStoreAdapter.class);
059    private static final String STORE_STATE = "store-state";
060    private static final String QUEUE_DATA = "queue-data";
061    private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
062    private static final Integer INDEX_VERSION = new Integer(7);
063    private static final String RECORD_REFERENCES = "record-references";
064    private static final String TRANSACTIONS = "transactions-state";
065    private MapContainer stateMap;
066    private MapContainer<TransactionId, AMQTx> preparedTransactions;
067    private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
068    private ListContainer<SubscriptionInfo> durableSubscribers;
069    private boolean storeValid;
070    private Store stateStore;
071    private boolean persistentIndex = true;
072    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
073    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
074    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
075    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
076    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
077   
078
079    public KahaReferenceStoreAdapter(AtomicLong size){
080        super(size);
081    }
082    
083    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
084        throw new RuntimeException("Use createQueueReferenceStore instead");
085    }
086
087    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
088        throws IOException {
089        throw new RuntimeException("Use createTopicReferenceStore instead");
090    }
091
092    @Override
093    public synchronized void start() throws Exception {
094        super.start();
095        Store store = getStateStore();        
096        boolean empty = store.getMapContainerIds().isEmpty();
097        stateMap = store.getMapContainer("state", STORE_STATE);
098        stateMap.load();
099        storeValid=true;
100        if (!empty) {
101            AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
102            if (status != null) {
103                storeValid = status.get();
104            }
105           
106            if (storeValid) {
107                //check what version the indexes are at
108                Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
109                if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
110                    storeValid = false;
111                    LOG.warn("Indexes at an older version - need to regenerate");
112                }
113            }
114            if (storeValid) {
115                if (stateMap.containsKey(RECORD_REFERENCES)) {
116                    recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
117                }
118            }
119        }
120        stateMap.put(STORE_STATE, new AtomicBoolean());
121        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
122        durableSubscribers = store.getListContainer("durableSubscribers");
123        durableSubscribers.setMarshaller(new CommandMarshaller());
124        preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
125        // need to set the Marshallers here
126        preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER);
127        preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
128    }
129
130    @Override
131    public synchronized void stop() throws Exception {
132        stateMap.put(RECORD_REFERENCES, recordReferences);
133        stateMap.put(STORE_STATE, new AtomicBoolean(true));
134        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
135        if (this.stateStore != null) {
136            this.stateStore.close();
137            this.stateStore = null;
138            this.stateMap = null;
139        }
140        super.stop();
141    }
142    
143    public void commitTransaction(ConnectionContext context) throws IOException {
144        //we don;t need to force on a commit - as the reference store
145        //is rebuilt on a non clean shutdown
146    }
147
148    public boolean isStoreValid() {
149        return storeValid;
150    }
151
152    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
153        ReferenceStore rc = (ReferenceStore)queues.get(destination);
154        if (rc == null) {
155            rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA),
156                                        destination);
157            messageStores.put(destination, rc);
158            // if(transactionStore!=null){
159            // rc=transactionStore.proxy(rc);
160            // }
161            queues.put(destination, rc);
162        }
163        return rc;
164    }
165
166    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
167        TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination);
168        if (rc == null) {
169            Store store = getStore();
170            MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data");
171            MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob");
172            ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks");
173            ackContainer.setMarshaller(new TopicSubAckMarshaller());
174            rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
175                                             destination);
176            messageStores.put(destination, rc);
177            // if(transactionStore!=null){
178            // rc=transactionStore.proxy(rc);
179            // }
180            topics.put(destination, rc);
181        }
182        return rc;
183    }
184
185    public void removeReferenceStore(KahaReferenceStore referenceStore) {
186        ActiveMQDestination destination = referenceStore.getDestination();
187        if (destination.isQueue()) {
188            queues.remove(destination);
189            try {
190                getStore().deleteMapContainer(destination, QUEUE_DATA);
191            } catch (IOException e) {
192                LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e);
193            }
194        } else {
195            topics.remove(destination);
196        }
197        messageStores.remove(destination);
198    }
199/*
200    public void buildReferenceFileIdsInUse() throws IOException {
201        recordReferences = new HashMap<Integer, AtomicInteger>();
202        Set<ActiveMQDestination> destinations = getDestinations();
203        for (ActiveMQDestination destination : destinations) {
204            if (destination.isQueue()) {
205                KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
206                store.addReferenceFileIdsInUse();
207            } else {
208                KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
209                store.addReferenceFileIdsInUse();
210            }
211        }
212    }
213    */
214
215    protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
216                                                                                String containerName)
217        throws IOException {
218        Store store = getStore();
219        MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
220        container.setIndexBinSize(getIndexBinSize());
221        container.setIndexKeySize(getIndexKeySize());
222        container.setIndexPageSize(getIndexPageSize());
223        container.setIndexMaxBinSize(getIndexMaxBinSize());
224        container.setIndexLoadFactor(getIndexLoadFactor());
225        container.setKeyMarshaller(new MessageIdMarshaller());
226        container.setValueMarshaller(new ReferenceRecordMarshaller());
227        container.load();
228        return container;
229    }
230
231    synchronized void addInterestInRecordFile(int recordNumber) {
232        Integer key = Integer.valueOf(recordNumber);
233        AtomicInteger rr = recordReferences.get(key);
234        if (rr == null) {
235            rr = new AtomicInteger();
236            recordReferences.put(key, rr);
237        }
238        rr.incrementAndGet();
239    }
240
241    synchronized void removeInterestInRecordFile(int recordNumber) {
242        Integer key = Integer.valueOf(recordNumber);
243        AtomicInteger rr = recordReferences.get(key);
244        if (rr != null && rr.decrementAndGet() <= 0) {
245            recordReferences.remove(key);
246        }
247    }
248
249    /**
250     * @return
251     * @throws IOException
252     * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
253     */
254    public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
255        Set inUse = new HashSet<Integer>(recordReferences.keySet());
256
257        Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
258        while (ackReferences.hasNext()) {
259            Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
260            if (!inUse.contains(ackReference.getKey())) {
261                // should we keep this data file
262                for (Integer referencedFileId : ackReference.getValue()) {
263                    if (inUse.contains(referencedFileId)) {
264                        // keep this ack file
265                        inUse.add(ackReference.getKey());
266                        LOG.debug("not removing data file: " + ackReference.getKey()
267                                        + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
268                        break;
269                    }
270                }
271            }
272            if (!inUse.contains(ackReference.getKey())) {
273               ackReferences.remove();
274            }
275        }
276
277        return inUse;
278    }
279
280    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
281    public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
282        Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId));
283        if (referenceFileIds == null) {
284            referenceFileIds = new HashSet<Integer>();
285            referenceFileIds.add(Integer.valueOf(messageFileId));
286            ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds);
287        } else {
288            Integer id = Integer.valueOf(messageFileId);
289            if (!referenceFileIds.contains(id)) {
290                referenceFileIds.add(id);
291            }
292        }
293    }
294
295    /**
296     *
297     * @throws IOException
298     * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
299     */
300    public void clearMessages() throws IOException {
301        //don't delete messages as it will clear state - call base
302        //class method to clear out the data instead
303        super.deleteAllMessages();
304    }
305
306    /**
307     *
308     * @throws IOException
309     * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
310     */
311
312    public void recoverState() throws IOException {
313        Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
314        for (SubscriptionInfo info:set) {
315            LOG.info("Recovering subscriber state for durable subscriber: " + info);
316            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
317            ts.addSubsciption(info, false);
318        }
319    }
320    
321    public void recoverSubscription(SubscriptionInfo info) throws IOException {
322        TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
323        LOG.info("Recovering subscriber state for durable subscriber: " + info);
324        ts.addSubsciption(info, false);
325    }
326    
327
328    public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
329        Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
330        preparedTransactions.load();
331        for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) {
332            TransactionId key = i.next();
333            AMQTx value = preparedTransactions.get(key);
334            result.put(key, value);
335        }
336        return result;
337    }
338
339    public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException {
340        preparedTransactions.clear();
341        for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) {
342            Map.Entry<TransactionId, AMQTx> entry = iter.next();
343            preparedTransactions.put(entry.getKey(), entry.getValue());
344        }
345    }
346
347    @Override
348    public synchronized void setDirectory(File directory) {
349        File file = new File(directory, "data");
350        super.setDirectory(file);
351        this.stateStore = createStateStore(directory);
352    }
353
354    protected synchronized Store getStateStore() throws IOException {
355        if (this.stateStore == null) {
356            File stateDirectory = new File(getDirectory(), "kr-state");
357            IOHelper.mkdirs(stateDirectory);
358            this.stateStore = createStateStore(getDirectory());
359        }
360        return this.stateStore;
361    }
362
363    public void deleteAllMessages() throws IOException {
364        super.deleteAllMessages();
365        if (stateStore != null) {
366            if (stateStore.isInitialized()) {
367                stateStore.clear();
368            } else {
369                stateStore.delete();
370            }
371        } else {
372            File stateDirectory = new File(getDirectory(), "kr-state");
373            StoreFactory.delete(stateDirectory);
374        }
375    }
376
377    public boolean isPersistentIndex() {
378                return persistentIndex;
379        }
380
381        public void setPersistentIndex(boolean persistentIndex) {
382                this.persistentIndex = persistentIndex;
383        }
384
385    private Store createStateStore(File directory) {
386        File stateDirectory = new File(directory, "state");
387        try {
388            IOHelper.mkdirs(stateDirectory);
389            return StoreFactory.open(stateDirectory, "rw");
390        } catch (IOException e) {
391            LOG.error("Failed to create the state store", e);
392        }
393        return null;
394    }
395
396    protected void addSubscriberState(SubscriptionInfo info) throws IOException {
397        durableSubscribers.add(info);
398    }
399
400    protected void removeSubscriberState(SubscriptionInfo info) {
401        durableSubscribers.remove(info);
402    }
403
404    public int getIndexBinSize() {
405        return indexBinSize;
406    }
407
408    public void setIndexBinSize(int indexBinSize) {
409        this.indexBinSize = indexBinSize;
410    }
411
412    public int getIndexKeySize() {
413        return indexKeySize;
414    }
415
416    public void setIndexKeySize(int indexKeySize) {
417        this.indexKeySize = indexKeySize;
418    }
419
420    public int getIndexPageSize() {
421        return indexPageSize;
422    }
423
424    public void setIndexPageSize(int indexPageSize) {
425        this.indexPageSize = indexPageSize;
426    }
427
428    public int getIndexMaxBinSize() {
429        return indexMaxBinSize;
430    }
431
432    public void setIndexMaxBinSize(int maxBinSize) {
433        this.indexMaxBinSize = maxBinSize;
434    }
435
436    /**
437     * @return the loadFactor
438     */
439    public int getIndexLoadFactor() {
440        return indexLoadFactor;
441    }
442
443    /**
444     * @param loadFactor the loadFactor to set
445     */
446    public void setIndexLoadFactor(int loadFactor) {
447        this.indexLoadFactor = loadFactor;
448    }
449
450
451}