001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.InterruptedIOException;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024
025import javax.jms.JMSException;
026import javax.jms.TransactionInProgressException;
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAException;
029import javax.transaction.xa.XAResource;
030import javax.transaction.xa.Xid;
031
032import org.apache.activemq.command.Command;
033import org.apache.activemq.command.ConnectionId;
034import org.apache.activemq.command.DataArrayResponse;
035import org.apache.activemq.command.DataStructure;
036import org.apache.activemq.command.IntegerResponse;
037import org.apache.activemq.command.LocalTransactionId;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.command.TransactionInfo;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.transaction.Synchronization;
043import org.apache.activemq.util.JMSExceptionSupport;
044import org.apache.activemq.util.LongSequenceGenerator;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A TransactionContext provides the means to control a JMS transaction. It
050 * provides a local transaction interface and also an XAResource interface. <p/>
051 * An application server controls the transactional assignment of an XASession
052 * by obtaining its XAResource. It uses the XAResource to assign the session to
053 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
054 * XAResource provides some fairly sophisticated facilities for interleaving
055 * work on multiple transactions, recovering a list of transactions in progress,
056 * and so on. A JTA aware JMS provider must fully implement this functionality.
057 * This could be done by using the services of a database that supports XA, or a
058 * JMS provider may choose to implement this functionality from scratch. <p/>
059 *
060 *
061 * @see javax.jms.Session
062 * @see javax.jms.QueueSession
063 * @see javax.jms.TopicSession
064 * @see javax.jms.XASession
065 */
066public class TransactionContext implements XAResource {
067
068    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
069
070    // XATransactionId -> ArrayList of TransactionContext objects
071    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
072                new HashMap<TransactionId, List<TransactionContext>>();
073
074    private final ActiveMQConnection connection;
075    private final LongSequenceGenerator localTransactionIdGenerator;
076    private final ConnectionId connectionId;
077    private List<Synchronization> synchronizations;
078
079    // To track XA transactions.
080    private Xid associatedXid;
081    private TransactionId transactionId;
082    private LocalTransactionEventListener localTransactionEventListener;
083    private int beforeEndIndex;
084
085    public TransactionContext(ActiveMQConnection connection) {
086        this.connection = connection;
087        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
088        this.connectionId = connection.getConnectionInfo().getConnectionId();
089    }
090
091    public boolean isInXATransaction() {
092        if (transactionId != null && transactionId.isXATransaction()) {
093                return true;
094        } else {
095                if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
096                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
097                                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
098                                        if (transactions.contains(this)) {
099                                                return true;
100                                        }
101                                }
102                        }
103                }
104        }
105
106        return false;
107    }
108
109    public boolean isInLocalTransaction() {
110        return transactionId != null && transactionId.isLocalTransaction();
111    }
112
113    public boolean isInTransaction() {
114        return transactionId != null;
115    }
116
117    /**
118     * @return Returns the localTransactionEventListener.
119     */
120    public LocalTransactionEventListener getLocalTransactionEventListener() {
121        return localTransactionEventListener;
122    }
123
124    /**
125     * Used by the resource adapter to listen to transaction events.
126     *
127     * @param localTransactionEventListener The localTransactionEventListener to
128     *                set.
129     */
130    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
131        this.localTransactionEventListener = localTransactionEventListener;
132    }
133
134    // ///////////////////////////////////////////////////////////
135    //
136    // Methods that work with the Synchronization objects registered with
137    // the transaction.
138    //
139    // ///////////////////////////////////////////////////////////
140
141    public void addSynchronization(Synchronization s) {
142        if (synchronizations == null) {
143            synchronizations = new ArrayList<Synchronization>(10);
144        }
145        synchronizations.add(s);
146    }
147
148    private void afterRollback() throws JMSException {
149        if (synchronizations == null) {
150            return;
151        }
152
153        Throwable firstException = null;
154        int size = synchronizations.size();
155        for (int i = 0; i < size; i++) {
156            try {
157                synchronizations.get(i).afterRollback();
158            } catch (Throwable t) {
159                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
160                if (firstException == null) {
161                    firstException = t;
162                }
163            }
164        }
165        synchronizations = null;
166        if (firstException != null) {
167            throw JMSExceptionSupport.create(firstException);
168        }
169    }
170
171    private void afterCommit() throws JMSException {
172        if (synchronizations == null) {
173            return;
174        }
175
176        Throwable firstException = null;
177        int size = synchronizations.size();
178        for (int i = 0; i < size; i++) {
179            try {
180                synchronizations.get(i).afterCommit();
181            } catch (Throwable t) {
182                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
183                if (firstException == null) {
184                    firstException = t;
185                }
186            }
187        }
188        synchronizations = null;
189        if (firstException != null) {
190            throw JMSExceptionSupport.create(firstException);
191        }
192    }
193
194    private void beforeEnd() throws JMSException {
195        if (synchronizations == null) {
196            return;
197        }
198
199        int size = synchronizations.size();
200        try {
201            for (;beforeEndIndex < size;) {
202                synchronizations.get(beforeEndIndex++).beforeEnd();
203            }
204        } catch (JMSException e) {
205            throw e;
206        } catch (Throwable e) {
207            throw JMSExceptionSupport.create(e);
208        }
209    }
210
211    public TransactionId getTransactionId() {
212        return transactionId;
213    }
214
215    // ///////////////////////////////////////////////////////////
216    //
217    // Local transaction interface.
218    //
219    // ///////////////////////////////////////////////////////////
220
221    /**
222     * Start a local transaction.
223     * @throws javax.jms.JMSException on internal error
224     */
225    public void begin() throws JMSException {
226
227        if (isInXATransaction()) {
228            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
229        }
230
231        if (transactionId == null) {
232            synchronizations = null;
233            beforeEndIndex = 0;
234            this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
235            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
236            this.connection.ensureConnectionInfoSent();
237            this.connection.asyncSendPacket(info);
238
239            // Notify the listener that the tx was started.
240            if (localTransactionEventListener != null) {
241                localTransactionEventListener.beginEvent();
242            }
243            if (LOG.isDebugEnabled()) {
244                LOG.debug("Begin:" + transactionId);
245            }
246        }
247
248    }
249
250    /**
251     * Rolls back any work done in this transaction and releases any locks
252     * currently held.
253     *
254     * @throws JMSException if the JMS provider fails to roll back the
255     *                 transaction due to some internal error.
256     * @throws javax.jms.IllegalStateException if the method is not called by a
257     *                 transacted session.
258     */
259    public void rollback() throws JMSException {
260        if (isInXATransaction()) {
261            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
262        }
263
264        try {
265            beforeEnd();
266        } catch (TransactionRolledBackException canOcurrOnFailover) {
267            LOG.warn("rollback processing error", canOcurrOnFailover);
268        }
269        if (transactionId != null) {
270            if (LOG.isDebugEnabled()) {
271                LOG.debug("Rollback: "  + transactionId
272                + " syncCount: "
273                + (synchronizations != null ? synchronizations.size() : 0));
274            }
275
276            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
277            this.transactionId = null;
278            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
279            this.connection.syncSendPacket(info);
280            // Notify the listener that the tx was rolled back
281            if (localTransactionEventListener != null) {
282                localTransactionEventListener.rollbackEvent();
283            }
284        }
285
286        afterRollback();
287    }
288
289    /**
290     * Commits all work done in this transaction and releases any locks
291     * currently held.
292     *
293     * @throws JMSException if the JMS provider fails to commit the transaction
294     *                 due to some internal error.
295     * @throws javax.jms.IllegalStateException if the method is not called by a
296     *                 transacted session.
297     */
298    public void commit() throws JMSException {
299        if (isInXATransaction()) {
300            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
301        }
302
303        try {
304            beforeEnd();
305        } catch (JMSException e) {
306            rollback();
307            throw e;
308        }
309
310        // Only send commit if the transaction was started.
311        if (transactionId != null) {
312            if (LOG.isDebugEnabled()) {
313                LOG.debug("Commit: "  + transactionId
314                        + " syncCount: "
315                        + (synchronizations != null ? synchronizations.size() : 0));
316            }
317
318            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
319            this.transactionId = null;
320            // Notify the listener that the tx was committed back
321            try {
322                syncSendPacketWithInterruptionHandling(info);
323                if (localTransactionEventListener != null) {
324                    localTransactionEventListener.commitEvent();
325                }
326                afterCommit();
327            } catch (JMSException cause) {
328                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
329                if (localTransactionEventListener != null) {
330                    localTransactionEventListener.rollbackEvent();
331                }
332                afterRollback();
333                throw cause;
334            }
335
336        }
337    }
338
339    // ///////////////////////////////////////////////////////////
340    //
341    // XAResource Implementation
342    //
343    // ///////////////////////////////////////////////////////////
344    /**
345     * Associates a transaction with the resource.
346     */
347    public void start(Xid xid, int flags) throws XAException {
348
349        if (LOG.isDebugEnabled()) {
350            LOG.debug("Start: " + xid);
351        }
352        if (isInLocalTransaction()) {
353            throw new XAException(XAException.XAER_PROTO);
354        }
355        // Are we already associated?
356        if (associatedXid != null) {
357            throw new XAException(XAException.XAER_PROTO);
358        }
359
360        // if ((flags & TMJOIN) == TMJOIN) {
361        // TODO: verify that the server has seen the xid
362        // // }
363        // if ((flags & TMJOIN) == TMRESUME) {
364        // // TODO: verify that the xid was suspended.
365        // }
366
367        // associate
368        synchronizations = null;
369        beforeEndIndex = 0;
370        setXid(xid);
371    }
372
373    /**
374     * @return connectionId for connection
375     */
376    private ConnectionId getConnectionId() {
377        return connection.getConnectionInfo().getConnectionId();
378    }
379
380    public void end(Xid xid, int flags) throws XAException {
381
382        if (LOG.isDebugEnabled()) {
383            LOG.debug("End: " + xid);
384        }
385
386        if (isInLocalTransaction()) {
387            throw new XAException(XAException.XAER_PROTO);
388        }
389
390        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
391            // You can only suspend the associated xid.
392            if (!equals(associatedXid, xid)) {
393                throw new XAException(XAException.XAER_PROTO);
394            }
395
396            // TODO: we may want to put the xid in a suspended list.
397            try {
398                beforeEnd();
399            } catch (JMSException e) {
400                throw toXAException(e);
401            }
402            setXid(null);
403        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
404            // set to null if this is the current xid.
405            // otherwise this could be an asynchronous success call
406            if (equals(associatedXid, xid)) {
407                try {
408                    beforeEnd();
409                } catch (JMSException e) {
410                    throw toXAException(e);
411                }
412                setXid(null);
413            }
414        } else {
415            throw new XAException(XAException.XAER_INVAL);
416        }
417    }
418
419    private boolean equals(Xid xid1, Xid xid2) {
420        if (xid1 == xid2) {
421            return true;
422        }
423        if (xid1 == null ^ xid2 == null) {
424            return false;
425        }
426        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
427               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
428    }
429
430    public int prepare(Xid xid) throws XAException {
431        if (LOG.isDebugEnabled()) {
432            LOG.debug("Prepare: " + xid);
433        }
434
435        // We allow interleaving multiple transactions, so
436        // we don't limit prepare to the associated xid.
437        XATransactionId x;
438        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
439        // called first
440        if (xid == null || (equals(associatedXid, xid))) {
441            throw new XAException(XAException.XAER_PROTO);
442        } else {
443            // TODO: cache the known xids so we don't keep recreating this one??
444            x = new XATransactionId(xid);
445        }
446
447        try {
448            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
449
450            // Find out if the server wants to commit or rollback.
451            IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
452            if (XAResource.XA_RDONLY == response.getResult()) {
453                // transaction stops now, may be syncs that need a callback
454                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
455                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
456                        if (l != null && !l.isEmpty()) {
457                            if (LOG.isDebugEnabled()) {
458                                LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
459                            }
460                            for (TransactionContext ctx : l) {
461                                ctx.afterCommit();
462                            }
463                        }
464                        }
465            }
466            return response.getResult();
467
468        } catch (JMSException e) {
469            LOG.warn("prepare of: " + x + " failed with: " + e, e);
470                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
471                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
472                    if (l != null && !l.isEmpty()) {
473                        for (TransactionContext ctx : l) {
474                            try {
475                                ctx.afterRollback();
476                            } catch (Throwable ignored) {
477                                if (LOG.isDebugEnabled()) {
478                                    LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
479                                                  x + ", context: " + ctx, ignored);
480                                }
481                            }
482                        }
483                    }
484                }
485            throw toXAException(e);
486        }
487    }
488
489    public void rollback(Xid xid) throws XAException {
490
491        if (LOG.isDebugEnabled()) {
492            LOG.debug("Rollback: " + xid);
493        }
494
495        // We allow interleaving multiple transactions, so
496        // we don't limit rollback to the associated xid.
497        XATransactionId x;
498        if (xid == null) {
499            throw new XAException(XAException.XAER_PROTO);
500        }
501        if (equals(associatedXid, xid)) {
502            // I think this can happen even without an end(xid) call. Need to
503            // check spec.
504            x = (XATransactionId)transactionId;
505        } else {
506            x = new XATransactionId(xid);
507        }
508
509        try {
510            this.connection.checkClosedOrFailed();
511            this.connection.ensureConnectionInfoSent();
512
513            // Let the server know that the tx is rollback.
514            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
515            syncSendPacketWithInterruptionHandling(info);
516
517                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
518                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
519                    if (l != null && !l.isEmpty()) {
520                        for (TransactionContext ctx : l) {
521                            ctx.afterRollback();
522                        }
523                    }
524                }
525        } catch (JMSException e) {
526            throw toXAException(e);
527        }
528    }
529
530    // XAResource interface
531    public void commit(Xid xid, boolean onePhase) throws XAException {
532
533        if (LOG.isDebugEnabled()) {
534            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
535        }
536
537        // We allow interleaving multiple transactions, so
538        // we don't limit commit to the associated xid.
539        XATransactionId x;
540        if (xid == null || (equals(associatedXid, xid))) {
541            // should never happen, end(xid,TMSUCCESS) must have been previously
542            // called
543            throw new XAException(XAException.XAER_PROTO);
544        } else {
545            x = new XATransactionId(xid);
546        }
547
548        try {
549            this.connection.checkClosedOrFailed();
550            this.connection.ensureConnectionInfoSent();
551
552            // Notify the server that the tx was committed back
553            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
554
555            syncSendPacketWithInterruptionHandling(info);
556
557                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
558                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
559                    if (l != null && !l.isEmpty()) {
560                        for (TransactionContext ctx : l) {
561                            try {
562                                ctx.afterCommit();
563                            } catch (Exception ignored) {
564                                LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
565                            }
566                        }
567                    }
568                }
569
570        } catch (JMSException e) {
571            LOG.warn("commit of: " + x + " failed with: " + e, e);
572            if (onePhase) {
573                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
574                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
575                        if (l != null && !l.isEmpty()) {
576                            for (TransactionContext ctx : l) {
577                                try {
578                                    ctx.afterRollback();
579                                } catch (Throwable ignored) {
580                                    if (LOG.isDebugEnabled()) {
581                                        LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
582                                    }
583                                }
584                            }
585                        }
586                        }
587            }
588            throw toXAException(e);
589        }
590
591    }
592
593    public void forget(Xid xid) throws XAException {
594        if (LOG.isDebugEnabled()) {
595            LOG.debug("Forget: " + xid);
596        }
597
598        // We allow interleaving multiple transactions, so
599        // we don't limit forget to the associated xid.
600        XATransactionId x;
601        if (xid == null) {
602            throw new XAException(XAException.XAER_PROTO);
603        }
604        if (equals(associatedXid, xid)) {
605            // TODO determine if this can happen... I think not.
606            x = (XATransactionId)transactionId;
607        } else {
608            x = new XATransactionId(xid);
609        }
610
611        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
612
613        try {
614            // Tell the server to forget the transaction.
615            syncSendPacketWithInterruptionHandling(info);
616        } catch (JMSException e) {
617            throw toXAException(e);
618        }
619        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
620                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
621        }
622    }
623
624    public boolean isSameRM(XAResource xaResource) throws XAException {
625        if (xaResource == null) {
626            return false;
627        }
628        if (!(xaResource instanceof TransactionContext)) {
629            return false;
630        }
631        TransactionContext xar = (TransactionContext)xaResource;
632        try {
633            return getResourceManagerId().equals(xar.getResourceManagerId());
634        } catch (Throwable e) {
635            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
636        }
637    }
638
639    public Xid[] recover(int flag) throws XAException {
640        if (LOG.isDebugEnabled()) {
641            LOG.debug("Recover: " + flag);
642        }
643
644        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
645        try {
646            this.connection.checkClosedOrFailed();
647            this.connection.ensureConnectionInfoSent();
648
649            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
650            DataStructure[] data = receipt.getData();
651            XATransactionId[] answer;
652            if (data instanceof XATransactionId[]) {
653                answer = (XATransactionId[])data;
654            } else {
655                answer = new XATransactionId[data.length];
656                System.arraycopy(data, 0, answer, 0, data.length);
657            }
658            return answer;
659        } catch (JMSException e) {
660            throw toXAException(e);
661        }
662    }
663
664    public int getTransactionTimeout() throws XAException {
665        return 0;
666    }
667
668    public boolean setTransactionTimeout(int seconds) throws XAException {
669        return false;
670    }
671
672    // ///////////////////////////////////////////////////////////
673    //
674    // Helper methods.
675    //
676    // ///////////////////////////////////////////////////////////
677    private String getResourceManagerId() throws JMSException {
678        return this.connection.getResourceManagerId();
679    }
680
681    private void setXid(Xid xid) throws XAException {
682
683        try {
684            this.connection.checkClosedOrFailed();
685            this.connection.ensureConnectionInfoSent();
686        } catch (JMSException e) {
687            throw toXAException(e);
688        }
689
690        if (xid != null) {
691            // associate
692            associatedXid = xid;
693            transactionId = new XATransactionId(xid);
694
695            TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
696            try {
697                this.connection.asyncSendPacket(info);
698                if (LOG.isDebugEnabled()) {
699                    LOG.debug("Started XA transaction: " + transactionId);
700                }
701            } catch (JMSException e) {
702                throw toXAException(e);
703            }
704
705        } else {
706
707            if (transactionId != null) {
708                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
709                try {
710                    syncSendPacketWithInterruptionHandling(info);
711                    if (LOG.isDebugEnabled()) {
712                        LOG.debug("Ended XA transaction: " + transactionId);
713                    }
714                } catch (JMSException e) {
715                    throw toXAException(e);
716                }
717
718                // Add our self to the list of contexts that are interested in
719                // post commit/rollback events.
720                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
721                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
722                        if (l == null) {
723                            l = new ArrayList<TransactionContext>(3);
724                            ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
725                            l.add(this);
726                        } else if (!l.contains(this)) {
727                            l.add(this);
728                        }
729                        }
730            }
731
732            // dis-associate
733            associatedXid = null;
734            transactionId = null;
735        }
736    }
737
738    /**
739     * Sends the given command. Also sends the command in case of interruption,
740     * so that important commands like rollback and commit are never interrupted.
741     * If interruption occurred, set the interruption state of the current
742     * after performing the action again.
743     *
744     * @return the response
745     */
746    private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
747        try {
748            return this.connection.syncSendPacket(command);
749        } catch (JMSException e) {
750            if (e.getLinkedException() instanceof InterruptedIOException) {
751                try {
752                    Thread.interrupted();
753                    return this.connection.syncSendPacket(command);
754                } finally {
755                    Thread.currentThread().interrupt();
756                }
757            }
758
759            throw e;
760        }
761    }
762
763    /**
764     * Converts a JMSException from the server to an XAException. if the
765     * JMSException contained a linked XAException that is returned instead.
766     *
767     * @param e JMSException to convert
768     * @return XAException wrapping original exception or its message
769     */
770    private XAException toXAException(JMSException e) {
771        if (e.getCause() != null && e.getCause() instanceof XAException) {
772            XAException original = (XAException)e.getCause();
773            XAException xae = new XAException(original.getMessage());
774            xae.errorCode = original.errorCode;
775            xae.initCause(original);
776            return xae;
777        }
778
779        XAException xae = new XAException(e.getMessage());
780        xae.errorCode = XAException.XAER_RMFAIL;
781        xae.initCause(e);
782        return xae;
783    }
784
785    public ActiveMQConnection getConnection() {
786        return connection;
787    }
788
789    public void cleanup() {
790        associatedXid = null;
791        transactionId = null;
792    }
793
794    @Override
795    public String toString() {
796        return "TransactionContext{" +
797                "transactionId=" + transactionId +
798                '}';
799    }
800}