001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transaction; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Iterator; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.FutureTask; 027import javax.transaction.xa.XAException; 028import org.apache.activemq.command.TransactionId; 029import org.slf4j.Logger; 030 031/** 032 * Keeps track of all the actions the need to be done when a transaction does a 033 * commit or rollback. 034 * 035 * 036 */ 037public abstract class Transaction { 038 039 public static final byte START_STATE = 0; // can go to: 1,2,3 040 public static final byte IN_USE_STATE = 1; // can go to: 2,3 041 public static final byte PREPARED_STATE = 2; // can go to: 3 042 public static final byte FINISHED_STATE = 3; 043 044 private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>(); 045 private byte state = START_STATE; 046 protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() { 047 public Object call() throws Exception { 048 doPreCommit(); 049 return null; 050 } 051 }); 052 protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() { 053 public Object call() throws Exception { 054 doPostCommit(); 055 return null; 056 } 057 }); 058 059 public byte getState() { 060 return state; 061 } 062 063 public void setState(byte state) { 064 this.state = state; 065 } 066 067 public void addSynchronization(Synchronization r) { 068 synchronizations.add(r); 069 if (state == START_STATE) { 070 state = IN_USE_STATE; 071 } 072 } 073 074 public void removeSynchronization(Synchronization r) { 075 synchronizations.remove(r); 076 } 077 078 public void prePrepare() throws Exception { 079 080 // Is it ok to call prepare now given the state of the 081 // transaction? 082 switch (state) { 083 case START_STATE: 084 case IN_USE_STATE: 085 break; 086 default: 087 XAException xae = new XAException("Prepare cannot be called now."); 088 xae.errorCode = XAException.XAER_PROTO; 089 throw xae; 090 } 091 092 // // Run the prePrepareTasks 093 // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) { 094 // Callback r = (Callback) iter.next(); 095 // r.execute(); 096 // } 097 } 098 099 protected void fireBeforeCommit() throws Exception { 100 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { 101 Synchronization s = iter.next(); 102 s.beforeCommit(); 103 } 104 } 105 106 protected void fireAfterCommit() throws Exception { 107 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { 108 Synchronization s = iter.next(); 109 s.afterCommit(); 110 } 111 } 112 113 public void fireAfterRollback() throws Exception { 114 Collections.reverse(synchronizations); 115 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { 116 Synchronization s = iter.next(); 117 s.afterRollback(); 118 } 119 } 120 121 @Override 122 public String toString() { 123 return super.toString() + "[synchronizations=" + synchronizations + "]"; 124 } 125 126 public abstract void commit(boolean onePhase) throws XAException, IOException; 127 128 public abstract void rollback() throws XAException, IOException; 129 130 public abstract int prepare() throws XAException, IOException; 131 132 public abstract TransactionId getTransactionId(); 133 134 public abstract Logger getLog(); 135 136 public boolean isPrepared() { 137 return getState() == PREPARED_STATE; 138 } 139 140 public int size() { 141 return synchronizations.size(); 142 } 143 144 protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException { 145 try { 146 postCommitTask.get(); 147 } catch (InterruptedException e) { 148 throw new InterruptedIOException(e.toString()); 149 } catch (ExecutionException e) { 150 Throwable t = e.getCause(); 151 if (t instanceof XAException) { 152 throw (XAException) t; 153 } else if (t instanceof IOException) { 154 throw (IOException) t; 155 } else { 156 throw new XAException(e.toString()); 157 } 158 } 159 } 160 161 protected void doPreCommit() throws XAException { 162 try { 163 fireBeforeCommit(); 164 } catch (Throwable e) { 165 // I guess this could happen. Post commit task failed 166 // to execute properly. 167 getLog().warn("PRE COMMIT FAILED: ", e); 168 XAException xae = new XAException("PRE COMMIT FAILED"); 169 xae.errorCode = XAException.XAER_RMERR; 170 xae.initCause(e); 171 throw xae; 172 } 173 } 174 175 protected void doPostCommit() throws XAException { 176 try { 177 fireAfterCommit(); 178 } catch (Throwable e) { 179 // I guess this could happen. Post commit task failed 180 // to execute properly. 181 getLog().warn("POST COMMIT FAILED: ", e); 182 XAException xae = new XAException("POST COMMIT FAILED"); 183 xae.errorCode = XAException.XAER_RMERR; 184 xae.initCause(e); 185 throw xae; 186 } 187 } 188}