001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.Future; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.Message; 027import org.apache.activemq.command.MessageAck; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.command.TransactionId; 030import org.apache.activemq.command.XATransactionId; 031import org.apache.activemq.store.AbstractMessageStore; 032import org.apache.activemq.store.MessageStore; 033import org.apache.activemq.store.PersistenceAdapter; 034import org.apache.activemq.store.ProxyMessageStore; 035import org.apache.activemq.store.ProxyTopicMessageStore; 036import org.apache.activemq.store.TopicMessageStore; 037import org.apache.activemq.store.TransactionRecoveryListener; 038import org.apache.activemq.store.TransactionStore; 039 040/** 041 * Provides a TransactionStore implementation that can create transaction aware 042 * MessageStore objects from non transaction aware MessageStore objects. 043 * 044 * 045 */ 046public class MemoryTransactionStore implements TransactionStore { 047 048 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 049 ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 050 final PersistenceAdapter persistenceAdapter; 051 052 private boolean doingRecover; 053 054 public class Tx { 055 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 056 057 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 058 059 public void add(AddMessageCommand msg) { 060 messages.add(msg); 061 } 062 063 public void add(RemoveMessageCommand ack) { 064 acks.add(ack); 065 } 066 067 public Message[] getMessages() { 068 Message rc[] = new Message[messages.size()]; 069 int count = 0; 070 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 071 AddMessageCommand cmd = iter.next(); 072 rc[count++] = cmd.getMessage(); 073 } 074 return rc; 075 } 076 077 public MessageAck[] getAcks() { 078 MessageAck rc[] = new MessageAck[acks.size()]; 079 int count = 0; 080 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 081 RemoveMessageCommand cmd = iter.next(); 082 rc[count++] = cmd.getMessageAck(); 083 } 084 return rc; 085 } 086 087 /** 088 * @throws IOException 089 */ 090 public void commit() throws IOException { 091 ConnectionContext ctx = new ConnectionContext(); 092 persistenceAdapter.beginTransaction(ctx); 093 try { 094 095 // Do all the message adds. 096 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 097 AddMessageCommand cmd = iter.next(); 098 cmd.run(ctx); 099 } 100 // And removes.. 101 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 102 RemoveMessageCommand cmd = iter.next(); 103 cmd.run(ctx); 104 } 105 106 } catch ( IOException e ) { 107 persistenceAdapter.rollbackTransaction(ctx); 108 throw e; 109 } 110 persistenceAdapter.commitTransaction(ctx); 111 } 112 } 113 114 public interface AddMessageCommand { 115 Message getMessage(); 116 117 void run(ConnectionContext context) throws IOException; 118 } 119 120 public interface RemoveMessageCommand { 121 MessageAck getMessageAck(); 122 123 void run(ConnectionContext context) throws IOException; 124 } 125 126 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 127 this.persistenceAdapter=persistenceAdapter; 128 } 129 130 public MessageStore proxy(MessageStore messageStore) { 131 return new ProxyMessageStore(messageStore) { 132 @Override 133 public void addMessage(ConnectionContext context, final Message send) throws IOException { 134 MemoryTransactionStore.this.addMessage(getDelegate(), send); 135 } 136 137 @Override 138 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 139 MemoryTransactionStore.this.addMessage(getDelegate(), send); 140 } 141 142 @Override 143 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 144 MemoryTransactionStore.this.addMessage(getDelegate(), message); 145 return AbstractMessageStore.FUTURE; 146 } 147 148 @Override 149 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException { 150 MemoryTransactionStore.this.addMessage(getDelegate(), message); 151 return AbstractMessageStore.FUTURE; 152 } 153 154 @Override 155 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 156 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 157 } 158 159 @Override 160 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 161 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 162 } 163 }; 164 } 165 166 public TopicMessageStore proxy(TopicMessageStore messageStore) { 167 return new ProxyTopicMessageStore(messageStore) { 168 @Override 169 public void addMessage(ConnectionContext context, final Message send) throws IOException { 170 MemoryTransactionStore.this.addMessage(getDelegate(), send); 171 } 172 173 @Override 174 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 175 MemoryTransactionStore.this.addMessage(getDelegate(), send); 176 } 177 178 @Override 179 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 180 MemoryTransactionStore.this.addMessage(getDelegate(), message); 181 return AbstractMessageStore.FUTURE; 182 } 183 184 @Override 185 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 186 MemoryTransactionStore.this.addMessage(getDelegate(), message); 187 return AbstractMessageStore.FUTURE; 188 } 189 190 @Override 191 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 192 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 193 } 194 195 @Override 196 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 197 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 198 } 199 200 @Override 201 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 202 MessageId messageId, MessageAck ack) throws IOException { 203 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, 204 subscriptionName, messageId, ack); 205 } 206 }; 207 } 208 209 /** 210 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 211 */ 212 public void prepare(TransactionId txid) { 213 Tx tx = inflightTransactions.remove(txid); 214 if (tx == null) { 215 return; 216 } 217 preparedTransactions.put(txid, tx); 218 } 219 220 public Tx getTx(Object txid) { 221 Tx tx = inflightTransactions.get(txid); 222 if (tx == null) { 223 tx = new Tx(); 224 inflightTransactions.put(txid, tx); 225 } 226 return tx; 227 } 228 229 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 230 if (preCommit != null) { 231 preCommit.run(); 232 } 233 Tx tx; 234 if (wasPrepared) { 235 tx = preparedTransactions.remove(txid); 236 } else { 237 tx = inflightTransactions.remove(txid); 238 } 239 240 if (tx == null) { 241 if (postCommit != null) { 242 postCommit.run(); 243 } 244 return; 245 } 246 // ensure message order w.r.t to cursor and store for setBatch() 247 synchronized (this) { 248 tx.commit(); 249 if (postCommit != null) { 250 postCommit.run(); 251 } 252 } 253 } 254 255 /** 256 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 257 */ 258 public void rollback(TransactionId txid) { 259 preparedTransactions.remove(txid); 260 inflightTransactions.remove(txid); 261 } 262 263 public void start() throws Exception { 264 } 265 266 public void stop() throws Exception { 267 } 268 269 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 270 // All the inflight transactions get rolled back.. 271 inflightTransactions.clear(); 272 this.doingRecover = true; 273 try { 274 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 275 Object txid = iter.next(); 276 Tx tx = preparedTransactions.get(txid); 277 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 278 } 279 } finally { 280 this.doingRecover = false; 281 } 282 } 283 284 /** 285 * @param message 286 * @throws IOException 287 */ 288 void addMessage(final MessageStore destination, final Message message) throws IOException { 289 290 if (doingRecover) { 291 return; 292 } 293 294 if (message.getTransactionId() != null) { 295 Tx tx = getTx(message.getTransactionId()); 296 tx.add(new AddMessageCommand() { 297 public Message getMessage() { 298 return message; 299 } 300 301 public void run(ConnectionContext ctx) throws IOException { 302 destination.addMessage(ctx, message); 303 } 304 305 }); 306 } else { 307 destination.addMessage(null, message); 308 } 309 } 310 311 /** 312 * @param ack 313 * @throws IOException 314 */ 315 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 316 if (doingRecover) { 317 return; 318 } 319 320 if (ack.isInTransaction()) { 321 Tx tx = getTx(ack.getTransactionId()); 322 tx.add(new RemoveMessageCommand() { 323 public MessageAck getMessageAck() { 324 return ack; 325 } 326 327 public void run(ConnectionContext ctx) throws IOException { 328 destination.removeMessage(ctx, ack); 329 } 330 }); 331 } else { 332 destination.removeMessage(null, ack); 333 } 334 } 335 336 final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, 337 final MessageId messageId, final MessageAck ack) throws IOException { 338 if (doingRecover) { 339 return; 340 } 341 342 if (ack.isInTransaction()) { 343 Tx tx = getTx(ack.getTransactionId()); 344 tx.add(new RemoveMessageCommand() { 345 public MessageAck getMessageAck() { 346 return ack; 347 } 348 349 public void run(ConnectionContext ctx) throws IOException { 350 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 351 } 352 }); 353 } else { 354 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 355 } 356 } 357 358 359 public void delete() { 360 inflightTransactions.clear(); 361 preparedTransactions.clear(); 362 doingRecover = false; 363 } 364 365}