001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.io.IOException;
020
021import javax.jms.ConnectionFactory;
022import javax.jms.Session;
023import javax.jms.JMSException;
024import javax.transaction.SystemException;
025import javax.transaction.TransactionManager;
026
027import javax.transaction.xa.XAResource;
028import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.activemq.ActiveMQConnectionFactory;
032import org.apache.activemq.ActiveMQConnection;
033import org.apache.activemq.ActiveMQSession;
034import org.apache.activemq.util.IOExceptionSupport;
035import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
036import org.apache.geronimo.transaction.manager.NamedXAResource;
037import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
038
039
040/**
041 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
042 * in a way that will allow the transaction manager to correctly recover XA transactions.
043 *
044 * For example, it can be used the following way:
045 * <pre>
046 *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
047 *      <property name="brokerURL" value="tcp://localhost:61616" />
048 *   </bean>
049 *
050 *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
051 *       <property name="maxConnections" value="8" />
052 *       <property name="transactionManager" ref="transactionManager" />
053 *       <property name="connectionFactory" ref="activemqConnectionFactory" />
054 *       <property name="resourceName" value="activemq.broker" />
055 *   </bean>
056 *
057 *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
058 *         <property name="transactionManager" ref="transactionManager" />
059 *         <property name="connectionFactory" ref="activemqConnectionFactory" />
060 *         <property name="resourceName" value="activemq.broker" />
061 *   </bean>
062 * </pre>
063 */
064public class ActiveMQResourceManager {
065
066    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class);
067
068    private String resourceName;
069
070    private TransactionManager transactionManager;
071
072    private ConnectionFactory connectionFactory;
073
074    public void recoverResource() {
075        try {
076            if (!Recovery.recover(this)) {
077                LOGGER.info("Resource manager is unrecoverable");
078            }
079        } catch (NoClassDefFoundError e) {
080            LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
081        } catch (Throwable e) {
082            LOGGER.warn("Error while recovering resource manager", e);
083        }
084    }
085
086    public String getResourceName() {
087        return resourceName;
088    }
089
090    public void setResourceName(String resourceName) {
091        this.resourceName = resourceName;
092    }
093
094    public TransactionManager getTransactionManager() {
095        return transactionManager;
096    }
097
098    public void setTransactionManager(TransactionManager transactionManager) {
099        this.transactionManager = transactionManager;
100    }
101
102    public ConnectionFactory getConnectionFactory() {
103        return connectionFactory;
104    }
105
106    public void setConnectionFactory(ConnectionFactory connectionFactory) {
107        this.connectionFactory = connectionFactory;
108    }
109
110    /**
111     * This class will ensure the broker is properly recovered when wired with
112     * the Geronimo transaction manager.
113     */
114    public static class Recovery {
115
116        public static boolean isRecoverable(ActiveMQResourceManager rm) {
117            return  rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
118                    rm.getTransactionManager() instanceof RecoverableTransactionManager &&
119                    rm.getResourceName() != null && !"".equals(rm.getResourceName());
120        }
121
122        public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
123            if (isRecoverable(rm)) {
124                try {
125                    final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
126                    ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
127                    final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
128                    NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
129
130                    RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
131                    rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
132
133                        @Override
134                        public String getName() {
135                            return rm.getResourceName();
136                        }
137
138                        @Override
139                        public NamedXAResource getNamedXAResource() throws SystemException {
140                            try {
141                                final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
142                                final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
143                                activeConn.start();
144                                LOGGER.debug("new namedXAResource's connection: " + activeConn);
145
146                                return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
147                            } catch (Exception e) {
148                                SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
149                                se.initCause(e);
150                                LOGGER.error(se.getLocalizedMessage(), se);
151                                throw se;
152                            }
153                        }
154
155                        @Override
156                        public void returnNamedXAResource(NamedXAResource namedXaResource) {
157                            if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
158                                try {
159                                    LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
160                                    ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
161                                } catch (Exception ignored) {
162                                    LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
163                                }
164                            }
165                        }
166                    });
167                    return true;
168                } catch (JMSException e) {
169                  throw IOExceptionSupport.create(e);
170                }
171            } else {
172                return false;
173            }
174        }
175    }
176
177    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
178        final ActiveMQConnection connection;
179        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
180            super(xaResource, name);
181            this.connection = connection;
182        }
183    }
184}