001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.concurrent.ConcurrentHashMap;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.BrokerServiceAware;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.TransactionId;
032import org.apache.activemq.command.XATransactionId;
033import org.apache.activemq.kaha.RuntimeStoreException;
034import org.apache.activemq.store.MessageStore;
035import org.apache.activemq.store.ProxyMessageStore;
036import org.apache.activemq.store.ProxyTopicMessageStore;
037import org.apache.activemq.store.TopicMessageStore;
038import org.apache.activemq.store.TransactionRecoveryListener;
039import org.apache.activemq.store.TransactionStore;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Provides a TransactionStore implementation that can create transaction aware
045 * MessageStore objects from non transaction aware MessageStore objects.
046 *
047 *
048 */
049public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
050    private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051
052    private final Map transactions = new ConcurrentHashMap();
053    private final Map prepared;
054    private final KahaPersistenceAdapter adaptor;
055
056    private BrokerService brokerService;
057
058    KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059        this.adaptor = adaptor;
060        this.prepared = preparedMap;
061    }
062
063    public MessageStore proxy(MessageStore messageStore) {
064        return new ProxyMessageStore(messageStore) {
065            @Override
066            public void addMessage(ConnectionContext context, final Message send) throws IOException {
067                KahaTransactionStore.this.addMessage(getDelegate(), send);
068            }
069
070            @Override
071            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
072                KahaTransactionStore.this.addMessage(getDelegate(), send);
073            }
074
075            @Override
076            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
077                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
078            }
079        };
080    }
081
082    public TopicMessageStore proxy(TopicMessageStore messageStore) {
083        return new ProxyTopicMessageStore(messageStore) {
084            @Override
085            public void addMessage(ConnectionContext context, final Message send) throws IOException {
086                KahaTransactionStore.this.addMessage(getDelegate(), send);
087            }
088
089            @Override
090            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
092            }
093
094            @Override
095            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
096                            MessageId messageId, MessageAck ack) throws IOException {
097                KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
098            }
099        };
100    }
101
102    /**
103     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
104     */
105    public void prepare(TransactionId txid) {
106        KahaTransaction tx = getTx(txid);
107        if (tx != null) {
108            tx.prepare();
109            prepared.put(txid, tx);
110        }
111    }
112
113    public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
114        if(before != null) {
115            before.run();
116        }
117        KahaTransaction tx = getTx(txid);
118        if (tx != null) {
119            tx.commit(this);
120            removeTx(txid);
121        }
122        if (after != null) {
123            after.run();
124        }
125    }
126
127    /**
128     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
129     */
130    public void rollback(TransactionId txid) {
131        KahaTransaction tx = getTx(txid);
132        if (tx != null) {
133            tx.rollback();
134            removeTx(txid);
135        }
136    }
137
138    public void start() throws Exception {
139    }
140
141    public void stop() throws Exception {
142    }
143
144    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
145        for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
146            Map.Entry entry = (Entry)i.next();
147            XATransactionId xid = (XATransactionId)entry.getKey();
148            KahaTransaction kt = (KahaTransaction)entry.getValue();
149            listener.recover(xid, kt.getMessages(), kt.getAcks());
150        }
151    }
152
153    /**
154     * @param message
155     * @throws IOException
156     */
157    void addMessage(final MessageStore destination, final Message message) throws IOException {
158        try {
159            if (message.isInTransaction()) {
160                KahaTransaction tx = getOrCreateTx(message.getTransactionId());
161                tx.add((KahaMessageStore)destination, message);
162            } else {
163                destination.addMessage(null, message);
164            }
165        } catch (RuntimeStoreException rse) {
166            if (rse.getCause() instanceof IOException) {
167                brokerService.handleIOException((IOException)rse.getCause());
168            }
169            throw rse;
170        }
171    }
172
173    /**
174     * @param ack
175     * @throws IOException
176     */
177    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
178        try {
179            if (ack.isInTransaction()) {
180                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
181                tx.add((KahaMessageStore)destination, ack);
182            } else {
183                destination.removeMessage(null, ack);
184            }
185        } catch (RuntimeStoreException rse) {
186            if (rse.getCause() instanceof IOException) {
187                brokerService.handleIOException((IOException)rse.getCause());
188            }
189            throw rse;
190        }
191    }
192
193    final void acknowledge(final TopicMessageStore destination, String clientId,
194                           String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
195        try {
196            if (ack.isInTransaction()) {
197                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
198                tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
199            } else {
200                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
201            }
202        } catch (RuntimeStoreException rse) {
203            if (rse.getCause() instanceof IOException) {
204                brokerService.handleIOException((IOException)rse.getCause());
205            }
206            throw rse;
207        }
208    }
209
210    protected synchronized KahaTransaction getTx(TransactionId key) {
211        KahaTransaction result = (KahaTransaction)transactions.get(key);
212        if (result == null) {
213            result = (KahaTransaction)prepared.get(key);
214        }
215        return result;
216    }
217
218    protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
219        KahaTransaction result = (KahaTransaction)transactions.get(key);
220        if (result == null) {
221            result = new KahaTransaction();
222            transactions.put(key, result);
223        }
224        return result;
225    }
226
227    protected synchronized void removeTx(TransactionId key) {
228        transactions.remove(key);
229        prepared.remove(key);
230    }
231
232    public void delete() {
233        transactions.clear();
234        prepared.clear();
235    }
236
237    protected MessageStore getStoreById(Object id) {
238        return adaptor.retrieveMessageStore(id);
239    }
240
241    public void setBrokerService(BrokerService brokerService) {
242        this.brokerService = brokerService;
243    }
244}