001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.Map; 022 import java.util.Map.Entry; 023 import java.util.concurrent.ConcurrentHashMap; 024 025 import org.apache.activemq.broker.BrokerService; 026 import org.apache.activemq.broker.BrokerServiceAware; 027 import org.apache.activemq.broker.ConnectionContext; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.command.MessageAck; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.command.TransactionId; 032 import org.apache.activemq.command.XATransactionId; 033 import org.apache.activemq.kaha.RuntimeStoreException; 034 import org.apache.activemq.store.MessageStore; 035 import org.apache.activemq.store.ProxyMessageStore; 036 import org.apache.activemq.store.ProxyTopicMessageStore; 037 import org.apache.activemq.store.TopicMessageStore; 038 import org.apache.activemq.store.TransactionRecoveryListener; 039 import org.apache.activemq.store.TransactionStore; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 /** 044 * Provides a TransactionStore implementation that can create transaction aware 045 * MessageStore objects from non transaction aware MessageStore objects. 046 * 047 * 048 */ 049 public class KahaTransactionStore implements TransactionStore, BrokerServiceAware { 050 private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class); 051 052 private final Map transactions = new ConcurrentHashMap(); 053 private final Map prepared; 054 private final KahaPersistenceAdapter adaptor; 055 056 private BrokerService brokerService; 057 058 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) { 059 this.adaptor = adaptor; 060 this.prepared = preparedMap; 061 } 062 063 public MessageStore proxy(MessageStore messageStore) { 064 return new ProxyMessageStore(messageStore) { 065 @Override 066 public void addMessage(ConnectionContext context, final Message send) throws IOException { 067 KahaTransactionStore.this.addMessage(getDelegate(), send); 068 } 069 070 @Override 071 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 072 KahaTransactionStore.this.addMessage(getDelegate(), send); 073 } 074 075 @Override 076 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 077 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 078 } 079 }; 080 } 081 082 public TopicMessageStore proxy(TopicMessageStore messageStore) { 083 return new ProxyTopicMessageStore(messageStore) { 084 @Override 085 public void addMessage(ConnectionContext context, final Message send) throws IOException { 086 KahaTransactionStore.this.addMessage(getDelegate(), send); 087 } 088 089 @Override 090 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 091 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 092 } 093 094 @Override 095 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 096 MessageId messageId, MessageAck ack) throws IOException { 097 KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack); 098 } 099 }; 100 } 101 102 /** 103 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 104 */ 105 public void prepare(TransactionId txid) { 106 KahaTransaction tx = getTx(txid); 107 if (tx != null) { 108 tx.prepare(); 109 prepared.put(txid, tx); 110 } 111 } 112 113 public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException { 114 if(before != null) { 115 before.run(); 116 } 117 KahaTransaction tx = getTx(txid); 118 if (tx != null) { 119 tx.commit(this); 120 removeTx(txid); 121 } 122 if (after != null) { 123 after.run(); 124 } 125 } 126 127 /** 128 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 129 */ 130 public void rollback(TransactionId txid) { 131 KahaTransaction tx = getTx(txid); 132 if (tx != null) { 133 tx.rollback(); 134 removeTx(txid); 135 } 136 } 137 138 public void start() throws Exception { 139 } 140 141 public void stop() throws Exception { 142 } 143 144 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 145 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) { 146 Map.Entry entry = (Entry)i.next(); 147 XATransactionId xid = (XATransactionId)entry.getKey(); 148 KahaTransaction kt = (KahaTransaction)entry.getValue(); 149 listener.recover(xid, kt.getMessages(), kt.getAcks()); 150 } 151 } 152 153 /** 154 * @param message 155 * @throws IOException 156 */ 157 void addMessage(final MessageStore destination, final Message message) throws IOException { 158 try { 159 if (message.isInTransaction()) { 160 KahaTransaction tx = getOrCreateTx(message.getTransactionId()); 161 tx.add((KahaMessageStore)destination, message); 162 } else { 163 destination.addMessage(null, message); 164 } 165 } catch (RuntimeStoreException rse) { 166 if (rse.getCause() instanceof IOException) { 167 brokerService.handleIOException((IOException)rse.getCause()); 168 } 169 throw rse; 170 } 171 } 172 173 /** 174 * @param ack 175 * @throws IOException 176 */ 177 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 178 try { 179 if (ack.isInTransaction()) { 180 KahaTransaction tx = getOrCreateTx(ack.getTransactionId()); 181 tx.add((KahaMessageStore)destination, ack); 182 } else { 183 destination.removeMessage(null, ack); 184 } 185 } catch (RuntimeStoreException rse) { 186 if (rse.getCause() instanceof IOException) { 187 brokerService.handleIOException((IOException)rse.getCause()); 188 } 189 throw rse; 190 } 191 } 192 193 final void acknowledge(final TopicMessageStore destination, String clientId, 194 String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 195 try { 196 if (ack.isInTransaction()) { 197 KahaTransaction tx = getOrCreateTx(ack.getTransactionId()); 198 tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack); 199 } else { 200 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 201 } 202 } catch (RuntimeStoreException rse) { 203 if (rse.getCause() instanceof IOException) { 204 brokerService.handleIOException((IOException)rse.getCause()); 205 } 206 throw rse; 207 } 208 } 209 210 protected synchronized KahaTransaction getTx(TransactionId key) { 211 KahaTransaction result = (KahaTransaction)transactions.get(key); 212 if (result == null) { 213 result = (KahaTransaction)prepared.get(key); 214 } 215 return result; 216 } 217 218 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) { 219 KahaTransaction result = (KahaTransaction)transactions.get(key); 220 if (result == null) { 221 result = new KahaTransaction(); 222 transactions.put(key, result); 223 } 224 return result; 225 } 226 227 protected synchronized void removeTx(TransactionId key) { 228 transactions.remove(key); 229 prepared.remove(key); 230 } 231 232 public void delete() { 233 transactions.clear(); 234 prepared.clear(); 235 } 236 237 protected MessageStore getStoreById(Object id) { 238 return adaptor.retrieveMessageStore(id); 239 } 240 241 public void setBrokerService(BrokerService brokerService) { 242 this.brokerService = brokerService; 243 } 244 }