/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;


import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.transaction.LocalTransaction;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.WrappedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This broker filter handles the transaction related operations in the Broker
 * interface.
 * <p>
 * XA transactions are tracked in a single map owned by this filter; every
 * access to that map is performed while synchronized on the map itself. Local
 * transactions are kept in the per-connection map obtained from
 * {@link ConnectionContext#getTransactions()}.
 */
public class TransactionBroker extends BrokerFilter {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);

    private final TransactionStore transactionStore;
    // The XA transactions known to this broker, keyed by transaction id.
    // Doubles as the lock object for all reads and writes of the map.
    private final Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
    // Duplicate detection for sends; remains null unless brokerServiceStarted() enables it.
    private ActiveMQMessageAudit audit;

    /**
     * @param next the next broker in the filter chain
     * @param transactionStore the store that is started, recovered from and
     *            stopped by this filter and handed to every transaction it creates
     */
    public TransactionBroker(Broker next, TransactionStore transactionStore) {
        super(next);
        this.transactionStore = transactionStore;
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // Life cycle Methods
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Starts the transaction store, recovers any prepared transactions from it
     * and then starts the next broker.
     * <p>
     * Each recovered XA transaction is re-registered through
     * {@link #beginTransaction}, given completion hooks for every destination
     * its messages and acks refer to, marked as prepared and exposed as an
     * MBean when the region broker is a {@link ManagedRegionBroker}.
     *
     * @throws Exception an IOException built by {@link IOExceptionSupport}
     *             when recovery of a transaction fails, or whatever the store
     *             or the next broker throws while starting
     */
    @Override
    public void start() throws Exception {
        transactionStore.start();
        try {
            final ConnectionContext context = new ConnectionContext();
            context.setBroker(this);
            context.setInRecoveryMode(true);
            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
            context.setProducerFlowControl(false);
            // NOTE(review): neither exchange is referenced by the recovery
            // listener below; they are kept because it is not visible from
            // here whether constructing them has side effects.
            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setMutable(true);
            producerExchange.setConnectionContext(context);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
            consumerExchange.setConnectionContext(context);
            transactionStore.recover(new TransactionRecoveryListener() {
                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                    try {
                        beginTransaction(context, xid);
                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
                        for (Message addedMessage : addedMessages) {
                            forceDestinationWakeupOnCompletion(context, transaction, addedMessage.getDestination(), addedMessage);
                        }
                        for (MessageAck ack : aks) {
                            forceDestinationWakeupOnCompletion(context, transaction, ack.getDestination(), ack);
                        }
                        transaction.setState(Transaction.PREPARED_STATE);
                        registerMBean(transaction);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
                        }
                    } catch (Throwable e) {
                        // The listener signature declares no checked exceptions,
                        // so tunnel the failure out; it is unwrapped below.
                        throw new WrappedException(e);
                    }
                }
            });
        } catch (WrappedException e) {
            Throwable cause = e.getCause();
            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
        }
        next.start();
    }

    /**
     * Registers a recovered transaction with the region broker when that
     * broker is a {@link ManagedRegionBroker}; otherwise does nothing.
     */
    private void registerMBean(XATransaction transaction) {
        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker) {
            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
        }
    }

    /**
     * Resolves (adding it if necessary) the destination a recovered command
     * refers to and attaches a completion hook for it to the transaction.
     */
    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
        Destination destination = addDestination(context, amqDestination, false);
        registerSync(destination, transaction, ack);
    }

    /**
     * Attaches a {@link PreparedDestinationCompletion} to the transaction when
     * the destination is a {@link Queue}; other destination types are ignored.
     */
    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
        if (destination instanceof Queue) {
            Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
            // ensure one per destination in the list: drop an equal, previously
            // registered hook before adding this one
            transaction.removeSynchronization(sync);
            transaction.addSynchronization(sync);
        }
    }

    /**
     * Completion hook for a recovered, prepared transaction that clears the
     * pending messages of one queue: after commit when it was registered for a
     * message send, after rollback when it was registered for an ack.
     * <p>
     * Two instances are equal when they refer to equal queues and the same
     * send/ack flavour, which lets {@link #registerSync} keep a single hook
     * per queue and flavour.
     */
    static class PreparedDestinationCompletion extends Synchronization {
        final Queue queue;
        final boolean messageSend;

        public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
            this.queue = queue;
            // rollback relevant to acks, commit to sends
            this.messageSend = messageSend;
        }

        /**
         * Derived from the same components {@link #equals(Object)} compares
         * (the queue's own hash code and the flag's value), so that equal
         * instances always produce equal hash codes.
         */
        @Override
        public int hashCode() {
            return 31 * queue.hashCode() + Boolean.valueOf(messageSend).hashCode();
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof PreparedDestinationCompletion)) {
                return false;
            }
            PreparedDestinationCompletion that = (PreparedDestinationCompletion) other;
            return queue.equals(that.queue) && messageSend == that.messageSend;
        }

        @Override
        public void afterRollback() throws Exception {
            if (!messageSend) {
                queue.clearPendingMessages();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("cleared pending from afterRollback : " + queue);
                }
            }
        }

        @Override
        public void afterCommit() throws Exception {
            if (messageSend) {
                queue.clearPendingMessages();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("cleared pending from afterCommit : " + queue);
                }
            }
        }
    }

    /**
     * Stops the transaction store and then the next broker. If stopping the
     * store throws, the next broker's {@code stop()} is not reached.
     */
    @Override
    public void stop() throws Exception {
        transactionStore.stop();
        next.stop();
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // BrokerFilter overrides
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Returns the ids of all XA transactions that currently report themselves
     * as prepared.
     *
     * @param context not used by this implementation
     * @return a freshly allocated array, empty when nothing is prepared
     */
    @Override
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
        List<TransactionId> txs = new ArrayList<TransactionId>();
        synchronized (xaTransactions) {
            for (XATransaction tx : xaTransactions.values()) {
                if (tx.isPrepared()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("prepared transaction: " + tx.getTransactionId());
                    }
                    txs.add(tx.getTransactionId());
                }
            }
        }
        XATransactionId[] rc = new XATransactionId[txs.size()];
        txs.toArray(rc);
        if (LOG.isDebugEnabled()) {
            LOG.debug("prepared transaction list size: " + rc.length);
        }
        return rc;
    }

    /**
     * Starts tracking a transaction.
     * <p>
     * For an XA id the call is idempotent: if the id is already known nothing
     * happens. For a local id, starting the same transaction twice on one
     * connection is an error.
     *
     * @throws JMSException if a local transaction with this id already exists
     *             on the connection
     */
    @Override
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        // the transaction may have already been started.
        if (xid.isXATransaction()) {
            synchronized (xaTransactions) {
                if (xaTransactions.get(xid) != null) {
                    return;
                }
                XATransaction transaction = new XATransaction(transactionStore, (XATransactionId) xid, this, context.getConnectionId());
                xaTransactions.put(xid, transaction);
            }
        } else {
            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
            Transaction transaction = transactionMap.get(xid);
            if (transaction != null) {
                throw new JMSException("Transaction '" + xid + "' has already been started.");
            }
            transaction = new LocalTransaction(transactionStore, (LocalTransactionId) xid, context);
            transactionMap.put(xid, transaction);
        }
    }

    /**
     * Looks up the transaction and asks it to prepare.
     *
     * @return whatever {@link Transaction#prepare()} returns
     */
    @Override
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        Transaction transaction = getTransaction(context, xid, false);
        return transaction.prepare();
    }

    /** Looks up the transaction and commits it, passing {@code onePhase} through. */
    @Override
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        Transaction transaction = getTransaction(context, xid, true);
        transaction.commit(onePhase);
    }

    /** Looks up the transaction and rolls it back. */
    @Override
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        Transaction transaction = getTransaction(context, xid, true);
        transaction.rollback();
    }

    /** Looks up the transaction and rolls it back; forgetting is implemented as a rollback here. */
    @Override
    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        Transaction transaction = getTransaction(context, xid, true);
        transaction.rollback();
    }

    /**
     * Passes the ack to the next broker with the connection's current
     * transaction set to the ack's transaction (or {@code null} when the ack is
     * not transacted), restoring the previous value afterwards.
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        // This method may be invoked recursively.
        // Track original tx so that it can be restored.
        final ConnectionContext context = consumerExchange.getConnectionContext();
        Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        if (ack.isInTransaction()) {
            transaction = getTransaction(context, ack.getTransactionId(), false);
        }
        context.setTransaction(transaction);
        try {
            next.acknowledge(consumerExchange, ack);
        } finally {
            context.setTransaction(originalTx);
        }
    }

    /**
     * Passes the message to the next broker with the connection's current
     * transaction set to the message's transaction, restoring the previous
     * value afterwards.
     * <p>
     * When an audit is active, messages it reports as duplicates are dropped
     * (and logged at debug level) instead of being forwarded. For transacted
     * sends a hook is registered so that a rollback also rolls the message
     * back in the audit; the hook is withdrawn again if the message turned out
     * to be a duplicate.
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        // This method may be invoked recursively.
        // Track original tx so that it can be restored.
        final ConnectionContext context = producerExchange.getConnectionContext();
        Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        Synchronization sync = null;
        if (message.getTransactionId() != null) {
            transaction = getTransaction(context, message.getTransactionId(), false);
            if (transaction != null) {
                sync = new Synchronization() {

                    @Override
                    public void afterRollback() {
                        // the field is read at rollback time, not captured at send time
                        if (audit != null) {
                            audit.rollback(message);
                        }
                    }
                };
                transaction.addSynchronization(sync);
            }
        }
        if (audit == null || !audit.isDuplicate(message)) {
            context.setTransaction(transaction);
            try {
                next.send(producerExchange, message);
            } finally {
                context.setTransaction(originalTx);
            }
        } else {
            if (sync != null && transaction != null) {
                transaction.removeSynchronization(sync);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("IGNORING duplicate message " + message);
            }
        }
    }

    /**
     * Rolls back the transactions a disconnecting client leaves behind and
     * then forwards the call to the next broker.
     * <p>
     * All local transactions of the connection are rolled back and removed
     * from the connection's map. XA transactions that were started by this
     * connection are rolled back unless they are already prepared. Rollback
     * failures are logged as warnings and do not abort the clean-up.
     */
    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
            try {
                Transaction transaction = iter.next();
                transaction.rollback();
            } catch (Exception e) {
                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
            }
            iter.remove();
        }

        synchronized (xaTransactions) {
            // first find all txs that belongs to the connection
            List<XATransaction> txs = new ArrayList<XATransaction>();
            for (XATransaction tx : xaTransactions.values()) {
                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                    txs.add(tx);
                }
            }

            // then roll them back
            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
            for (XATransaction tx : txs) {
                try {
                    tx.rollback();
                } catch (Exception e) {
                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
                }
            }

        }
        next.removeConnection(context, info, error);
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // Implementation help methods.
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Finds a previously started transaction.
     * <p>
     * XA ids are resolved against the broker-wide XA map, local ids against
     * the connection's own transaction map. The XA lookup is performed while
     * holding the map's monitor, because the map is a plain
     * {@link LinkedHashMap} that other threads modify under the same monitor.
     *
     * @param context the connection whose local transactions are consulted for
     *            a non-XA id
     * @param xid the id of the transaction to find
     * @param mightBePrepared not consulted by this implementation
     * @return the transaction, never {@code null}
     * @throws XAException with error code {@link XAException#XAER_NOTA} if
     *             {@code xid} is an XA id that is not known
     * @throws JMSException if {@code xid} is a local id that is not known on
     *             this connection
     */
    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
        final boolean xa = xid.isXATransaction();
        Transaction transaction;
        if (xa) {
            synchronized (xaTransactions) {
                transaction = xaTransactions.get(xid);
            }
        } else {
            Map<TransactionId, Transaction> localTransactions = context.getTransactions();
            transaction = localTransactions.get(xid);
        }
        if (transaction != null) {
            return transaction;
        }
        if (xa) {
            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
            e.errorCode = XAException.XAER_NOTA;
            throw e;
        }
        throw new JMSException("Transaction '" + xid + "' has not been started.");
    }

    /**
     * Stops tracking an XA transaction; unknown ids are ignored.
     */
    public void removeTransaction(XATransactionId xid) {
        synchronized (xaTransactions) {
            xaTransactions.remove(xid);
        }
    }

    /**
     * Creates the message audit used for duplicate detection in
     * {@link #send}, once, when the broker service supports fail-over.
     */
    @Override
    public synchronized void brokerServiceStarted() {
        super.brokerServiceStarted();
        if (getBrokerService().isSupportFailOver() && audit == null) {
            audit = new ActiveMQMessageAudit();
        }
    }

}