001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.FileFilter;
021    import java.io.IOException;
022    import java.nio.charset.Charset;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.BrokerServiceAware;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.ActiveMQQueue;
034    import org.apache.activemq.command.ActiveMQTopic;
035    import org.apache.activemq.command.LocalTransactionId;
036    import org.apache.activemq.command.ProducerId;
037    import org.apache.activemq.command.TransactionId;
038    import org.apache.activemq.command.XATransactionId;
039    import org.apache.activemq.filter.AnyDestination;
040    import org.apache.activemq.filter.DestinationMap;
041    import org.apache.activemq.protobuf.Buffer;
042    import org.apache.activemq.store.MessageStore;
043    import org.apache.activemq.store.PersistenceAdapter;
044    import org.apache.activemq.store.TopicMessageStore;
045    import org.apache.activemq.store.TransactionStore;
046    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
047    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
048    import org.apache.activemq.usage.SystemUsage;
049    import org.apache.activemq.util.IOHelper;
050    import org.apache.activemq.util.IntrospectionSupport;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
056     * distribution of destinations across multiple kahaDB persistence adapters
057     *
058     * @org.apache.xbean.XBean element="mKahaDB"
059     */
060    public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
061        static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
062    
063        final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
064        final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
065    
066        BrokerService brokerService;
067        List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
068        private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
069    
070        MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
071    
072        // all local store transactions are XA, 2pc if more than one adapter involved
073        TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
074            @Override
075            public KahaTransactionInfo transform(TransactionId txid) {
076                if (txid == null) {
077                    return null;
078                }
079                KahaTransactionInfo rc = new KahaTransactionInfo();
080                KahaXATransactionId kahaTxId = new KahaXATransactionId();
081                if (txid.isLocalTransaction()) {
082                    LocalTransactionId t = (LocalTransactionId) txid;
083                    kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
084                    kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
085                    kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
086                } else {
087                    XATransactionId t = (XATransactionId) txid;
088                    kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
089                    kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
090                    kahaTxId.setFormatId(t.getFormatId());
091                }
092                rc.setXaTransacitonId(kahaTxId);
093                return rc;
094            }
095        };
096    
097        /**
098         * Sets the  FilteredKahaDBPersistenceAdapter entries
099         *
100         * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
101         */
102        @SuppressWarnings({ "rawtypes", "unchecked" })
103        public void setFilteredPersistenceAdapters(List entries) {
104            for (Object entry : entries) {
105                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
106                KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
107                if (filteredAdapter.getDestination() == null) {
108                    filteredAdapter.setDestination(matchAll);
109                }
110    
111                if (filteredAdapter.isPerDestination()) {
112                    configureDirectory(adapter, null);
113                    // per destination adapters will be created on demand or during recovery
114                    continue;
115                } else {
116                    configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
117                }
118    
119                configureAdapter(adapter);
120                adapters.add(adapter);
121            }
122            super.setEntries(entries);
123        }
124    
125        private String nameFromDestinationFilter(ActiveMQDestination destination) {
126            return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
127        }
128    
129        public boolean isLocalXid(TransactionId xid) {
130            return xid instanceof XATransactionId &&
131                    ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
132        }
133    
134        public void beginTransaction(ConnectionContext context) throws IOException {
135            throw new IllegalStateException();
136        }
137    
138        public void checkpoint(final boolean sync) throws IOException {
139            for (PersistenceAdapter persistenceAdapter : adapters) {
140                persistenceAdapter.checkpoint(sync);
141            }
142        }
143    
144        public void commitTransaction(ConnectionContext context) throws IOException {
145            throw new IllegalStateException();
146        }
147    
148        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
149            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
150            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
151        }
152    
153        private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
154            Object result = this.chooseValue(destination);
155            if (result == null) {
156                throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
157            }
158            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
159            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
160                result = addAdapter(filteredAdapter, destination);
161                startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
162                if (LOG.isTraceEnabled()) {
163                    LOG.info("created per destination adapter for: " + destination  + ", " + result);
164                }
165            }
166            return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
167        }
168    
169        private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
170            try {
171                kahaDBPersistenceAdapter.start();
172            } catch (Exception e) {
173                RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
174                LOG.error(detail.toString(), e);
175                throw detail;
176            }
177        }
178    
179        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
180            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
181            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
182        }
183    
184        public TransactionStore createTransactionStore() throws IOException {
185            return transactionStore;
186        }
187    
188        public void deleteAllMessages() throws IOException {
189            for (PersistenceAdapter persistenceAdapter : adapters) {
190                persistenceAdapter.deleteAllMessages();
191            }
192            transactionStore.deleteAllMessages();
193            IOHelper.deleteChildren(getDirectory());
194        }
195    
196        public Set<ActiveMQDestination> getDestinations() {
197            Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
198            for (PersistenceAdapter persistenceAdapter : adapters) {
199                results.addAll(persistenceAdapter.getDestinations());
200            }
201            return results;
202        }
203    
204        public long getLastMessageBrokerSequenceId() throws IOException {
205            long maxId = -1;
206            for (PersistenceAdapter persistenceAdapter : adapters) {
207                maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
208            }
209            return maxId;
210        }
211    
212        public long getLastProducerSequenceId(ProducerId id) throws IOException {
213            long maxId = -1;
214            for (PersistenceAdapter persistenceAdapter : adapters) {
215                maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
216            }
217            return maxId;
218        }
219    
220        public void removeQueueMessageStore(ActiveMQQueue destination) {
221            getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
222        }
223    
224        public void removeTopicMessageStore(ActiveMQTopic destination) {
225            getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
226        }
227    
228        public void rollbackTransaction(ConnectionContext context) throws IOException {
229            throw new IllegalStateException();
230        }
231    
232        public void setBrokerName(String brokerName) {
233            for (PersistenceAdapter persistenceAdapter : adapters) {
234                persistenceAdapter.setBrokerName(brokerName);
235            }
236        }
237    
238        public void setUsageManager(SystemUsage usageManager) {
239            for (PersistenceAdapter persistenceAdapter : adapters) {
240                persistenceAdapter.setUsageManager(usageManager);
241            }
242        }
243    
244        public long size() {
245            long size = 0;
246            for (PersistenceAdapter persistenceAdapter : adapters) {
247                size += persistenceAdapter.size();
248            }
249            return size;
250        }
251    
252        public void start() throws Exception {
253            Object result = this.chooseValue(matchAll);
254            if (result != null) {
255                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
256                if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
257                    findAndRegisterExistingAdapters(filteredAdapter);
258                }
259            }
260            for (PersistenceAdapter persistenceAdapter : adapters) {
261                persistenceAdapter.start();
262            }
263        }
264    
265        private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
266            FileFilter destinationNames = new FileFilter() {
267                @Override
268                public boolean accept(File file) {
269                    return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
270                }
271            };
272            File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
273            if (candidates != null) {
274                for (File candidate : candidates) {
275                    registerExistingAdapter(template, candidate);
276                }
277            }
278        }
279    
280        private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
281            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
282            startAdapter(adapter, candidate.getName());
283            registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
284        }
285    
286        private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
287            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
288            return registerAdapter(adapter, destination);
289        }
290    
291        private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
292            KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
293            configureAdapter(adapter);
294            configureDirectory(adapter, destinationName);
295            return adapter;
296        }
297    
298        private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
299            File directory = null;
300            if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
301                // not set so inherit from mkahadb
302                directory = getDirectory();
303            } else {
304                directory = adapter.getDirectory();
305            }
306            if (fileName != null) {
307                directory = new File(directory, fileName);
308            }
309            adapter.setDirectory(directory);
310        }
311    
312        private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
313            adapters.add(adapter);
314            FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
315            put(destination, result);
316            return result;
317        }
318    
319        private void configureAdapter(KahaDBPersistenceAdapter adapter) {
320            // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
321            adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
322            adapter.setBrokerService(getBrokerService());
323        }
324    
325        private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
326            Map<String, Object> configuration = new HashMap<String, Object>();
327            IntrospectionSupport.getProperties(template, configuration, null);
328            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
329            IntrospectionSupport.setProperties(adapter, configuration);
330            return adapter;
331        }
332    
333        public void stop() throws Exception {
334            for (PersistenceAdapter persistenceAdapter : adapters) {
335                persistenceAdapter.stop();
336            }
337        }
338    
339        public File getDirectory() {
340            return this.directory;
341        }
342    
343        @Override
344        public void setDirectory(File directory) {
345            this.directory = directory;
346        }
347    
348        public void setBrokerService(BrokerService brokerService) {
349            for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
350                persistenceAdapter.setBrokerService(brokerService);
351            }
352            this.brokerService = brokerService;
353        }
354    
355        public BrokerService getBrokerService() {
356            return brokerService;
357        }
358    
359        public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
360            this.transactionStore = transactionStore;
361        }
362    
363        /**
364         * Set the max file length of the transaction journal
365         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
366         * be used
367         *
368         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
369         */
370        public void setJournalMaxFileLength(int maxFileLength) {
371            transactionStore.setJournalMaxFileLength(maxFileLength);
372        }
373    
374        public int getJournalMaxFileLength() {
375            return transactionStore.getJournalMaxFileLength();
376        }
377    
378        /**
379         * Set the max write batch size of  the transaction journal
380         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
381         * be used
382         *
383         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
384         */
385        public void setJournalWriteBatchSize(int journalWriteBatchSize) {
386            transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
387        }
388    
389        public int getJournalWriteBatchSize() {
390            return transactionStore.getJournalMaxWriteBatchSize();
391        }
392    
393        @Override
394        public String toString() {
395            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
396            return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
397        }
398    
399    }