001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.IOException;
020import org.apache.activemq.broker.ConnectionContext;
021import org.apache.activemq.command.ActiveMQDestination;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.command.MessageAck;
024import org.apache.activemq.command.MessageId;
025import org.apache.activemq.kaha.MapContainer;
026import org.apache.activemq.kaha.StoreEntry;
027import org.apache.activemq.store.MessageRecoveryListener;
028import org.apache.activemq.store.MessageStore;
029import org.apache.activemq.store.AbstractMessageStore;
030import org.apache.activemq.usage.MemoryUsage;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * An implementation of {@link org.apache.activemq.store.MessageStore} which
035 * uses a JPS Container
036 * 
037 * 
038 */
039public class KahaMessageStore extends AbstractMessageStore {
040
041    protected final MapContainer<MessageId, Message> messageContainer;
042    protected StoreEntry batchEntry;
043
044    public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
045        throws IOException {
046        super(destination);
047        this.messageContainer = container;
048    }
049
050    protected MessageId getMessageId(Object object) {
051        return ((Message)object).getMessageId();
052    }
053
054    public Object getId() {
055        return messageContainer.getId();
056    }
057
058    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
059        messageContainer.put(message.getMessageId(), message);
060        // TODO: we should do the following but it is not need if the message is
061        // being added within a persistence
062        // transaction
063        // but since I can't tell if one is running right now.. I'll leave this
064        // out for now.
065        // if( message.isResponseRequired() ) {
066        // messageContainer.force();
067        // }
068    }
069
070    public synchronized Message getMessage(MessageId identity) throws IOException {
071        Message result = messageContainer.get(identity);
072        return result;
073    }
074
075    protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
076        listener.recoverMessage(msg);
077        return listener.hasSpace();
078    }
079
080    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081        removeMessage(ack.getLastMessageId());
082    }
083
084    public synchronized void removeMessage(MessageId msgId) throws IOException {
085        StoreEntry entry = messageContainer.getEntry(msgId);
086        if (entry != null) {
087            messageContainer.remove(entry);
088            if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) {
089                resetBatching();
090            }
091        }
092    }
093
094    public synchronized void recover(MessageRecoveryListener listener) throws Exception {
095        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
096            .getNext(entry)) {
097            Message msg = (Message)messageContainer.getValue(entry);
098            if (!recoverMessage(listener, msg)) {
099                break;
100            }
101        }
102    }
103
104    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
105        messageContainer.clear();
106    }
107
108    public synchronized void delete() {
109        messageContainer.clear();
110    }
111
112    /**
113     * @return the number of messages held by this destination
114     * @see org.apache.activemq.store.MessageStore#getMessageCount()
115     */
116    public int getMessageCount() {
117        return messageContainer.size();
118    }
119
120    /**
121     * @param id
122     * @return null
123     * @throws Exception
124     * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
125     */
126    public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception {
127        return null;
128    }
129
130    /**
131     * @param lastMessageId
132     * @param maxReturned
133     * @param listener
134     * @throws Exception
135     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId,
136     *      int, org.apache.activemq.store.MessageRecoveryListener)
137     */
138    public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
139        throws Exception {
140        StoreEntry entry = batchEntry;
141        if (entry == null) {
142            entry = messageContainer.getFirst();
143        } else {
144            entry = messageContainer.refresh(entry);
145            entry = messageContainer.getNext(entry);
146            if (entry == null) {
147                batchEntry = null;
148            }
149        }
150        if (entry != null) {
151            int count = 0;
152            do {
153                Message msg = messageContainer.getValue(entry);
154                if (msg != null) {
155                    recoverMessage(listener, msg);
156                    count++;
157                }
158                batchEntry = entry;
159                entry = messageContainer.getNext(entry);
160            } while (entry != null && count < maxReturned && listener.hasSpace());
161        }
162    }
163
164    /**
165     * @param nextToDispatch
166     * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
167     */
168    public synchronized void resetBatching() {
169        batchEntry = null;
170    }
171
172    /**
173     * @return true if the store supports cursors
174     */
175    public boolean isSupportForCursors() {
176        return true;
177    }
178
179    @Override
180    public void setBatch(MessageId messageId) {
181        batchEntry = messageContainer.getEntry(messageId);
182    }
183    
184}