001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028
029import org.apache.activeio.journal.RecordLocation;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.JournalQueueAck;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037import org.apache.activemq.store.MessageRecoveryListener;
038import org.apache.activemq.store.MessageStore;
039import org.apache.activemq.store.PersistenceAdapter;
040import org.apache.activemq.store.AbstractMessageStore;
041import org.apache.activemq.transaction.Synchronization;
042import org.apache.activemq.usage.MemoryUsage;
043import org.apache.activemq.usage.SystemUsage;
044import org.apache.activemq.util.Callback;
045import org.apache.activemq.util.TransactionTemplate;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A MessageStore that uses a Journal to store it's messages.
051 * 
052 * 
053 */
054public class JournalMessageStore extends AbstractMessageStore {
055
056    private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
057
058    protected final JournalPersistenceAdapter peristenceAdapter;
059    protected final JournalTransactionStore transactionStore;
060    protected final MessageStore longTermStore;
061    protected final TransactionTemplate transactionTemplate;
062    protected RecordLocation lastLocation;
063    protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
064
065    private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
066    private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
067
068    /** A MessageStore that we can use to retrieve messages quickly. */
069    private Map<MessageId, Message> cpAddedMessageIds;
070
071
072    private MemoryUsage memoryUsage;
073
074    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
075        super(destination);
076        this.peristenceAdapter = adapter;
077        this.transactionStore = adapter.getTransactionStore();
078        this.longTermStore = checkpointStore;
079        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
080    }
081
082    
083    public void setMemoryUsage(MemoryUsage memoryUsage) {
084        this.memoryUsage=memoryUsage;
085        longTermStore.setMemoryUsage(memoryUsage);
086    }
087
088    /**
089     * Not synchronized since the Journal has better throughput if you increase
090     * the number of concurrent writes that it is doing.
091     */
092    public void addMessage(ConnectionContext context, final Message message) throws IOException {
093
094        final MessageId id = message.getMessageId();
095
096        final boolean debug = LOG.isDebugEnabled();
097        message.incrementReferenceCount();
098
099        final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
100        if (!context.isInTransaction()) {
101            if (debug) {
102                LOG.debug("Journalled message add for: " + id + ", at: " + location);
103            }
104            addMessage(message, location);
105        } else {
106            if (debug) {
107                LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
108            }
109            synchronized (this) {
110                inFlightTxLocations.add(location);
111            }
112            transactionStore.addMessage(this, message, location);
113            context.getTransaction().addSynchronization(new Synchronization() {
114                public void afterCommit() throws Exception {
115                    if (debug) {
116                        LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
117                    }
118                    synchronized (JournalMessageStore.this) {
119                        inFlightTxLocations.remove(location);
120                        addMessage(message, location);
121                    }
122                }
123
124                public void afterRollback() throws Exception {
125                    if (debug) {
126                        LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
127                    }
128                    synchronized (JournalMessageStore.this) {
129                        inFlightTxLocations.remove(location);
130                    }
131                    message.decrementReferenceCount();
132                }
133            });
134        }
135    }
136
137    void addMessage(final Message message, final RecordLocation location) {
138        synchronized (this) {
139            lastLocation = location;
140            MessageId id = message.getMessageId();
141            messages.put(id, message);
142        }
143    }
144
145    public void replayAddMessage(ConnectionContext context, Message message) {
146        try {
147            // Only add the message if it has not already been added.
148            Message t = longTermStore.getMessage(message.getMessageId());
149            if (t == null) {
150                longTermStore.addMessage(context, message);
151            }
152        } catch (Throwable e) {
153            LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
154        }
155    }
156
157    /**
158     */
159    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
160        final boolean debug = LOG.isDebugEnabled();
161        JournalQueueAck remove = new JournalQueueAck();
162        remove.setDestination(destination);
163        remove.setMessageAck(ack);
164
165        final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
166        if (!context.isInTransaction()) {
167            if (debug) {
168                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
169            }
170            removeMessage(ack, location);
171        } else {
172            if (debug) {
173                LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
174            }
175            synchronized (this) {
176                inFlightTxLocations.add(location);
177            }
178            transactionStore.removeMessage(this, ack, location);
179            context.getTransaction().addSynchronization(new Synchronization() {
180                public void afterCommit() throws Exception {
181                    if (debug) {
182                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
183                    }
184                    synchronized (JournalMessageStore.this) {
185                        inFlightTxLocations.remove(location);
186                        removeMessage(ack, location);
187                    }
188                }
189
190                public void afterRollback() throws Exception {
191                    if (debug) {
192                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
193                    }
194                    synchronized (JournalMessageStore.this) {
195                        inFlightTxLocations.remove(location);
196                    }
197                }
198            });
199
200        }
201    }
202
203    final void removeMessage(final MessageAck ack, final RecordLocation location) {
204        synchronized (this) {
205            lastLocation = location;
206            MessageId id = ack.getLastMessageId();
207            Message message = messages.remove(id);
208            if (message == null) {
209                messageAcks.add(ack);
210            } else {
211                message.decrementReferenceCount();
212            }
213        }
214    }
215
216    public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
217        try {
218            // Only remove the message if it has not already been removed.
219            Message t = longTermStore.getMessage(messageAck.getLastMessageId());
220            if (t != null) {
221                longTermStore.removeMessage(context, messageAck);
222            }
223        } catch (Throwable e) {
224            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
225        }
226    }
227
228    /**
229     * @return
230     * @throws IOException
231     */
232    public RecordLocation checkpoint() throws IOException {
233        return checkpoint(null);
234    }
235
236    /**
237     * @return
238     * @throws IOException
239     */
240    @SuppressWarnings("unchecked")
241    public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
242
243        final List<MessageAck> cpRemovedMessageLocations;
244        final List<RecordLocation> cpActiveJournalLocations;
245        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
246
247        // swap out the message hash maps..
248        synchronized (this) {
249            cpAddedMessageIds = this.messages;
250            cpRemovedMessageLocations = this.messageAcks;
251
252            cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
253
254            this.messages = new LinkedHashMap<MessageId, Message>();
255            this.messageAcks = new ArrayList<MessageAck>();
256        }
257
258        transactionTemplate.run(new Callback() {
259            public void execute() throws Exception {
260
261                int size = 0;
262
263                PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
264                ConnectionContext context = transactionTemplate.getContext();
265
266                // Checkpoint the added messages.
267                synchronized (JournalMessageStore.this) {
268                    Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
269                    while (iterator.hasNext()) {
270                        Message message = iterator.next();
271                        try {
272                            longTermStore.addMessage(context, message);
273                        } catch (Throwable e) {
274                            LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
275                        }
276                        size += message.getSize();
277                        message.decrementReferenceCount();
278                        // Commit the batch if it's getting too big
279                        if (size >= maxCheckpointMessageAddSize) {
280                            persitanceAdapter.commitTransaction(context);
281                            persitanceAdapter.beginTransaction(context);
282                            size = 0;
283                        }
284                    }
285                }
286
287                persitanceAdapter.commitTransaction(context);
288                persitanceAdapter.beginTransaction(context);
289
290                // Checkpoint the removed messages.
291                Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
292                while (iterator.hasNext()) {
293                    try {
294                        MessageAck ack = iterator.next();
295                        longTermStore.removeMessage(transactionTemplate.getContext(), ack);
296                    } catch (Throwable e) {
297                        LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
298                    }
299                }
300
301                if (postCheckpointTest != null) {
302                    postCheckpointTest.execute();
303                }
304            }
305
306        });
307
308        synchronized (this) {
309            cpAddedMessageIds = null;
310        }
311
312        if (cpActiveJournalLocations.size() > 0) {
313            Collections.sort(cpActiveJournalLocations);
314            return cpActiveJournalLocations.get(0);
315        }
316        synchronized (this) {
317            return lastLocation;
318        }
319    }
320
321    /**
322     * 
323     */
324    public Message getMessage(MessageId identity) throws IOException {
325        Message answer = null;
326
327        synchronized (this) {
328            // Do we have a still have it in the journal?
329            answer = messages.get(identity);
330            if (answer == null && cpAddedMessageIds != null) {
331                answer = cpAddedMessageIds.get(identity);
332            }
333        }
334
335        if (answer != null) {
336            return answer;
337        }
338
339        // If all else fails try the long term message store.
340        return longTermStore.getMessage(identity);
341    }
342
343    /**
344     * Replays the checkpointStore first as those messages are the oldest ones,
345     * then messages are replayed from the transaction log and then the cache is
346     * updated.
347     * 
348     * @param listener
349     * @throws Exception
350     */
351    public void recover(final MessageRecoveryListener listener) throws Exception {
352        peristenceAdapter.checkpoint(true, true);
353        longTermStore.recover(listener);
354    }
355
356    public void start() throws Exception {
357        if (this.memoryUsage != null) {
358            this.memoryUsage.addUsageListener(peristenceAdapter);
359        }
360        longTermStore.start();
361    }
362
363    public void stop() throws Exception {
364        longTermStore.stop();
365        if (this.memoryUsage != null) {
366            this.memoryUsage.removeUsageListener(peristenceAdapter);
367        }
368    }
369
370    /**
371     * @return Returns the longTermStore.
372     */
373    public MessageStore getLongTermMessageStore() {
374        return longTermStore;
375    }
376
377    /**
378     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
379     */
380    public void removeAllMessages(ConnectionContext context) throws IOException {
381        peristenceAdapter.checkpoint(true, true);
382        longTermStore.removeAllMessages(context);
383    }
384
385    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
386        throw new IOException("The journal does not support message references.");
387    }
388
389    public String getMessageReference(MessageId identity) throws IOException {
390        throw new IOException("The journal does not support message references.");
391    }
392
393    /**
394     * @return
395     * @throws IOException
396     * @see org.apache.activemq.store.MessageStore#getMessageCount()
397     */
398    public int getMessageCount() throws IOException {
399        peristenceAdapter.checkpoint(true, true);
400        return longTermStore.getMessageCount();
401    }
402
403    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
404        peristenceAdapter.checkpoint(true, true);
405        longTermStore.recoverNextMessages(maxReturned, listener);
406
407    }
408
409    public void resetBatching() {
410        longTermStore.resetBatching();
411
412    }
413
414    @Override
415    public void setBatch(MessageId messageId) throws Exception {
416        peristenceAdapter.checkpoint(true, true);
417        longTermStore.setBatch(messageId);
418    }
419
420}