001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.TreeSet;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.Future;
027
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.TransactionId;
034import org.apache.activemq.command.XATransactionId;
035import org.apache.activemq.store.AbstractMessageStore;
036import org.apache.activemq.store.MessageStore;
037import org.apache.activemq.store.ProxyMessageStore;
038import org.apache.activemq.store.ProxyTopicMessageStore;
039import org.apache.activemq.store.TopicMessageStore;
040import org.apache.activemq.store.TransactionRecoveryListener;
041import org.apache.activemq.store.TransactionStore;
042import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
043import org.apache.activemq.store.kahadb.data.KahaEntryType;
044import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
045import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
046import org.apache.activemq.util.IOHelper;
047import org.apache.kahadb.journal.Journal;
048import org.apache.kahadb.journal.Location;
049import org.apache.kahadb.util.DataByteArrayInputStream;
050import org.apache.kahadb.util.DataByteArrayOutputStream;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054public class MultiKahaDBTransactionStore implements TransactionStore {
055    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
056    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
057    final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
058    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
059    private Journal journal;
060    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
061    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
062
063    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
064        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
065    }
066
067    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
068        return new ProxyMessageStore(messageStore) {
069            @Override
070            public void addMessage(ConnectionContext context, final Message send) throws IOException {
071                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
072            }
073
074            @Override
075            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
076                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
077            }
078
079            @Override
080            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
081                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
082            }
083
084            @Override
085            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
086                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
087            }
088
089            @Override
090            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
092            }
093
094            @Override
095            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
096                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
097            }
098        };
099    }
100
101    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
102        return new ProxyTopicMessageStore(messageStore) {
103            @Override
104            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
105                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
106            }
107
108            @Override
109            public void addMessage(ConnectionContext context, final Message send) throws IOException {
110                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
111            }
112
113            @Override
114            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
115                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
116            }
117
118            @Override
119            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
120                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
121            }
122
123            @Override
124            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
125                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
126            }
127
128            @Override
129            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
130                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
131            }
132
133            @Override
134            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
135                                    MessageId messageId, MessageAck ack) throws IOException {
136                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
137                        subscriptionName, messageId, ack);
138            }
139        };
140    }
141
142    public void deleteAllMessages() {
143        IOHelper.deleteChildren(getDirectory());
144    }
145
146    public int getJournalMaxFileLength() {
147        return journalMaxFileLength;
148    }
149
150    public void setJournalMaxFileLength(int journalMaxFileLength) {
151        this.journalMaxFileLength = journalMaxFileLength;
152    }
153
154    public int getJournalMaxWriteBatchSize() {
155        return journalWriteBatchSize;
156    }
157
158    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
159        this.journalWriteBatchSize = journalWriteBatchSize;
160    }
161
162    public class Tx {
163        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
164        private int prepareLocationId = 0;
165
166        public void trackStore(TransactionStore store) {
167            stores.add(store);
168        }
169
170        public Set<TransactionStore> getStores() {
171            return stores;
172        }
173
174        public void trackPrepareLocation(Location location) {
175            this.prepareLocationId = location.getDataFileId();
176        }
177
178        public int getPreparedLocationId() {
179            return prepareLocationId;
180        }
181    }
182
183    public Tx getTx(TransactionId txid) {
184        Tx tx = inflightTransactions.get(txid);
185        if (tx == null) {
186            tx = new Tx();
187            inflightTransactions.put(txid, tx);
188        }
189        return tx;
190    }
191
192    public Tx removeTx(TransactionId txid) {
193        return inflightTransactions.remove(txid);
194    }
195
196    public void prepare(TransactionId txid) throws IOException {
197        Tx tx = getTx(txid);
198        for (TransactionStore store : tx.getStores()) {
199            store.prepare(txid);
200        }
201    }
202
203    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
204            throws IOException {
205
206        if (preCommit != null) {
207            preCommit.run();
208        }
209
210        Tx tx = getTx(txid);
211        if (wasPrepared) {
212            for (TransactionStore store : tx.getStores()) {
213                store.commit(txid, true, null, null);
214            }
215        } else {
216            // can only do 1pc on a single store
217            if (tx.getStores().size() == 1) {
218                for (TransactionStore store : tx.getStores()) {
219                    store.commit(txid, false, null, null);
220                }
221            } else {
222                // need to do local 2pc
223                for (TransactionStore store : tx.getStores()) {
224                    store.prepare(txid);
225                }
226                persistOutcome(tx, txid);
227                for (TransactionStore store : tx.getStores()) {
228                    store.commit(txid, true, null, null);
229                }
230                persistCompletion(txid);
231            }
232        }
233        removeTx(txid);
234        if (postCommit != null) {
235            postCommit.run();
236        }
237    }
238
239    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
240        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
241    }
242
243    public void persistCompletion(TransactionId txid) throws IOException {
244        store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
245    }
246
247    private Location store(JournalCommand<?> data) throws IOException {
248        int size = data.serializedSizeFramed();
249        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
250        os.writeByte(data.type().getNumber());
251        data.writeFramed(os);
252        Location location = journal.write(os.toByteSequence(), true);
253        journal.setLastAppendLocation(location);
254        return location;
255    }
256
257    public void rollback(TransactionId txid) throws IOException {
258        Tx tx = removeTx(txid);
259        if (tx != null) {
260            for (TransactionStore store : tx.getStores()) {
261                store.rollback(txid);
262            }
263        }
264    }
265
266    public void start() throws Exception {
267        journal = new Journal() {
268            @Override
269            protected void cleanup() {
270                super.cleanup();
271                txStoreCleanup();
272            }
273        };
274        journal.setDirectory(getDirectory());
275        journal.setMaxFileLength(journalMaxFileLength);
276        journal.setWriteBatchSize(journalWriteBatchSize);
277        IOHelper.mkdirs(journal.getDirectory());
278        journal.start();
279        recoverPendingLocalTransactions();
280        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
281    }
282
283    private void txStoreCleanup() {
284        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
285        for (Tx tx : inflightTransactions.values()) {
286            knownDataFileIds.remove(tx.getPreparedLocationId());
287        }
288        try {
289            journal.removeDataFiles(knownDataFileIds);
290        } catch (Exception e) {
291            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
292        }
293    }
294
295    private File getDirectory() {
296        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
297    }
298
299    public void stop() throws Exception {
300        journal.close();
301        journal = null;
302    }
303
304    private void recoverPendingLocalTransactions() throws IOException {
305        Location location = journal.getNextLocation(null);
306        while (location != null) {
307            process(load(location));
308            location = journal.getNextLocation(location);
309        }
310        recoveredPendingCommit.addAll(inflightTransactions.keySet());
311        LOG.info("pending local transactions: " + recoveredPendingCommit);
312    }
313
314    public JournalCommand<?> load(Location location) throws IOException {
315        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
316        byte readByte = is.readByte();
317        KahaEntryType type = KahaEntryType.valueOf(readByte);
318        if (type == null) {
319            throw new IOException("Could not load journal record. Invalid location: " + location);
320        }
321        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
322        message.mergeFramed(is);
323        return message;
324    }
325
326    public void process(JournalCommand<?> command) throws IOException {
327        switch (command.type()) {
328            case KAHA_PREPARE_COMMAND:
329                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
330                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
331                break;
332            case KAHA_COMMIT_COMMAND:
333                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
334                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
335                break;
336            case KAHA_TRACE_COMMAND:
337                break;
338            default:
339                throw new IOException("Unexpected command in transaction journal: " + command);
340        }
341    }
342
343
344    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
345
346        for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
347            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
348                @Override
349                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
350                    try {
351                        getTx(xid).trackStore(adapter.createTransactionStore());
352                    } catch (IOException e) {
353                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
354                    }
355                    listener.recover(xid, addedMessages, acks);
356                }
357            });
358        }
359
360        try {
361            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
362            // force completion of local xa
363            for (TransactionId txid : broker.getPreparedTransactions(null)) {
364                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
365                    try {
366                        if (recoveredPendingCommit.contains(txid)) {
367                            LOG.info("delivering pending commit outcome for tid: " + txid);
368                            broker.commitTransaction(null, txid, false);
369
370                        } else {
371                            LOG.info("delivering rollback outcome to store for tid: " + txid);
372                            broker.forgetTransaction(null, txid);
373                        }
374                        persistCompletion(txid);
375                    } catch (Exception ex) {
376                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
377                    }
378                }
379            }
380        } catch (Exception e) {
381            LOG.error("failed to resolve pending local transactions", e);
382        }
383    }
384
385    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
386            throws IOException {
387        if (message.getTransactionId() != null) {
388            getTx(message.getTransactionId()).trackStore(transactionStore);
389        }
390        destination.addMessage(context, message);
391    }
392
393    Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
394            throws IOException {
395        if (message.getTransactionId() != null) {
396            getTx(message.getTransactionId()).trackStore(transactionStore);
397            destination.addMessage(context, message);
398            return AbstractMessageStore.FUTURE;
399        } else {
400            return destination.asyncAddQueueMessage(context, message);
401        }
402    }
403
404    Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
405            throws IOException {
406
407        if (message.getTransactionId() != null) {
408            getTx(message.getTransactionId()).trackStore(transactionStore);
409            destination.addMessage(context, message);
410            return AbstractMessageStore.FUTURE;
411        } else {
412            return destination.asyncAddTopicMessage(context, message);
413        }
414    }
415
416    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
417            throws IOException {
418        if (ack.getTransactionId() != null) {
419            getTx(ack.getTransactionId()).trackStore(transactionStore);
420        }
421        destination.removeMessage(context, ack);
422    }
423
424    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
425            throws IOException {
426        if (ack.getTransactionId() != null) {
427            getTx(ack.getTransactionId()).trackStore(transactionStore);
428        }
429        destination.removeAsyncMessage(context, ack);
430    }
431
432    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
433                           final String clientId, final String subscriptionName,
434                           final MessageId messageId, final MessageAck ack) throws IOException {
435        if (ack.getTransactionId() != null) {
436            getTx(ack.getTransactionId()).trackStore(transactionStore);
437        }
438        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
439    }
440}