001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.pool; 018 019 import java.io.IOException; 020 021 import javax.jms.ConnectionFactory; 022 import javax.jms.Session; 023 import javax.jms.JMSException; 024 import javax.transaction.SystemException; 025 import javax.transaction.TransactionManager; 026 027 import javax.transaction.xa.XAResource; 028 import org.apache.geronimo.transaction.manager.NamedXAResourceFactory; 029 import org.slf4j.Logger; 030 import org.slf4j.LoggerFactory; 031 import org.apache.activemq.ActiveMQConnectionFactory; 032 import org.apache.activemq.ActiveMQConnection; 033 import org.apache.activemq.ActiveMQSession; 034 import org.apache.activemq.util.IOExceptionSupport; 035 import org.apache.geronimo.transaction.manager.RecoverableTransactionManager; 036 import org.apache.geronimo.transaction.manager.NamedXAResource; 037 import org.apache.geronimo.transaction.manager.WrapperNamedXAResource; 038 039 040 /** 041 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager 042 * in a way that will allow the transaction manager to correctly recover XA transactions. 043 * 044 * For example, it can be used the following way: 045 * <pre> 046 * <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 047 * <property name="brokerURL" value="tcp://localhost:61616" /> 048 * </bean> 049 * 050 * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean"> 051 * <property name="maxConnections" value="8" /> 052 * <property name="transactionManager" ref="transactionManager" /> 053 * <property name="connectionFactory" ref="activemqConnectionFactory" /> 054 * <property name="resourceName" value="activemq.broker" /> 055 * </bean> 056 * 057 * <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> 058 * <property name="transactionManager" ref="transactionManager" /> 059 * <property name="connectionFactory" ref="activemqConnectionFactory" /> 060 * <property name="resourceName" value="activemq.broker" /> 061 * </bean> 062 * </pre> 063 */ 064 public class ActiveMQResourceManager { 065 066 private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class); 067 068 private String resourceName; 069 070 private TransactionManager transactionManager; 071 072 private ConnectionFactory connectionFactory; 073 074 public void recoverResource() { 075 try { 076 if (!Recovery.recover(this)) { 077 LOGGER.info("Resource manager is unrecoverable"); 078 } 079 } catch (NoClassDefFoundError e) { 080 LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e); 081 } catch (Throwable e) { 082 LOGGER.warn("Error while recovering resource manager", e); 083 } 084 } 085 086 public String getResourceName() { 087 return resourceName; 088 } 089 090 public void setResourceName(String resourceName) { 091 this.resourceName = resourceName; 092 } 093 094 public TransactionManager getTransactionManager() { 095 return transactionManager; 096 } 097 098 public void setTransactionManager(TransactionManager transactionManager) { 099 this.transactionManager = transactionManager; 100 } 101 102 public ConnectionFactory getConnectionFactory() { 103 return connectionFactory; 104 } 105 106 public void setConnectionFactory(ConnectionFactory connectionFactory) { 107 this.connectionFactory = connectionFactory; 108 } 109 110 /** 111 * This class will ensure the broker is properly recovered when wired with 112 * the Geronimo transaction manager. 113 */ 114 public static class Recovery { 115 116 public static boolean isRecoverable(ActiveMQResourceManager rm) { 117 return rm.getConnectionFactory() instanceof ActiveMQConnectionFactory && 118 rm.getTransactionManager() instanceof RecoverableTransactionManager && 119 rm.getResourceName() != null && !"".equals(rm.getResourceName()); 120 } 121 122 public static boolean recover(final ActiveMQResourceManager rm) throws IOException { 123 if (isRecoverable(rm)) { 124 try { 125 final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory(); 126 ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection(); 127 final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED); 128 NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName()); 129 130 RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager(); 131 rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() { 132 133 @Override 134 public String getName() { 135 return rm.getResourceName(); 136 } 137 138 @Override 139 public NamedXAResource getNamedXAResource() throws SystemException { 140 try { 141 final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection(); 142 final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED); 143 activeConn.start(); 144 LOGGER.debug("new namedXAResource's connection: " + activeConn); 145 146 return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn); 147 } catch (Exception e) { 148 SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage()); 149 se.initCause(e); 150 LOGGER.error(se.getLocalizedMessage(), se); 151 throw se; 152 } 153 } 154 155 @Override 156 public void returnNamedXAResource(NamedXAResource namedXaResource) { 157 if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) { 158 try { 159 LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection); 160 ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close(); 161 } catch (Exception ignored) { 162 LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored); 163 } 164 } 165 } 166 }); 167 return true; 168 } catch (JMSException e) { 169 throw IOExceptionSupport.create(e); 170 } 171 } else { 172 return false; 173 } 174 } 175 } 176 177 public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource { 178 final ActiveMQConnection connection; 179 public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) { 180 super(xaResource, name); 181 this.connection = connection; 182 } 183 } 184 }