001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.store.journal; 019 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.LinkedHashMap; 024 import java.util.Map; 025 import javax.transaction.xa.XAException; 026 import org.apache.activeio.journal.RecordLocation; 027 import org.apache.activemq.command.JournalTopicAck; 028 import org.apache.activemq.command.JournalTransaction; 029 import org.apache.activemq.command.Message; 030 import org.apache.activemq.command.MessageAck; 031 import org.apache.activemq.command.TransactionId; 032 import org.apache.activemq.command.XATransactionId; 033 import org.apache.activemq.store.TransactionRecoveryListener; 034 import org.apache.activemq.store.TransactionStore; 035 036 /** 037 */ 038 public class JournalTransactionStore implements TransactionStore { 039 040 private final JournalPersistenceAdapter peristenceAdapter; 041 private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>(); 042 private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>(); 043 private boolean doingRecover; 044 045 public static class TxOperation { 046 047 static final byte ADD_OPERATION_TYPE = 0; 048 static final byte REMOVE_OPERATION_TYPE = 1; 049 static final byte ACK_OPERATION_TYPE = 3; 050 051 public byte operationType; 052 public JournalMessageStore store; 053 public Object data; 054 055 public TxOperation(byte operationType, JournalMessageStore store, Object data) { 056 this.operationType = operationType; 057 this.store = store; 058 this.data = data; 059 } 060 061 } 062 063 /** 064 * Operations 065 * 066 * 067 */ 068 public static class Tx { 069 070 private final RecordLocation location; 071 private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>(); 072 073 public Tx(RecordLocation location) { 074 this.location = location; 075 } 076 077 public void add(JournalMessageStore store, Message msg) { 078 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); 079 } 080 081 public void add(JournalMessageStore store, MessageAck ack) { 082 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); 083 } 084 085 public void add(JournalTopicMessageStore store, JournalTopicAck ack) { 086 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); 087 } 088 089 public Message[] getMessages() { 090 ArrayList<Object> list = new ArrayList<Object>(); 091 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { 092 TxOperation op = iter.next(); 093 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { 094 list.add(op.data); 095 } 096 } 097 Message rc[] = new Message[list.size()]; 098 list.toArray(rc); 099 return rc; 100 } 101 102 public MessageAck[] getAcks() { 103 ArrayList<Object> list = new ArrayList<Object>(); 104 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { 105 TxOperation op = iter.next(); 106 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 107 list.add(op.data); 108 } 109 } 110 MessageAck rc[] = new MessageAck[list.size()]; 111 list.toArray(rc); 112 return rc; 113 } 114 115 public ArrayList<TxOperation> getOperations() { 116 return operations; 117 } 118 119 } 120 121 public JournalTransactionStore(JournalPersistenceAdapter adapter) { 122 this.peristenceAdapter = adapter; 123 } 124 125 /** 126 * @throws IOException 127 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 128 */ 129 public void prepare(TransactionId txid) throws IOException { 130 Tx tx = null; 131 synchronized (inflightTransactions) { 132 tx = inflightTransactions.remove(txid); 133 } 134 if (tx == null) { 135 return; 136 } 137 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), 138 true); 139 synchronized (preparedTransactions) { 140 preparedTransactions.put(txid, tx); 141 } 142 } 143 144 /** 145 * @throws IOException 146 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 147 */ 148 public void replayPrepare(TransactionId txid) throws IOException { 149 Tx tx = null; 150 synchronized (inflightTransactions) { 151 tx = inflightTransactions.remove(txid); 152 } 153 if (tx == null) { 154 return; 155 } 156 synchronized (preparedTransactions) { 157 preparedTransactions.put(txid, tx); 158 } 159 } 160 161 public Tx getTx(Object txid, RecordLocation location) { 162 Tx tx = null; 163 synchronized (inflightTransactions) { 164 tx = inflightTransactions.get(txid); 165 } 166 if (tx == null) { 167 tx = new Tx(location); 168 inflightTransactions.put(txid, tx); 169 } 170 return tx; 171 } 172 173 /** 174 * @throws XAException 175 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 176 */ 177 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 178 Tx tx; 179 if (preCommit != null) { 180 preCommit.run(); 181 } 182 if (wasPrepared) { 183 synchronized (preparedTransactions) { 184 tx = preparedTransactions.remove(txid); 185 } 186 } else { 187 synchronized (inflightTransactions) { 188 tx = inflightTransactions.remove(txid); 189 } 190 } 191 if (tx == null) { 192 if (postCommit != null) { 193 postCommit.run(); 194 } 195 return; 196 } 197 if (txid.isXATransaction()) { 198 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, 199 wasPrepared), true); 200 } else { 201 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, 202 wasPrepared), true); 203 } 204 if (postCommit != null) { 205 postCommit.run(); 206 } 207 } 208 209 /** 210 * @throws XAException 211 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 212 */ 213 public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { 214 if (wasPrepared) { 215 synchronized (preparedTransactions) { 216 return preparedTransactions.remove(txid); 217 } 218 } else { 219 synchronized (inflightTransactions) { 220 return inflightTransactions.remove(txid); 221 } 222 } 223 } 224 225 /** 226 * @throws IOException 227 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 228 */ 229 public void rollback(TransactionId txid) throws IOException { 230 Tx tx = null; 231 synchronized (inflightTransactions) { 232 tx = inflightTransactions.remove(txid); 233 } 234 if (tx != null) { 235 synchronized (preparedTransactions) { 236 tx = preparedTransactions.remove(txid); 237 } 238 } 239 if (tx != null) { 240 if (txid.isXATransaction()) { 241 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, 242 false), true); 243 } else { 244 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, 245 txid, false), true); 246 } 247 } 248 } 249 250 /** 251 * @throws IOException 252 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 253 */ 254 public void replayRollback(TransactionId txid) throws IOException { 255 boolean inflight = false; 256 synchronized (inflightTransactions) { 257 inflight = inflightTransactions.remove(txid) != null; 258 } 259 if (inflight) { 260 synchronized (preparedTransactions) { 261 preparedTransactions.remove(txid); 262 } 263 } 264 } 265 266 public void start() throws Exception { 267 } 268 269 public void stop() throws Exception { 270 } 271 272 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 273 // All the in-flight transactions get rolled back.. 274 synchronized (inflightTransactions) { 275 inflightTransactions.clear(); 276 } 277 this.doingRecover = true; 278 try { 279 Map<TransactionId, Tx> txs = null; 280 synchronized (preparedTransactions) { 281 txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions); 282 } 283 for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) { 284 Object txid = iter.next(); 285 Tx tx = txs.get(txid); 286 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 287 } 288 } finally { 289 this.doingRecover = false; 290 } 291 } 292 293 /** 294 * @param message 295 * @throws IOException 296 */ 297 void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException { 298 Tx tx = getTx(message.getTransactionId(), location); 299 tx.add(store, message); 300 } 301 302 /** 303 * @param ack 304 * @throws IOException 305 */ 306 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) 307 throws IOException { 308 Tx tx = getTx(ack.getTransactionId(), location); 309 tx.add(store, ack); 310 } 311 312 public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { 313 Tx tx = getTx(ack.getTransactionId(), location); 314 tx.add(store, ack); 315 } 316 317 public RecordLocation checkpoint() throws IOException { 318 // Nothing really to checkpoint.. since, we don't 319 // checkpoint tx operations in to long term store until they are 320 // committed. 321 // But we keep track of the first location of an operation 322 // that was associated with an active tx. The journal can not 323 // roll over active tx records. 324 RecordLocation rc = null; 325 synchronized (inflightTransactions) { 326 for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) { 327 Tx tx = iter.next(); 328 RecordLocation location = tx.location; 329 if (rc == null || rc.compareTo(location) < 0) { 330 rc = location; 331 } 332 } 333 } 334 synchronized (preparedTransactions) { 335 for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) { 336 Tx tx = iter.next(); 337 RecordLocation location = tx.location; 338 if (rc == null || rc.compareTo(location) < 0) { 339 rc = location; 340 } 341 } 342 return rc; 343 } 344 } 345 346 public boolean isDoingRecover() { 347 return doingRecover; 348 } 349 350 }