001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.SQLException;
023import java.sql.Statement;
024
025import javax.sql.DataSource;
026
027import org.apache.activemq.util.IOExceptionSupport;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Helps keep track of the current transaction/JDBC connection.
033 * 
034 * 
035 */
036public class TransactionContext {
037
038    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
039
040    private final DataSource dataSource;
041    private final JDBCPersistenceAdapter persistenceAdapter;
042    private Connection connection;
043    private boolean inTx;
044    private PreparedStatement addMessageStatement;
045    private PreparedStatement removedMessageStatement;
046    private PreparedStatement updateLastAckStatement;
047    // a cheap dirty level that we can live with    
048    private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
049    
050    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
051        this.persistenceAdapter = persistenceAdapter;
052        this.dataSource = persistenceAdapter.getDataSource();
053    }
054
055    public Connection getConnection() throws IOException {
056        if (connection == null) {
057            try {
058                connection = dataSource.getConnection();
059                boolean autoCommit = !inTx;
060                if (connection.getAutoCommit() != autoCommit) {
061                    connection.setAutoCommit(autoCommit);
062                }
063            } catch (SQLException e) {
064                JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
065                IOException ioe = IOExceptionSupport.create(e);
066                persistenceAdapter.getBrokerService().handleIOException(ioe);
067                throw ioe;
068
069            }
070
071            try {
072                connection.setTransactionIsolation(transactionIsolation);
073            } catch (Throwable e) {
074            }
075        }
076        return connection;
077    }
078
079    public void executeBatch() throws SQLException {
080        try {
081            executeBatch(addMessageStatement, "Failed add a message");
082        } finally {
083            addMessageStatement = null;
084            try {
085                executeBatch(removedMessageStatement, "Failed to remove a message");
086            } finally {
087                removedMessageStatement = null;
088                try {
089                    executeBatch(updateLastAckStatement, "Failed to ack a message");
090                } finally {
091                    updateLastAckStatement = null;
092                }
093            }
094        }
095    }
096
097    private void executeBatch(PreparedStatement p, String message) throws SQLException {
098        if (p == null) {
099            return;
100        }
101
102        try {
103            int[] rc = p.executeBatch();
104            for (int i = 0; i < rc.length; i++) {
105                int code = rc[i];
106                if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
107                    throw new SQLException(message + ". Response code: " + code);
108                }
109            }
110        } finally {
111            try {
112                p.close();
113            } catch (Throwable e) {
114            }
115        }
116    }
117
118    public void close() throws IOException {
119        if (!inTx) {
120            try {
121
122                /**
123                 * we are not in a transaction so should not be committing ??
124                 * This was previously commented out - but had adverse affects
125                 * on testing - so it's back!
126                 * 
127                 */
128                try {
129                    executeBatch();
130                } finally {
131                    if (connection != null && !connection.getAutoCommit()) {
132                        connection.commit();
133                    }
134                }
135
136            } catch (SQLException e) {
137                JDBCPersistenceAdapter.log("Error while closing connection: ", e);
138                throw IOExceptionSupport.create(e);
139            } finally {
140                try {
141                    if (connection != null) {
142                        connection.close();
143                    }
144                } catch (Throwable e) {
145                    LOG.warn("Close failed: " + e.getMessage(), e);
146                } finally {
147                    connection = null;
148                }
149            }
150        }
151    }
152
153    public void begin() throws IOException {
154        if (inTx) {
155            throw new IOException("Already started.");
156        }
157        inTx = true;
158        connection = getConnection();
159    }
160
161    public void commit() throws IOException {
162        if (!inTx) {
163            throw new IOException("Not started.");
164        }
165        try {
166            executeBatch();
167            if (!connection.getAutoCommit()) {
168                connection.commit();
169            }
170        } catch (SQLException e) {
171            JDBCPersistenceAdapter.log("Commit failed: ", e);
172            
173            this.rollback(); 
174            
175            throw IOExceptionSupport.create(e);
176        } finally {
177            inTx = false;
178            close();
179        }
180    }
181
182    public void rollback() throws IOException {
183        if (!inTx) {
184            throw new IOException("Not started.");
185        }
186        try {
187            if (addMessageStatement != null) {
188                addMessageStatement.close();
189                addMessageStatement = null;
190            }
191            if (removedMessageStatement != null) {
192                removedMessageStatement.close();
193                removedMessageStatement = null;
194            }
195            if (updateLastAckStatement != null) {
196                updateLastAckStatement.close();
197                updateLastAckStatement = null;
198            }
199            connection.rollback();
200
201        } catch (SQLException e) {
202            JDBCPersistenceAdapter.log("Rollback failed: ", e);
203            throw IOExceptionSupport.create(e);
204        } finally {
205            inTx = false;
206            close();
207        }
208    }
209
210    public PreparedStatement getAddMessageStatement() {
211        return addMessageStatement;
212    }
213
214    public void setAddMessageStatement(PreparedStatement addMessageStatement) {
215        this.addMessageStatement = addMessageStatement;
216    }
217
218    public PreparedStatement getUpdateLastAckStatement() {
219        return updateLastAckStatement;
220    }
221
222    public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
223        this.updateLastAckStatement = ackMessageStatement;
224    }
225
226    public PreparedStatement getRemovedMessageStatement() {
227        return removedMessageStatement;
228    }
229
230    public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
231        this.removedMessageStatement = removedMessageStatement;
232    }
233    
234    public void setTransactionIsolation(int transactionIsolation) {
235        this.transactionIsolation = transactionIsolation;
236    }
237
238}