001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.ActiveMQQueue;
029import org.apache.activemq.command.ActiveMQTopic;
030import org.apache.activemq.command.ProducerId;
031import org.apache.activemq.store.MessageStore;
032import org.apache.activemq.store.PersistenceAdapter;
033import org.apache.activemq.store.ProxyMessageStore;
034import org.apache.activemq.store.TopicMessageStore;
035import org.apache.activemq.store.TransactionStore;
036import org.apache.activemq.usage.SystemUsage;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * @org.apache.xbean.XBean
042 * 
043 */
044public class MemoryPersistenceAdapter implements PersistenceAdapter {
045    private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
046
047    MemoryTransactionStore transactionStore;
048    ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
049    ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
050    private boolean useExternalMessageReferences;
051
052    public Set<ActiveMQDestination> getDestinations() {
053        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
054        for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
055            rc.add(iter.next());
056        }
057        for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
058            rc.add(iter.next());
059        }
060        return rc;
061    }
062
063    public static MemoryPersistenceAdapter newInstance(File file) {
064        return new MemoryPersistenceAdapter();
065    }
066
067    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
068        MessageStore rc = queues.get(destination);
069        if (rc == null) {
070            rc = new MemoryMessageStore(destination);
071            if (transactionStore != null) {
072                rc = transactionStore.proxy(rc);
073            }
074            queues.put(destination, rc);
075        }
076        return rc;
077    }
078
079    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
080        TopicMessageStore rc = topics.get(destination);
081        if (rc == null) {
082            rc = new MemoryTopicMessageStore(destination);
083            if (transactionStore != null) {
084                rc = transactionStore.proxy(rc);
085            }
086            topics.put(destination, rc);
087        }
088        return rc;
089    }
090
091    /**
092     * Cleanup method to remove any state associated with the given destination
093     *
094     * @param destination Destination to forget
095     */
096    public void removeQueueMessageStore(ActiveMQQueue destination) {
097        queues.remove(destination);
098    }
099
100    /**
101     * Cleanup method to remove any state associated with the given destination
102     *
103     * @param destination Destination to forget
104     */
105    public void removeTopicMessageStore(ActiveMQTopic destination) {
106        topics.remove(destination);
107    }
108
109    public TransactionStore createTransactionStore() throws IOException {
110        if (transactionStore == null) {
111            transactionStore = new MemoryTransactionStore(this);
112        }
113        return transactionStore;
114    }
115
116    public void beginTransaction(ConnectionContext context) {
117    }
118
119    public void commitTransaction(ConnectionContext context) {
120    }
121
122    public void rollbackTransaction(ConnectionContext context) {
123    }
124
125    public void start() throws Exception {
126    }
127
128    public void stop() throws Exception {
129    }
130
131    public long getLastMessageBrokerSequenceId() throws IOException {
132        return 0;
133    }
134
135    public void deleteAllMessages() throws IOException {
136        for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
137            MemoryMessageStore store = asMemoryMessageStore(iter.next());
138            if (store != null) {
139                store.delete();
140            }
141        }
142        for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
143            MemoryMessageStore store = asMemoryMessageStore(iter.next());
144            if (store != null) {
145                store.delete();
146            }
147        }
148
149        if (transactionStore != null) {
150            transactionStore.delete();
151        }
152    }
153
154    public boolean isUseExternalMessageReferences() {
155        return useExternalMessageReferences;
156    }
157
158    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
159        this.useExternalMessageReferences = useExternalMessageReferences;
160    }
161
162    protected MemoryMessageStore asMemoryMessageStore(Object value) {
163        if (value instanceof MemoryMessageStore) {
164            return (MemoryMessageStore)value;
165        }
166        if (value instanceof ProxyMessageStore) {
167            MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
168            if (delegate instanceof MemoryMessageStore) {
169                return (MemoryMessageStore) delegate;
170            }
171        }
172        LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
173        return null;
174    }
175
176    /**
177     * @param usageManager The UsageManager that is controlling the broker's
178     *                memory usage.
179     */
180    public void setUsageManager(SystemUsage usageManager) {
181    }
182
183    public String toString() {
184        return "MemoryPersistenceAdapter";
185    }
186
187    public void setBrokerName(String brokerName) {
188    }
189
190    public void setDirectory(File dir) {
191    }
192    
193    public File getDirectory(){
194        return null;
195    }
196
197    public void checkpoint(boolean sync) throws IOException {
198    }
199    
200    public long size(){
201        return 0;
202    }
203    
204    public void setCreateTransactionStore(boolean create) throws IOException {
205        if (create) {
206            createTransactionStore();
207        }
208    }
209
210    public long getLastProducerSequenceId(ProducerId id) {
211        // memory map does duplicate suppression
212        return -1;
213    }
214}