001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.JMSException;
028import javax.transaction.xa.XAException;
029
030import org.apache.activemq.ActiveMQMessageAudit;
031import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032import org.apache.activemq.broker.region.Destination;
033import org.apache.activemq.broker.region.Queue;
034import org.apache.activemq.command.ActiveMQDestination;
035import org.apache.activemq.command.BaseCommand;
036import org.apache.activemq.command.ConnectionInfo;
037import org.apache.activemq.command.LocalTransactionId;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.MessageAck;
040import org.apache.activemq.command.ProducerInfo;
041import org.apache.activemq.command.TransactionId;
042import org.apache.activemq.command.XATransactionId;
043import org.apache.activemq.state.ProducerState;
044import org.apache.activemq.store.TransactionRecoveryListener;
045import org.apache.activemq.store.TransactionStore;
046import org.apache.activemq.transaction.LocalTransaction;
047import org.apache.activemq.transaction.Synchronization;
048import org.apache.activemq.transaction.Transaction;
049import org.apache.activemq.transaction.XATransaction;
050import org.apache.activemq.util.IOExceptionSupport;
051import org.apache.activemq.util.WrappedException;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * This broker filter handles the transaction related operations in the Broker
057 * interface.
058 * 
059 * 
060 */
061public class TransactionBroker extends BrokerFilter {
062
063    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
064
065    // The prepared XA transactions.
066    private TransactionStore transactionStore;
067    private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
068    private ActiveMQMessageAudit audit;
069
070    public TransactionBroker(Broker next, TransactionStore transactionStore) {
071        super(next);
072        this.transactionStore = transactionStore;
073    }
074
075    // ////////////////////////////////////////////////////////////////////////////
076    //
077    // Life cycle Methods
078    //
079    // ////////////////////////////////////////////////////////////////////////////
080
081    /**
082     * Recovers any prepared transactions.
083     */
084    public void start() throws Exception {
085        transactionStore.start();
086        try {
087            final ConnectionContext context = new ConnectionContext();
088            context.setBroker(this);
089            context.setInRecoveryMode(true);
090            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
091            context.setProducerFlowControl(false);
092            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
093            producerExchange.setMutable(true);
094            producerExchange.setConnectionContext(context);
095            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
096            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
097            consumerExchange.setConnectionContext(context);
098            transactionStore.recover(new TransactionRecoveryListener() {
099                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
100                    try {
101                        beginTransaction(context, xid);
102                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
103                        for (int i = 0; i < addedMessages.length; i++) {
104                            forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
105                        }
106                        for (int i = 0; i < aks.length; i++) {
107                            forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
108                        }
109                        transaction.setState(Transaction.PREPARED_STATE);
110                        registerMBean(transaction);
111                        if (LOG.isDebugEnabled()) {
112                            LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
113                        }
114                    } catch (Throwable e) {
115                        throw new WrappedException(e);
116                    }
117                }
118            });
119        } catch (WrappedException e) {
120            Throwable cause = e.getCause();
121            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
122        }
123        next.start();
124    }
125
126    private void registerMBean(XATransaction transaction) {
127        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
128            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
129            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
130        }
131    }
132
133    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
134                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
135        Destination destination =  addDestination(context, amqDestination, false);
136        registerSync(destination, transaction, ack);
137    }
138
139    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
140        if (destination instanceof Queue) {
141            Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
142            // ensure one per destination in the list
143            transaction.removeSynchronization(sync);
144            transaction.addSynchronization(sync);
145        }
146    }
147
148    static class PreparedDestinationCompletion extends Synchronization {
149        final Queue queue;
150        final boolean messageSend;
151        public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
152            this.queue = queue;
153            // rollback relevant to acks, commit to sends
154            this.messageSend = messageSend;
155        }
156
157        @Override
158        public int hashCode() {
159            return System.identityHashCode(queue) +
160                    System.identityHashCode(Boolean.valueOf(messageSend));
161        }
162
163        @Override
164        public boolean equals(Object other) {
165            return other instanceof PreparedDestinationCompletion &&
166                    queue.equals(((PreparedDestinationCompletion) other).queue) &&
167                    messageSend == ((PreparedDestinationCompletion) other).messageSend;
168        }
169
170        @Override
171        public void afterRollback() throws Exception {
172            if (!messageSend) {
173                queue.clearPendingMessages();
174                if (LOG.isDebugEnabled()) {
175                    LOG.debug("cleared pending from afterRollback : " + queue);
176                }
177            }
178        }
179
180        @Override
181        public void afterCommit() throws Exception {
182            if (messageSend) {
183                queue.clearPendingMessages();
184                if (LOG.isDebugEnabled()) {
185                    LOG.debug("cleared pending from afterCommit : " + queue);
186                }
187            }
188        }
189    }
190
191    public void stop() throws Exception {
192        transactionStore.stop();
193        next.stop();
194    }
195
196    // ////////////////////////////////////////////////////////////////////////////
197    //
198    // BrokerFilter overrides
199    //
200    // ////////////////////////////////////////////////////////////////////////////
201    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
202        List<TransactionId> txs = new ArrayList<TransactionId>();
203        synchronized (xaTransactions) {
204            for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
205                Transaction tx = iter.next();
206                if (tx.isPrepared()) {
207                    if (LOG.isDebugEnabled()) {
208                        LOG.debug("prepared transaction: " + tx.getTransactionId());
209                    }
210                    txs.add(tx.getTransactionId());
211                }
212            }
213        }
214        XATransactionId rc[] = new XATransactionId[txs.size()];
215        txs.toArray(rc);
216        if (LOG.isDebugEnabled()) {
217            LOG.debug("prepared transaction list size: " + rc.length);
218        }
219        return rc;
220    }
221
222    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
223        // the transaction may have already been started.
224        if (xid.isXATransaction()) {
225            XATransaction transaction = null;
226            synchronized (xaTransactions) {
227                transaction = xaTransactions.get(xid);
228                if (transaction != null) {
229                    return;
230                }
231                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
232                xaTransactions.put(xid, transaction);
233            }
234        } else {
235            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
236            Transaction transaction = transactionMap.get(xid);
237            if (transaction != null) {
238                throw new JMSException("Transaction '" + xid + "' has already been started.");
239            }
240            transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
241            transactionMap.put(xid, transaction);
242        }
243    }
244
245    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
246        Transaction transaction = getTransaction(context, xid, false);
247        return transaction.prepare();
248    }
249
250    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
251        Transaction transaction = getTransaction(context, xid, true);
252        transaction.commit(onePhase);
253    }
254
255    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
256        Transaction transaction = getTransaction(context, xid, true);
257        transaction.rollback();
258    }
259
260    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
261        Transaction transaction = getTransaction(context, xid, true);
262        transaction.rollback();
263    }
264
265    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
266        // This method may be invoked recursively.
267        // Track original tx so that it can be restored.
268        final ConnectionContext context = consumerExchange.getConnectionContext();
269        Transaction originalTx = context.getTransaction();
270        Transaction transaction = null;
271        if (ack.isInTransaction()) {
272            transaction = getTransaction(context, ack.getTransactionId(), false);
273        }
274        context.setTransaction(transaction);
275        try {
276            next.acknowledge(consumerExchange, ack);
277        } finally {
278            context.setTransaction(originalTx);
279        }
280    }
281
282    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
283        // This method may be invoked recursively.
284        // Track original tx so that it can be restored.
285        final ConnectionContext context = producerExchange.getConnectionContext();
286        Transaction originalTx = context.getTransaction();
287        Transaction transaction = null;
288        Synchronization sync = null;
289        if (message.getTransactionId() != null) {
290            transaction = getTransaction(context, message.getTransactionId(), false);
291            if (transaction != null) {
292                sync = new Synchronization() {
293
294                    public void afterRollback() {
295                        if (audit != null) {
296                            audit.rollback(message);
297                        }
298                    }
299                };
300                transaction.addSynchronization(sync);
301            }
302        }
303        if (audit == null || !audit.isDuplicate(message)) {
304            context.setTransaction(transaction);
305            try {
306                next.send(producerExchange, message);
307            } finally {
308                context.setTransaction(originalTx);
309            }
310        } else {
311            if (sync != null && transaction != null) {
312                transaction.removeSynchronization(sync);
313            }
314            if (LOG.isDebugEnabled()) {
315                LOG.debug("IGNORING duplicate message " + message);
316            }
317        }
318    }
319
320    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
321        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
322            try {
323                Transaction transaction = iter.next();
324                transaction.rollback();
325            } catch (Exception e) {
326                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
327            }
328            iter.remove();
329        }
330
331        synchronized (xaTransactions) {
332            // first find all txs that belongs to the connection
333            ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
334            for (XATransaction tx : xaTransactions.values()) {
335                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
336                    txs.add(tx);
337                }
338            }
339
340            // then remove them
341            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
342            for (XATransaction tx : txs) {
343                try {
344                    tx.rollback();
345                } catch (Exception e) {
346                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
347                }
348            }
349
350        }
351        next.removeConnection(context, info, error);
352    }
353
354    // ////////////////////////////////////////////////////////////////////////////
355    //
356    // Implementation help methods.
357    //
358    // ////////////////////////////////////////////////////////////////////////////
359    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
360        Map transactionMap = null;
361        synchronized (xaTransactions) {
362            transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
363        }
364        Transaction transaction = (Transaction)transactionMap.get(xid);
365        if (transaction != null) {
366            return transaction;
367        }
368        if (xid.isXATransaction()) {
369            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
370            e.errorCode = XAException.XAER_NOTA;
371            throw e;
372        } else {
373            throw new JMSException("Transaction '" + xid + "' has not been started.");
374        }
375    }
376
377    public void removeTransaction(XATransactionId xid) {
378        synchronized (xaTransactions) {
379            xaTransactions.remove(xid);
380        }
381    }
382
383    public synchronized void brokerServiceStarted() {
384        super.brokerServiceStarted();
385        if (getBrokerService().isSupportFailOver() && audit == null) {
386            audit = new ActiveMQMessageAudit();
387        }
388    }
389
390}