/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.amq;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A MessageStore that uses a Journal to store its messages.
 */
public class AMQMessageStore extends AbstractMessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(AMQMessageStore.class);
    protected final AMQPersistenceAdapter peristenceAdapter;
    protected final AMQTransactionStore transactionStore;
    protected final ReferenceStore referenceStore;
    protected final TransactionTemplate transactionTemplate;
    protected Location lastLocation;
    protected Location lastWrittenLocation;
    protected Set<Location> inFlightTxLocations = new HashSet<Location>();
    protected final TaskRunner asyncWriteTask;
    protected CountDownLatch flushLatch;
    private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
    private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
    /** Snapshot of the pending message adds taken while a checkpoint is in progress, so they can still be looked up quickly. */
    private Map<MessageId, ReferenceData> cpAddedMessageIds;
    private final boolean debug = LOG.isDebugEnabled();
    private final AtomicReference<Location> mark = new AtomicReference<Location>();
    protected final Lock lock;

    public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
        super(destination);
        this.peristenceAdapter = adapter;
        this.lock = referenceStore.getStoreLock();
        this.transactionStore = adapter.getTransactionStore();
        this.referenceStore = referenceStore;
        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
                new NonCachedMessageEvaluationContext()));
        asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
            public boolean iterate() {
                asyncWrite();
                return false;
            }
        }, "Checkpoint: " + destination);
    }
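
    /*
     * Rough usage sketch. The broker normally drives this store; the directory and queue name
     * below are illustrative, and the adapter calls are assumed from the surrounding AMQ store
     * API rather than defined in this class. A store instance is obtained from the persistence
     * adapter rather than constructed directly:
     *
     *   AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
     *   adapter.setDirectory(new File("activemq-data"));
     *   adapter.start();
     *   MessageStore store = adapter.createQueueMessageStore(new ActiveMQQueue("TEST.QUEUE"));
     *   store.start();
     *   // the broker invokes addMessage()/removeMessage() as messages flow
     *   store.stop();
     *   adapter.stop();
     */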

    public void setMemoryUsage(MemoryUsage memoryUsage) {
        referenceStore.setMemoryUsage(memoryUsage);
    }

    /**
     * Not synchronized, since the Journal gives better throughput when the number of concurrent
     * writes it performs is increased.
     */
    public final void addMessage(ConnectionContext context, final Message message) throws IOException {
        final MessageId id = message.getMessageId();
        final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
        if (!context.isInTransaction()) {
            if (debug) {
                LOG.debug("Journalled message add for: " + id + ", at: " + location);
            }
            this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
            addMessage(message, location);
        } else {
            if (debug) {
                LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
            }
            lock.lock();
            try {
                inFlightTxLocations.add(location);
            } finally {
                lock.unlock();
            }
            transactionStore.addMessage(this, message, location);
            context.getTransaction().addSynchronization(new Synchronization() {
                public void afterCommit() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
                    }
                    lock.lock();
                    try {
                        inFlightTxLocations.remove(location);
                    } finally {
                        lock.unlock();
                    }
                    addMessage(message, location);
                }

                public void afterRollback() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
                    }
                    lock.lock();
                    try {
                        inFlightTxLocations.remove(location);
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
    }

    final void addMessage(final Message message, final Location location) throws InterruptedIOException {
        ReferenceData data = new ReferenceData();
        data.setExpiration(message.getExpiration());
        data.setFileId(location.getDataFileId());
        data.setOffset(location.getOffset());
        lock.lock();
        try {
            lastLocation = location;
            ReferenceData prev = messages.put(message.getMessageId(), data);
            if (prev != null) {
                AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, prev.getFileId());
            }
        } finally {
            lock.unlock();
        }
        if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
            flush();
        } else {
            try {
                asyncWriteTask.wakeup();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
        }
    }

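    /**
     * Called when the journal is replayed at recovery time to re-apply a journalled message add.
     * The reference is only added if the referenceStore does not already contain it.
     *
     * @return true if the reference was added, false if it already existed or the replay failed
     */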
    public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
        MessageId id = message.getMessageId();
        try {
            // Only add the message if it has not already been added.
            ReferenceData data = referenceStore.getMessageReference(id);
            if (data == null) {
                data = new ReferenceData();
                data.setExpiration(message.getExpiration());
                data.setFileId(location.getDataFileId());
                data.setOffset(location.getOffset());
                referenceStore.addMessageReference(context, id, data);
                return true;
            }
        } catch (Throwable e) {
            LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: "
                    + e, e);
        }
        return false;
    }

    /**
     * Journals the message ack; the referenceStore is updated on the next checkpoint, or on
     * transaction commit for a transacted ack.
     */
    public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
        JournalQueueAck remove = new JournalQueueAck();
        remove.setDestination(destination);
        remove.setMessageAck(ack);
        final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
        if (!context.isInTransaction()) {
            if (debug) {
                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
            }
            removeMessage(ack, location);
        } else {
            if (debug) {
                LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
            }
            lock.lock();
            try {
                inFlightTxLocations.add(location);
            } finally {
                lock.unlock();
            }
            transactionStore.removeMessage(this, ack, location);
            context.getTransaction().addSynchronization(new Synchronization() {
                public void afterCommit() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
                                + location);
                    }
                    lock.lock();
                    try {
                        inFlightTxLocations.remove(location);
                    } finally {
                        lock.unlock();
                    }
                    removeMessage(ack, location);
                }

                public void afterRollback() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
                                + location);
                    }
                    lock.lock();
                    try {
                        inFlightTxLocations.remove(location);
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
    }

    final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
        ReferenceData data;
        lock.lock();
        try {
            lastLocation = location;
            MessageId id = ack.getLastMessageId();
            data = messages.remove(id);
            if (data == null) {
                messageAcks.add(new MessageAckWithLocation(ack, location));
            } else {
                // The message was never written to the referenceStore, so the in-progress data file
                // reference still exists and can be released here.
                AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
            }
        } finally {
            lock.unlock();
        }
        if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
            flush();
        } else if (data == null) {
            try {
                asyncWriteTask.wakeup();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
        }
    }

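    /**
     * Called when the journal is replayed at recovery time to re-apply a journalled acknowledgement.
     * The message is only removed if the referenceStore still contains it.
     *
     * @return true if the message was removed, false if it was already gone or the replay failed
     */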
    public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
        try {
            // Only remove the message if it has not already been removed.
            ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
            if (t != null) {
                referenceStore.removeMessage(context, messageAck);
                return true;
            }
        } catch (Throwable e) {
            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
                    + "'.  Message may have already been acknowledged. reason: " + e);
        }
        return false;
    }

    /**
     * Waits until the latest data has landed in the referenceStore: wakes the async write task and
     * blocks on a latch until the pending batch has been checkpointed.
     *
     * @throws InterruptedIOException if the wait is interrupted
     */
    public void flush() throws InterruptedIOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("flush starting ...");
        }
        CountDownLatch countDown;
        lock.lock();
        try {
            if (lastWrittenLocation == lastLocation) {
                return;
            }
            if (flushLatch == null) {
                flushLatch = new CountDownLatch(1);
            }
            countDown = flushLatch;
        } finally {
            lock.unlock();
        }
        try {
            asyncWriteTask.wakeup();
            countDown.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("flush finished");
        }
    }

    /**
     * Runs the next checkpoint batch and releases any threads blocked in {@link #flush()}.
     */
    synchronized void asyncWrite() {
        try {
            CountDownLatch countDown;
            lock.lock();
            try {
                countDown = flushLatch;
                flushLatch = null;
            } finally {
                lock.unlock();
            }
            mark.set(doAsyncWrite());
            if (countDown != null) {
                countDown.countDown();
            }
        } catch (IOException e) {
            LOG.error("Checkpoint failed: " + e, e);
        }
    }

    /**
     * Writes the pending message adds and acks to the referenceStore in a transaction.
     *
     * @return the Location that is now safe to mark the journal at: the earliest in-flight
     *         transaction location, or the last location written if no transactions are in flight
     * @throws IOException if the batch update fails
     */
    protected Location doAsyncWrite() throws IOException {
        final List<MessageAckWithLocation> cpRemovedMessageLocations;
        final List<Location> cpActiveJournalLocations;
        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
        final Location lastLocation;
        // Swap out the pending message and ack collections.
        lock.lock();
        try {
            cpAddedMessageIds = this.messages;
            cpRemovedMessageLocations = this.messageAcks;
            cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
            this.messages = new LinkedHashMap<MessageId, ReferenceData>();
            this.messageAcks = new ArrayList<MessageAckWithLocation>();
            lastLocation = this.lastLocation;
        } finally {
            lock.unlock();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
                    + cpRemovedMessageLocations.size() + " ");
        }
        transactionTemplate.run(new Callback() {
            public void execute() throws Exception {
                int size = 0;
                PersistenceAdapter persistenceAdapter = transactionTemplate.getPersistenceAdapter();
                ConnectionContext context = transactionTemplate.getContext();
                // Checkpoint the added messages.
                Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
                while (iterator.hasNext()) {
                    Entry<MessageId, ReferenceData> entry = iterator.next();
                    try {
                        if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("adding message ref:" + entry.getKey());
                            }
                            size++;
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
                            }
                        }
                        AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
                                .getValue().getFileId());
                    } catch (Throwable e) {
                        LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
                    }

                    // Commit the batch if it's getting too big.
                    if (size >= maxCheckpointMessageAddSize) {
                        persistenceAdapter.commitTransaction(context);
                        persistenceAdapter.beginTransaction(context);
                        size = 0;
                    }
                }
                persistenceAdapter.commitTransaction(context);
                persistenceAdapter.beginTransaction(context);
                // Checkpoint the removed messages.
                for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
                    try {
                        referenceStore.removeMessage(transactionTemplate.getContext(), ack);
                    } catch (Throwable e) {
                        LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
                    }
                }
            }
        });
        LOG.debug("Batch update done. lastLocation:" + lastLocation);
        lock.lock();
        try {
            cpAddedMessageIds = null;
            lastWrittenLocation = lastLocation;
        } finally {
            lock.unlock();
        }
        if (cpActiveJournalLocations.size() > 0) {
            Collections.sort(cpActiveJournalLocations);
            return cpActiveJournalLocations.get(0);
        } else {
            return lastLocation;
        }
    }

    /**
     * Reads a message back from the journal by its id, resolving the journal Location via
     * {@link #getLocation(MessageId)}.
     */
    public Message getMessage(MessageId identity) throws IOException {
        Location location = getLocation(identity);
        if (location != null) {
            DataStructure rc = peristenceAdapter.readCommand(location);
            try {
                return (Message) rc;
            } catch (ClassCastException e) {
                throw new IOException("Could not read message " + identity + " at location " + location
                        + ", expected a message, but got: " + rc);
            }
        }
        return null;
    }

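    /**
     * Resolves the journal Location for a message id, checking the pending in-memory adds first,
     * then the batch currently being checkpointed, and finally the referenceStore.
     *
     * @return the Location of the message data, or null if the message is not known
     */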
    protected Location getLocation(MessageId messageId) throws IOException {
        ReferenceData data = null;
        lock.lock();
        try {
            // Is it still in flight?
            data = messages.get(messageId);
            if (data == null && cpAddedMessageIds != null) {
                data = cpAddedMessageIds.get(messageId);
            }
        } finally {
            lock.unlock();
        }
        if (data == null) {
            data = referenceStore.getMessageReference(messageId);
            if (data == null) {
                return null;
            }
        }
        Location location = new Location();
        location.setDataFileId(data.getFileId());
        location.setOffset(data.getOffset());
        return location;
    }

    /**
     * Recovers the messages for this destination. Pending journal updates are flushed into the
     * referenceStore first, so that the replay from the referenceStore sees the latest data.
     *
     * @param listener the listener to receive the recovered messages
     * @throws Exception on recovery failure
     */
    public void recover(final MessageRecoveryListener listener) throws Exception {
        flush();
        referenceStore.recover(new RecoveryListenerAdapter(this, listener));
    }

    public void start() throws Exception {
        referenceStore.start();
    }

    public void stop() throws Exception {
        flush();
        asyncWriteTask.shutdown();
        referenceStore.stop();
    }

    /**
     * @return the referenceStore used as the long term store
     */
    public ReferenceStore getReferenceStore() {
        return referenceStore;
    }

    /**
     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
     */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        flush();
        referenceStore.removeAllMessages(context);
    }

    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
            String messageRef) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    public String getMessageReference(MessageId identity) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    /**
     * @return the number of messages in the referenceStore, after flushing any pending journal updates
     * @throws IOException if the flush is interrupted
     * @see org.apache.activemq.store.MessageStore#getMessageCount()
     */
    public int getMessageCount() throws IOException {
        flush();
        return referenceStore.getMessageCount();
    }

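    /**
     * Recovers the next batch of messages from the referenceStore. If nothing was recovered and the
     * listener still has space, pending journal updates are flushed and the recovery is retried.
     */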
    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
        referenceStore.recoverNextMessages(maxReturned, recoveryListener);
        if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
            flush();
            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
        }
    }

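    /**
     * Reads a message back from the journal at the location described by the given reference data.
     */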
    Message getMessage(ReferenceData data) throws IOException {
        Location location = new Location();
        location.setDataFileId(data.getFileId());
        location.setOffset(data.getOffset());
        DataStructure rc = peristenceAdapter.readCommand(location);
        try {
            return (Message) rc;
        } catch (ClassCastException e) {
            throw new IOException("Could not read message at location " + location + ", expected a message, but got: "
                    + rc);
        }
    }

    public void resetBatching() {
        referenceStore.resetBatching();
    }

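    /**
     * @return the Location recorded by the last checkpoint: the earliest in-flight transaction
     *         location, or the last location written to the referenceStore
     */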
    public Location getMark() {
        return mark.get();
    }

    public void dispose(ConnectionContext context) {
        try {
            flush();
        } catch (InterruptedIOException e) {
            Thread.currentThread().interrupt();
        }
        referenceStore.dispose(context);
        super.dispose(context);
    }

    public void setBatch(MessageId messageId) {
        try {
            flush();
        } catch (InterruptedIOException e) {
            LOG.debug("flush on setBatch resulted in exception", e);
        }
        getReferenceStore().setBatch(messageId);
    }

}