001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.TransactionId;
035import org.apache.activemq.command.XATransactionId;
036import org.apache.activemq.openwire.OpenWireFormat;
037import org.apache.activemq.protobuf.Buffer;
038import org.apache.activemq.store.AbstractMessageStore;
039import org.apache.activemq.store.MessageStore;
040import org.apache.activemq.store.ProxyMessageStore;
041import org.apache.activemq.store.ProxyTopicMessageStore;
042import org.apache.activemq.store.TopicMessageStore;
043import org.apache.activemq.store.TransactionRecoveryListener;
044import org.apache.activemq.store.TransactionStore;
045import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052import org.apache.activemq.wireformat.WireFormat;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Provides a TransactionStore implementation that can create transaction aware
058 * MessageStore objects from non transaction aware MessageStore objects.
059 *
060 *
061 */
062public class KahaDBTransactionStore implements TransactionStore {
063    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065    private final WireFormat wireFormat = new OpenWireFormat();
066    private final KahaDBStore theStore;
067
068    public KahaDBTransactionStore(KahaDBStore theStore) {
069        this.theStore = theStore;
070    }
071
072    public class Tx {
073        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074
075        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076
077        public void add(AddMessageCommand msg) {
078            messages.add(msg);
079        }
080
081        public void add(RemoveMessageCommand ack) {
082            acks.add(ack);
083        }
084
085        public Message[] getMessages() {
086            Message rc[] = new Message[messages.size()];
087            int count = 0;
088            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089                AddMessageCommand cmd = iter.next();
090                rc[count++] = cmd.getMessage();
091            }
092            return rc;
093        }
094
095        public MessageAck[] getAcks() {
096            MessageAck rc[] = new MessageAck[acks.size()];
097            int count = 0;
098            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099                RemoveMessageCommand cmd = iter.next();
100                rc[count++] = cmd.getMessageAck();
101            }
102            return rc;
103        }
104
105        /**
106         * @return true if something to commit
107         * @throws IOException
108         */
109        public List<Future<Object>> commit() throws IOException {
110            List<Future<Object>> results = new ArrayList<Future<Object>>();
111            // Do all the message adds.
112            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113                AddMessageCommand cmd = iter.next();
114                results.add(cmd.run());
115
116            }
117            // And removes..
118            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119                RemoveMessageCommand cmd = iter.next();
120                cmd.run();
121                results.add(cmd.run());
122            }
123
124            return results;
125        }
126    }
127
128    public abstract class AddMessageCommand {
129        private final ConnectionContext ctx;
130        AddMessageCommand(ConnectionContext ctx) {
131            this.ctx = ctx;
132        }
133        abstract Message getMessage();
134        Future<Object> run() throws IOException {
135            return run(this.ctx);
136        }
137        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138    }
139
140    public abstract class RemoveMessageCommand {
141
142        private final ConnectionContext ctx;
143        RemoveMessageCommand(ConnectionContext ctx) {
144            this.ctx = ctx;
145        }
146        abstract MessageAck getMessageAck();
147        Future<Object> run() throws IOException {
148            return run(this.ctx);
149        }
150        abstract Future<Object> run(ConnectionContext context) throws IOException;
151    }
152
153    public MessageStore proxy(MessageStore messageStore) {
154        return new ProxyMessageStore(messageStore) {
155            @Override
156            public void addMessage(ConnectionContext context, final Message send) throws IOException {
157                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158            }
159
160            @Override
161            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
162                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
163            }
164
165            @Override
166            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
167                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
168            }
169
170            @Override
171            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
172                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
173            }
174
175            @Override
176            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
177                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
178            }
179
180            @Override
181            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
182                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
183            }
184        };
185    }
186
187    public TopicMessageStore proxy(TopicMessageStore messageStore) {
188        return new ProxyTopicMessageStore(messageStore) {
189            @Override
190            public void addMessage(ConnectionContext context, final Message send) throws IOException {
191                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
192            }
193
194            @Override
195            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
196                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
197            }
198
199            @Override
200            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
201                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
202            }
203
204            @Override
205            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
206                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
207            }
208
209            @Override
210            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
211                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
212            }
213
214            @Override
215            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
216                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
217            }
218
219            @Override
220            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
221                            MessageId messageId, MessageAck ack) throws IOException {
222                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
223                        subscriptionName, messageId, ack);
224            }
225
226        };
227    }
228
229    /**
230     * @throws IOException
231     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
232     */
233    public void prepare(TransactionId txid) throws IOException {
234        KahaTransactionInfo info = getTransactionInfo(txid);
235        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
236            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
237        } else {
238            Tx tx = inflightTransactions.remove(txid);
239            if (tx != null) {
240               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
241            }
242        }
243    }
244
245    public Tx getTx(Object txid) {
246        Tx tx = inflightTransactions.get(txid);
247        if (tx == null) {
248            tx = new Tx();
249            inflightTransactions.put(txid, tx);
250        }
251        return tx;
252    }
253
254    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
255            throws IOException {
256        if (txid != null) {
257            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
258                if (preCommit != null) {
259                    preCommit.run();
260                }
261                Tx tx = inflightTransactions.remove(txid);
262                if (tx != null) {
263                    List<Future<Object>> results = tx.commit();
264                    boolean doneSomething = false;
265                    for (Future<Object> result : results) {
266                        try {
267                            result.get();
268                        } catch (InterruptedException e) {
269                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
270                        } catch (ExecutionException e) {
271                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
272                        }catch(CancellationException e) {
273                        }
274                        if (!result.isCancelled()) {
275                            doneSomething = true;
276                        }
277                    }
278                    if (postCommit != null) {
279                        postCommit.run();
280                    }
281                    if (doneSomething) {
282                        KahaTransactionInfo info = getTransactionInfo(txid);
283                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
284                    }
285                }else {
286                    //The Tx will be null for failed over clients - lets run their post commits
287                    if (postCommit != null) {
288                        postCommit.run();
289                    }
290                }
291
292            } else {
293                KahaTransactionInfo info = getTransactionInfo(txid);
294                theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
295                forgetRecoveredAcks(txid);
296            }
297        }else {
298           LOG.error("Null transaction passed on commit");
299        }
300    }
301
302    /**
303     * @throws IOException
304     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
305     */
306    public void rollback(TransactionId txid) throws IOException {
307        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
308            KahaTransactionInfo info = getTransactionInfo(txid);
309            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
310            forgetRecoveredAcks(txid);
311        } else {
312            inflightTransactions.remove(txid);
313        }
314    }
315
316    protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
317        if (txid.isXATransaction()) {
318            XATransactionId xaTid = ((XATransactionId) txid);
319            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
320        }
321    }
322
323    public void start() throws Exception {
324    }
325
326    public void stop() throws Exception {
327    }
328
329    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
330        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
331            XATransactionId xid = (XATransactionId) entry.getKey();
332            ArrayList<Message> messageList = new ArrayList<Message>();
333            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
334
335            for (Operation op : entry.getValue()) {
336                if (op.getClass() == AddOpperation.class) {
337                    AddOpperation addOp = (AddOpperation) op;
338                    Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
339                            .newInput()));
340                    messageList.add(msg);
341                } else {
342                    RemoveOpperation rmOp = (RemoveOpperation) op;
343                    Buffer ackb = rmOp.getCommand().getAck();
344                    MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
345                    ackList.add(ack);
346                }
347            }
348
349            Message[] addedMessages = new Message[messageList.size()];
350            MessageAck[] acks = new MessageAck[ackList.size()];
351            messageList.toArray(addedMessages);
352            ackList.toArray(acks);
353            xid.setPreparedAcks(ackList);
354            theStore.trackRecoveredAcks(ackList);
355            listener.recover(xid, addedMessages, acks);
356        }
357    }
358
359    /**
360     * @param message
361     * @throws IOException
362     */
363    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
364            throws IOException {
365
366        if (message.getTransactionId() != null) {
367            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
368                destination.addMessage(context, message);
369            } else {
370                Tx tx = getTx(message.getTransactionId());
371                tx.add(new AddMessageCommand(context) {
372                    @Override
373                    public Message getMessage() {
374                        return message;
375                    }
376                    @Override
377                    public Future<Object> run(ConnectionContext ctx) throws IOException {
378                        destination.addMessage(ctx, message);
379                        return AbstractMessageStore.FUTURE;
380                    }
381
382                });
383            }
384        } else {
385            destination.addMessage(context, message);
386        }
387    }
388
389    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
390            throws IOException {
391
392        if (message.getTransactionId() != null) {
393            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
394                destination.addMessage(context, message);
395                return AbstractMessageStore.FUTURE;
396            } else {
397                Tx tx = getTx(message.getTransactionId());
398                tx.add(new AddMessageCommand(context) {
399                    @Override
400                    public Message getMessage() {
401                        return message;
402                    }
403                    @Override
404                    public Future<Object> run(ConnectionContext ctx) throws IOException {
405                        return destination.asyncAddQueueMessage(ctx, message);
406                    }
407
408                });
409                return AbstractMessageStore.FUTURE;
410            }
411        } else {
412            return destination.asyncAddQueueMessage(context, message);
413        }
414    }
415
416    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
417            throws IOException {
418
419        if (message.getTransactionId() != null) {
420            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
421                destination.addMessage(context, message);
422                return AbstractMessageStore.FUTURE;
423            } else {
424                Tx tx = getTx(message.getTransactionId());
425                tx.add(new AddMessageCommand(context) {
426                    @Override
427                    public Message getMessage() {
428                        return message;
429                    }
430                    @Override
431                    public Future<Object> run(ConnectionContext ctx) throws IOException {
432                        return destination.asyncAddTopicMessage(ctx, message);
433                    }
434
435                });
436                return AbstractMessageStore.FUTURE;
437            }
438        } else {
439            return destination.asyncAddTopicMessage(context, message);
440        }
441    }
442
443    /**
444     * @param ack
445     * @throws IOException
446     */
447    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
448            throws IOException {
449
450        if (ack.isInTransaction()) {
451            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
452                destination.removeMessage(context, ack);
453            } else {
454                Tx tx = getTx(ack.getTransactionId());
455                tx.add(new RemoveMessageCommand(context) {
456                    @Override
457                    public MessageAck getMessageAck() {
458                        return ack;
459                    }
460
461                    @Override
462                    public Future<Object> run(ConnectionContext ctx) throws IOException {
463                        destination.removeMessage(ctx, ack);
464                        return AbstractMessageStore.FUTURE;
465                    }
466                });
467            }
468        } else {
469            destination.removeMessage(context, ack);
470        }
471    }
472
473    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
474            throws IOException {
475
476        if (ack.isInTransaction()) {
477            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
478                destination.removeAsyncMessage(context, ack);
479            } else {
480                Tx tx = getTx(ack.getTransactionId());
481                tx.add(new RemoveMessageCommand(context) {
482                    @Override
483                    public MessageAck getMessageAck() {
484                        return ack;
485                    }
486
487                    @Override
488                    public Future<Object> run(ConnectionContext ctx) throws IOException {
489                        destination.removeMessage(ctx, ack);
490                        return AbstractMessageStore.FUTURE;
491                    }
492                });
493            }
494        } else {
495            destination.removeAsyncMessage(context, ack);
496        }
497    }
498
499    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
500                           final MessageId messageId, final MessageAck ack) throws IOException {
501
502        if (ack.isInTransaction()) {
503            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
504                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
505            } else {
506                Tx tx = getTx(ack.getTransactionId());
507                tx.add(new RemoveMessageCommand(context) {
508                    public MessageAck getMessageAck() {
509                        return ack;
510                    }
511
512                    public Future<Object> run(ConnectionContext ctx) throws IOException {
513                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
514                        return AbstractMessageStore.FUTURE;
515                    }
516                });
517            }
518        } else {
519            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
520        }
521    }
522
523
524    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
525        return theStore.getTransactionIdTransformer().transform(txid);
526    }
527}