001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.SQLException; 023import java.sql.Statement; 024 025import javax.sql.DataSource; 026 027import org.apache.activemq.util.IOExceptionSupport; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Helps keep track of the current transaction/JDBC connection. 033 * 034 * 035 */ 036public class TransactionContext { 037 038 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 039 040 private final DataSource dataSource; 041 private final JDBCPersistenceAdapter persistenceAdapter; 042 private Connection connection; 043 private boolean inTx; 044 private PreparedStatement addMessageStatement; 045 private PreparedStatement removedMessageStatement; 046 private PreparedStatement updateLastAckStatement; 047 // a cheap dirty level that we can live with 048 private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; 049 050 public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { 051 this.persistenceAdapter = persistenceAdapter; 052 this.dataSource = persistenceAdapter.getDataSource(); 053 } 054 055 public Connection getConnection() throws IOException { 056 if (connection == null) { 057 try { 058 connection = dataSource.getConnection(); 059 boolean autoCommit = !inTx; 060 if (connection.getAutoCommit() != autoCommit) { 061 connection.setAutoCommit(autoCommit); 062 } 063 } catch (SQLException e) { 064 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); 065 IOException ioe = IOExceptionSupport.create(e); 066 persistenceAdapter.getBrokerService().handleIOException(ioe); 067 throw ioe; 068 069 } 070 071 try { 072 connection.setTransactionIsolation(transactionIsolation); 073 } catch (Throwable e) { 074 } 075 } 076 return connection; 077 } 078 079 public void executeBatch() throws SQLException { 080 try { 081 executeBatch(addMessageStatement, "Failed add a message"); 082 } finally { 083 addMessageStatement = null; 084 try { 085 executeBatch(removedMessageStatement, "Failed to remove a message"); 086 } finally { 087 removedMessageStatement = null; 088 try { 089 executeBatch(updateLastAckStatement, "Failed to ack a message"); 090 } finally { 091 updateLastAckStatement = null; 092 } 093 } 094 } 095 } 096 097 private void executeBatch(PreparedStatement p, String message) throws SQLException { 098 if (p == null) { 099 return; 100 } 101 102 try { 103 int[] rc = p.executeBatch(); 104 for (int i = 0; i < rc.length; i++) { 105 int code = rc[i]; 106 if (code < 0 && code != Statement.SUCCESS_NO_INFO) { 107 throw new SQLException(message + ". Response code: " + code); 108 } 109 } 110 } finally { 111 try { 112 p.close(); 113 } catch (Throwable e) { 114 } 115 } 116 } 117 118 public void close() throws IOException { 119 if (!inTx) { 120 try { 121 122 /** 123 * we are not in a transaction so should not be committing ?? 124 * This was previously commented out - but had adverse affects 125 * on testing - so it's back! 126 * 127 */ 128 try { 129 executeBatch(); 130 } finally { 131 if (connection != null && !connection.getAutoCommit()) { 132 connection.commit(); 133 } 134 } 135 136 } catch (SQLException e) { 137 JDBCPersistenceAdapter.log("Error while closing connection: ", e); 138 throw IOExceptionSupport.create(e); 139 } finally { 140 try { 141 if (connection != null) { 142 connection.close(); 143 } 144 } catch (Throwable e) { 145 LOG.warn("Close failed: " + e.getMessage(), e); 146 } finally { 147 connection = null; 148 } 149 } 150 } 151 } 152 153 public void begin() throws IOException { 154 if (inTx) { 155 throw new IOException("Already started."); 156 } 157 inTx = true; 158 connection = getConnection(); 159 } 160 161 public void commit() throws IOException { 162 if (!inTx) { 163 throw new IOException("Not started."); 164 } 165 try { 166 executeBatch(); 167 if (!connection.getAutoCommit()) { 168 connection.commit(); 169 } 170 } catch (SQLException e) { 171 JDBCPersistenceAdapter.log("Commit failed: ", e); 172 173 this.rollback(); 174 175 throw IOExceptionSupport.create(e); 176 } finally { 177 inTx = false; 178 close(); 179 } 180 } 181 182 public void rollback() throws IOException { 183 if (!inTx) { 184 throw new IOException("Not started."); 185 } 186 try { 187 if (addMessageStatement != null) { 188 addMessageStatement.close(); 189 addMessageStatement = null; 190 } 191 if (removedMessageStatement != null) { 192 removedMessageStatement.close(); 193 removedMessageStatement = null; 194 } 195 if (updateLastAckStatement != null) { 196 updateLastAckStatement.close(); 197 updateLastAckStatement = null; 198 } 199 connection.rollback(); 200 201 } catch (SQLException e) { 202 JDBCPersistenceAdapter.log("Rollback failed: ", e); 203 throw IOExceptionSupport.create(e); 204 } finally { 205 inTx = false; 206 close(); 207 } 208 } 209 210 public PreparedStatement getAddMessageStatement() { 211 return addMessageStatement; 212 } 213 214 public void setAddMessageStatement(PreparedStatement addMessageStatement) { 215 this.addMessageStatement = addMessageStatement; 216 } 217 218 public PreparedStatement getUpdateLastAckStatement() { 219 return updateLastAckStatement; 220 } 221 222 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { 223 this.updateLastAckStatement = ackMessageStatement; 224 } 225 226 public PreparedStatement getRemovedMessageStatement() { 227 return removedMessageStatement; 228 } 229 230 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { 231 this.removedMessageStatement = removedMessageStatement; 232 } 233 234 public void setTransactionIsolation(int transactionIsolation) { 235 this.transactionIsolation = transactionIsolation; 236 } 237 238}