001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.journal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.Map;
025import javax.transaction.xa.XAException;
026import org.apache.activeio.journal.RecordLocation;
027import org.apache.activemq.command.JournalTopicAck;
028import org.apache.activemq.command.JournalTransaction;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.TransactionId;
032import org.apache.activemq.command.XATransactionId;
033import org.apache.activemq.store.TransactionRecoveryListener;
034import org.apache.activemq.store.TransactionStore;
035
036/**
037 */
038public class JournalTransactionStore implements TransactionStore {
039
040    private final JournalPersistenceAdapter peristenceAdapter;
041    private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
042    private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
043    private boolean doingRecover;
044
045    public static class TxOperation {
046
047        static final byte ADD_OPERATION_TYPE = 0;
048        static final byte REMOVE_OPERATION_TYPE = 1;
049        static final byte ACK_OPERATION_TYPE = 3;
050
051        public byte operationType;
052        public JournalMessageStore store;
053        public Object data;
054
055        public TxOperation(byte operationType, JournalMessageStore store, Object data) {
056            this.operationType = operationType;
057            this.store = store;
058            this.data = data;
059        }
060
061    }
062
063    /**
064     * Operations
065     * 
066     * 
067     */
068    public static class Tx {
069
070        private final RecordLocation location;
071        private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
072
073        public Tx(RecordLocation location) {
074            this.location = location;
075        }
076
077        public void add(JournalMessageStore store, Message msg) {
078            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
079        }
080
081        public void add(JournalMessageStore store, MessageAck ack) {
082            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
083        }
084
085        public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
086            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
087        }
088
089        public Message[] getMessages() {
090            ArrayList<Object> list = new ArrayList<Object>();
091            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
092                TxOperation op = iter.next();
093                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
094                    list.add(op.data);
095                }
096            }
097            Message rc[] = new Message[list.size()];
098            list.toArray(rc);
099            return rc;
100        }
101
102        public MessageAck[] getAcks() {
103            ArrayList<Object> list = new ArrayList<Object>();
104            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
105                TxOperation op = iter.next();
106                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
107                    list.add(op.data);
108                }
109            }
110            MessageAck rc[] = new MessageAck[list.size()];
111            list.toArray(rc);
112            return rc;
113        }
114
115        public ArrayList<TxOperation> getOperations() {
116            return operations;
117        }
118
119    }
120
121    public JournalTransactionStore(JournalPersistenceAdapter adapter) {
122        this.peristenceAdapter = adapter;
123    }
124
125    /**
126     * @throws IOException
127     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
128     */
129    public void prepare(TransactionId txid) throws IOException {
130        Tx tx = null;
131        synchronized (inflightTransactions) {
132            tx = inflightTransactions.remove(txid);
133        }
134        if (tx == null) {
135            return;
136        }
137        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
138                                       true);
139        synchronized (preparedTransactions) {
140            preparedTransactions.put(txid, tx);
141        }
142    }
143
144    /**
145     * @throws IOException
146     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
147     */
148    public void replayPrepare(TransactionId txid) throws IOException {
149        Tx tx = null;
150        synchronized (inflightTransactions) {
151            tx = inflightTransactions.remove(txid);
152        }
153        if (tx == null) {
154            return;
155        }
156        synchronized (preparedTransactions) {
157            preparedTransactions.put(txid, tx);
158        }
159    }
160
161    public Tx getTx(Object txid, RecordLocation location) {
162        Tx tx = null;
163        synchronized (inflightTransactions) {
164            tx = inflightTransactions.get(txid);
165        }
166        if (tx == null) {
167            tx = new Tx(location);
168            inflightTransactions.put(txid, tx);
169        }
170        return tx;
171    }
172
173    /**
174     * @throws XAException
175     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
176     */
177    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
178        Tx tx;
179        if (preCommit != null) {
180            preCommit.run();
181        }
182        if (wasPrepared) {
183            synchronized (preparedTransactions) {
184                tx = preparedTransactions.remove(txid);
185            }
186        } else {
187            synchronized (inflightTransactions) {
188                tx = inflightTransactions.remove(txid);
189            }
190        }
191        if (tx == null) {
192            if (postCommit != null) {
193                postCommit.run();
194            }
195            return;
196        }
197        if (txid.isXATransaction()) {
198            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
199                                                                  wasPrepared), true);
200        } else {
201            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
202                                                                  wasPrepared), true);
203        }
204        if (postCommit != null) {
205            postCommit.run();
206        }
207    }
208
209    /**
210     * @throws XAException
211     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
212     */
213    public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
214        if (wasPrepared) {
215            synchronized (preparedTransactions) {
216                return preparedTransactions.remove(txid);
217            }
218        } else {
219            synchronized (inflightTransactions) {
220                return inflightTransactions.remove(txid);
221            }
222        }
223    }
224
225    /**
226     * @throws IOException
227     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
228     */
229    public void rollback(TransactionId txid) throws IOException {
230        Tx tx = null;
231        synchronized (inflightTransactions) {
232            tx = inflightTransactions.remove(txid);
233        }
234        if (tx != null) {
235            synchronized (preparedTransactions) {
236                tx = preparedTransactions.remove(txid);
237            }
238        }
239        if (tx != null) {
240            if (txid.isXATransaction()) {
241                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
242                                                                      false), true);
243            } else {
244                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
245                                                                      txid, false), true);
246            }
247        }
248    }
249
250    /**
251     * @throws IOException
252     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
253     */
254    public void replayRollback(TransactionId txid) throws IOException {
255        boolean inflight = false;
256        synchronized (inflightTransactions) {
257            inflight = inflightTransactions.remove(txid) != null;
258        }
259        if (inflight) {
260            synchronized (preparedTransactions) {
261                preparedTransactions.remove(txid);
262            }
263        }
264    }
265
266    public void start() throws Exception {
267    }
268
269    public void stop() throws Exception {
270    }
271
272    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
273        // All the in-flight transactions get rolled back..
274        synchronized (inflightTransactions) {
275            inflightTransactions.clear();
276        }
277        this.doingRecover = true;
278        try {
279            Map<TransactionId, Tx> txs = null;
280            synchronized (preparedTransactions) {
281                txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
282            }
283            for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
284                Object txid = iter.next();
285                Tx tx = txs.get(txid);
286                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
287            }
288        } finally {
289            this.doingRecover = false;
290        }
291    }
292
293    /**
294     * @param message
295     * @throws IOException
296     */
297    void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
298        Tx tx = getTx(message.getTransactionId(), location);
299        tx.add(store, message);
300    }
301
302    /**
303     * @param ack
304     * @throws IOException
305     */
306    public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
307        throws IOException {
308        Tx tx = getTx(ack.getTransactionId(), location);
309        tx.add(store, ack);
310    }
311
312    public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
313        Tx tx = getTx(ack.getTransactionId(), location);
314        tx.add(store, ack);
315    }
316
317    public RecordLocation checkpoint() throws IOException {
318        // Nothing really to checkpoint.. since, we don't
319        // checkpoint tx operations in to long term store until they are
320        // committed.
321        // But we keep track of the first location of an operation
322        // that was associated with an active tx. The journal can not
323        // roll over active tx records.
324        RecordLocation rc = null;
325        synchronized (inflightTransactions) {
326            for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
327                Tx tx = iter.next();
328                RecordLocation location = tx.location;
329                if (rc == null || rc.compareTo(location) < 0) {
330                    rc = location;
331                }
332            }
333        }
334        synchronized (preparedTransactions) {
335            for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
336                Tx tx = iter.next();
337                RecordLocation location = tx.location;
338                if (rc == null || rc.compareTo(location) < 0) {
339                    rc = location;
340                }
341            }
342            return rc;
343        }
344    }
345
346    public boolean isDoingRecover() {
347        return doingRecover;
348    }
349
350}