001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.store.amq; 019 020 import java.io.IOException; 021 import java.util.Iterator; 022 import java.util.LinkedHashMap; 023 import java.util.Map; 024 import javax.transaction.xa.XAException; 025 import org.apache.activemq.command.JournalTopicAck; 026 import org.apache.activemq.command.JournalTransaction; 027 import org.apache.activemq.command.Message; 028 import org.apache.activemq.command.MessageAck; 029 import org.apache.activemq.command.TransactionId; 030 import org.apache.activemq.command.XATransactionId; 031 import org.apache.activemq.kaha.impl.async.Location; 032 import org.apache.activemq.store.TransactionRecoveryListener; 033 import org.apache.activemq.store.TransactionStore; 034 035 /** 036 */ 037 public class AMQTransactionStore implements TransactionStore { 038 039 protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>(); 040 Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>(); 041 042 private final AMQPersistenceAdapter peristenceAdapter; 043 private boolean doingRecover; 044 045 public AMQTransactionStore(AMQPersistenceAdapter adapter) { 046 this.peristenceAdapter = adapter; 047 } 048 049 /** 050 * @throws IOException 051 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 052 */ 053 public void prepare(TransactionId txid) throws IOException { 054 AMQTx tx = null; 055 synchronized (inflightTransactions) { 056 tx = inflightTransactions.remove(txid); 057 } 058 if (tx == null) { 059 return; 060 } 061 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true); 062 synchronized (preparedTransactions) { 063 preparedTransactions.put(txid, tx); 064 } 065 } 066 067 /** 068 * @throws IOException 069 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 070 */ 071 public void replayPrepare(TransactionId txid) throws IOException { 072 AMQTx tx = null; 073 synchronized (inflightTransactions) { 074 tx = inflightTransactions.remove(txid); 075 } 076 if (tx == null) { 077 return; 078 } 079 synchronized (preparedTransactions) { 080 preparedTransactions.put(txid, tx); 081 } 082 } 083 084 public AMQTx getTx(TransactionId txid, Location location) { 085 AMQTx tx = null; 086 synchronized (inflightTransactions) { 087 tx = inflightTransactions.get(txid); 088 if (tx == null) { 089 tx = new AMQTx(location); 090 inflightTransactions.put(txid, tx); 091 } 092 } 093 return tx; 094 } 095 096 /** 097 * @throws XAException 098 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 099 */ 100 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 101 if (preCommit != null) { 102 preCommit.run(); 103 } 104 AMQTx tx; 105 if (wasPrepared) { 106 synchronized (preparedTransactions) { 107 tx = preparedTransactions.remove(txid); 108 } 109 } else { 110 synchronized (inflightTransactions) { 111 tx = inflightTransactions.remove(txid); 112 } 113 } 114 if (tx == null) { 115 if (postCommit != null) { 116 postCommit.run(); 117 } 118 return; 119 } 120 if (txid.isXATransaction()) { 121 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true); 122 } else { 123 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true); 124 } 125 if (postCommit != null) { 126 postCommit.run(); 127 } 128 } 129 130 /** 131 * @throws XAException 132 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 133 */ 134 public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { 135 if (wasPrepared) { 136 synchronized (preparedTransactions) { 137 return preparedTransactions.remove(txid); 138 } 139 } else { 140 synchronized (inflightTransactions) { 141 return inflightTransactions.remove(txid); 142 } 143 } 144 } 145 146 /** 147 * @throws IOException 148 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 149 */ 150 public void rollback(TransactionId txid) throws IOException { 151 AMQTx tx = null; 152 synchronized (inflightTransactions) { 153 tx = inflightTransactions.remove(txid); 154 } 155 if (tx != null) { 156 synchronized (preparedTransactions) { 157 tx = preparedTransactions.remove(txid); 158 } 159 } 160 if (tx != null) { 161 if (txid.isXATransaction()) { 162 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true); 163 } else { 164 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true); 165 } 166 } 167 } 168 169 /** 170 * @throws IOException 171 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 172 */ 173 public void replayRollback(TransactionId txid) throws IOException { 174 boolean inflight = false; 175 synchronized (inflightTransactions) { 176 inflight = inflightTransactions.remove(txid) != null; 177 } 178 if (inflight) { 179 synchronized (preparedTransactions) { 180 preparedTransactions.remove(txid); 181 } 182 } 183 } 184 185 public void start() throws Exception { 186 } 187 188 public void stop() throws Exception { 189 } 190 191 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 192 // All the in-flight transactions get rolled back.. 193 synchronized (inflightTransactions) { 194 inflightTransactions.clear(); 195 } 196 this.doingRecover = true; 197 try { 198 Map<TransactionId, AMQTx> txs = null; 199 synchronized (preparedTransactions) { 200 txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions); 201 } 202 for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) { 203 Object txid = iter.next(); 204 AMQTx tx = txs.get(txid); 205 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 206 } 207 } finally { 208 this.doingRecover = false; 209 } 210 } 211 212 /** 213 * @param message 214 * @throws IOException 215 */ 216 void addMessage(AMQMessageStore store, Message message, Location location) throws IOException { 217 AMQTx tx = getTx(message.getTransactionId(), location); 218 tx.add(store, message, location); 219 } 220 221 /** 222 * @param ack 223 * @throws IOException 224 */ 225 public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException { 226 AMQTx tx = getTx(ack.getTransactionId(), location); 227 tx.add(store, ack); 228 } 229 230 public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) { 231 AMQTx tx = getTx(ack.getTransactionId(), location); 232 tx.add(store, ack); 233 } 234 235 public Location checkpoint() throws IOException { 236 // Nothing really to checkpoint.. since, we don't 237 // checkpoint tx operations in to long term store until they are 238 // committed. 239 // But we keep track of the first location of an operation 240 // that was associated with an active tx. The journal can not 241 // roll over active tx records. 242 Location minimumLocationInUse = null; 243 synchronized (inflightTransactions) { 244 for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) { 245 AMQTx tx = iter.next(); 246 Location location = tx.getLocation(); 247 if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { 248 minimumLocationInUse = location; 249 } 250 } 251 } 252 synchronized (preparedTransactions) { 253 for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) { 254 AMQTx tx = iter.next(); 255 Location location = tx.getLocation(); 256 if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { 257 minimumLocationInUse = location; 258 } 259 } 260 return minimumLocationInUse; 261 } 262 } 263 264 public boolean isDoingRecover() { 265 return doingRecover; 266 } 267 268 /** 269 * @return the preparedTransactions 270 */ 271 public Map<TransactionId, AMQTx> getPreparedTransactions() { 272 return this.preparedTransactions; 273 } 274 275 /** 276 * @param preparedTransactions the preparedTransactions to set 277 */ 278 public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) { 279 if (preparedTransactions != null) { 280 this.preparedTransactions.clear(); 281 this.preparedTransactions.putAll(preparedTransactions); 282 } 283 } 284 }