001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageAck; 031import org.apache.activemq.command.MessageId; 032import org.apache.activemq.command.TransactionId; 033import org.apache.activemq.command.XATransactionId; 034import org.apache.activemq.store.InlineListenableFuture; 035import org.apache.activemq.store.ListenableFuture; 036import org.apache.activemq.store.MessageStore; 037import org.apache.activemq.store.PersistenceAdapter; 038import org.apache.activemq.store.ProxyMessageStore; 039import org.apache.activemq.store.ProxyTopicMessageStore; 040import org.apache.activemq.store.TopicMessageStore; 041import org.apache.activemq.store.TransactionRecoveryListener; 042import org.apache.activemq.store.TransactionStore; 043 044/** 045 * Provides a TransactionStore implementation that can create transaction aware 046 * MessageStore objects from non transaction aware MessageStore objects. 047 */ 048public class MemoryTransactionStore implements TransactionStore { 049 050 protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 051 protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>()); 052 protected final PersistenceAdapter persistenceAdapter; 053 054 private boolean doingRecover; 055 056 public class Tx { 057 058 public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 059 060 public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 061 062 public void add(AddMessageCommand msg) { 063 messages.add(msg); 064 } 065 066 public void add(RemoveMessageCommand ack) { 067 acks.add(ack); 068 } 069 070 public Message[] getMessages() { 071 Message rc[] = new Message[messages.size()]; 072 int count = 0; 073 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 074 AddMessageCommand cmd = iter.next(); 075 rc[count++] = cmd.getMessage(); 076 } 077 return rc; 078 } 079 080 public MessageAck[] getAcks() { 081 MessageAck rc[] = new MessageAck[acks.size()]; 082 int count = 0; 083 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 084 RemoveMessageCommand cmd = iter.next(); 085 rc[count++] = cmd.getMessageAck(); 086 } 087 return rc; 088 } 089 090 /** 091 * @throws IOException 092 */ 093 public void commit() throws IOException { 094 ConnectionContext ctx = new ConnectionContext(); 095 persistenceAdapter.beginTransaction(ctx); 096 try { 097 098 // Do all the message adds. 099 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 100 AddMessageCommand cmd = iter.next(); 101 cmd.run(ctx); 102 } 103 // And removes.. 104 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 105 RemoveMessageCommand cmd = iter.next(); 106 cmd.run(ctx); 107 } 108 109 } catch (IOException e) { 110 persistenceAdapter.rollbackTransaction(ctx); 111 throw e; 112 } 113 persistenceAdapter.commitTransaction(ctx); 114 } 115 } 116 117 public interface AddMessageCommand { 118 Message getMessage(); 119 120 MessageStore getMessageStore(); 121 122 void run(ConnectionContext context) throws IOException; 123 124 void setMessageStore(MessageStore messageStore); 125 } 126 127 public interface RemoveMessageCommand { 128 MessageAck getMessageAck(); 129 130 void run(ConnectionContext context) throws IOException; 131 132 MessageStore getMessageStore(); 133 } 134 135 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 136 this.persistenceAdapter = persistenceAdapter; 137 } 138 139 public MessageStore proxy(MessageStore messageStore) { 140 ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) { 141 @Override 142 public void addMessage(ConnectionContext context, final Message send) throws IOException { 143 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 144 } 145 146 @Override 147 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 148 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 149 } 150 151 @Override 152 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 153 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 154 return new InlineListenableFuture(); 155 } 156 157 @Override 158 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException { 159 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 160 return new InlineListenableFuture(); 161 } 162 163 @Override 164 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 165 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 166 } 167 168 @Override 169 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 170 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 171 } 172 }; 173 onProxyQueueStore(proxyMessageStore); 174 return proxyMessageStore; 175 } 176 177 protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) { 178 } 179 180 public TopicMessageStore proxy(TopicMessageStore messageStore) { 181 ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) { 182 @Override 183 public void addMessage(ConnectionContext context, final Message send) throws IOException { 184 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 185 } 186 187 @Override 188 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 189 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 190 } 191 192 @Override 193 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 194 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 195 return new InlineListenableFuture(); 196 } 197 198 @Override 199 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 200 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 201 return new InlineListenableFuture(); 202 } 203 204 @Override 205 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 206 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 207 } 208 209 @Override 210 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 211 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 212 } 213 214 @Override 215 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) 216 throws IOException { 217 MemoryTransactionStore.this.acknowledge((TopicMessageStore) getDelegate(), clientId, subscriptionName, messageId, ack); 218 } 219 }; 220 onProxyTopicStore(proxyTopicMessageStore); 221 return proxyTopicMessageStore; 222 } 223 224 protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) { 225 } 226 227 /** 228 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 229 */ 230 @Override 231 public void prepare(TransactionId txid) throws IOException { 232 Tx tx = inflightTransactions.remove(txid); 233 if (tx == null) { 234 return; 235 } 236 preparedTransactions.put(txid, tx); 237 } 238 239 public Tx getTx(Object txid) { 240 Tx tx = inflightTransactions.get(txid); 241 if (tx == null) { 242 tx = new Tx(); 243 inflightTransactions.put(txid, tx); 244 } 245 return tx; 246 } 247 248 public Tx getPreparedTx(TransactionId txid) { 249 Tx tx = preparedTransactions.get(txid); 250 if (tx == null) { 251 tx = new Tx(); 252 preparedTransactions.put(txid, tx); 253 } 254 return tx; 255 } 256 257 @Override 258 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException { 259 if (preCommit != null) { 260 preCommit.run(); 261 } 262 Tx tx; 263 if (wasPrepared) { 264 tx = preparedTransactions.remove(txid); 265 } else { 266 tx = inflightTransactions.remove(txid); 267 } 268 269 if (tx != null) { 270 tx.commit(); 271 } 272 if (postCommit != null) { 273 postCommit.run(); 274 } 275 } 276 277 /** 278 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 279 */ 280 @Override 281 public void rollback(TransactionId txid) throws IOException { 282 preparedTransactions.remove(txid); 283 inflightTransactions.remove(txid); 284 } 285 286 @Override 287 public void start() throws Exception { 288 } 289 290 @Override 291 public void stop() throws Exception { 292 } 293 294 @Override 295 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 296 // All the inflight transactions get rolled back.. 297 inflightTransactions.clear(); 298 this.doingRecover = true; 299 try { 300 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 301 Object txid = iter.next(); 302 Tx tx = preparedTransactions.get(txid); 303 listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks()); 304 onRecovered(tx); 305 } 306 } finally { 307 this.doingRecover = false; 308 } 309 } 310 311 protected void onRecovered(Tx tx) { 312 } 313 314 /** 315 * @param message 316 * @throws IOException 317 */ 318 void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException { 319 320 if (doingRecover) { 321 return; 322 } 323 324 if (message.getTransactionId() != null) { 325 Tx tx = getTx(message.getTransactionId()); 326 tx.add(new AddMessageCommand() { 327 @SuppressWarnings("unused") 328 MessageStore messageStore = destination; 329 330 @Override 331 public Message getMessage() { 332 return message; 333 } 334 335 @Override 336 public MessageStore getMessageStore() { 337 return destination; 338 } 339 340 @Override 341 public void run(ConnectionContext ctx) throws IOException { 342 destination.addMessage(ctx, message); 343 } 344 345 @Override 346 public void setMessageStore(MessageStore messageStore) { 347 this.messageStore = messageStore; 348 } 349 350 }); 351 } else { 352 destination.addMessage(context, message); 353 } 354 } 355 356 /** 357 * @param ack 358 * @throws IOException 359 */ 360 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 361 if (doingRecover) { 362 return; 363 } 364 365 if (ack.isInTransaction()) { 366 Tx tx = getTx(ack.getTransactionId()); 367 tx.add(new RemoveMessageCommand() { 368 @Override 369 public MessageAck getMessageAck() { 370 return ack; 371 } 372 373 @Override 374 public void run(ConnectionContext ctx) throws IOException { 375 destination.removeMessage(ctx, ack); 376 } 377 378 @Override 379 public MessageStore getMessageStore() { 380 return destination; 381 } 382 }); 383 } else { 384 destination.removeMessage(null, ack); 385 } 386 } 387 388 public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId, 389 final MessageAck ack) throws IOException { 390 if (doingRecover) { 391 return; 392 } 393 394 if (ack.isInTransaction()) { 395 Tx tx = getTx(ack.getTransactionId()); 396 tx.add(new RemoveMessageCommand() { 397 @Override 398 public MessageAck getMessageAck() { 399 return ack; 400 } 401 402 @Override 403 public void run(ConnectionContext ctx) throws IOException { 404 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 405 } 406 407 @Override 408 public MessageStore getMessageStore() { 409 return destination; 410 } 411 }); 412 } else { 413 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 414 } 415 } 416 417 public void delete() { 418 inflightTransactions.clear(); 419 preparedTransactions.clear(); 420 doingRecover = false; 421 } 422}