001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.memory.buffer;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.ActiveMQMessage;
026import org.apache.activemq.command.Message;
027
028/**
029 * Allows messages to be added to the end of the buffer such that they are kept
030 * around and evicted in a FIFO manner.
031 * 
032 * 
033 */
034public class MessageQueue {
035
036    private MessageBuffer buffer;
037    private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
038    private int size;
039    private Object lock = new Object();
040    private int position;
041
042    public MessageQueue(MessageBuffer buffer) {
043        this.buffer = buffer;
044    }
045
046    public void add(MessageReference messageRef) {
047        Message message = messageRef.getMessageHardRef();
048        int delta = message.getSize();
049        int newSize = 0;
050        synchronized (lock) {
051            list.add(messageRef);
052            size += delta;
053            newSize = size;
054        }
055        buffer.onSizeChanged(this, delta, newSize);
056    }
057    
058    public void add(ActiveMQMessage message) {
059        int delta = message.getSize();
060        int newSize = 0;
061        synchronized (lock) {
062            list.add(message);
063            size += delta;
064            newSize = size;
065        }
066        buffer.onSizeChanged(this, delta, newSize);
067    }
068
069    public int evictMessage() {
070        synchronized (lock) {
071            if (!list.isEmpty()) {
072                ActiveMQMessage message = (ActiveMQMessage) list.removeFirst();
073                int messageSize = message.getSize();
074                size -= messageSize;
075                return messageSize;
076            }
077        }
078        return 0;
079    }
080
081    /**
082     * Returns a copy of the list
083     */
084    public List<MessageReference> getList() {
085        synchronized (lock) {
086            return new ArrayList<MessageReference>(list);
087        }
088    }
089
090    public void appendMessages(List<MessageReference> answer) {
091        synchronized (lock) {
092            for (Iterator<MessageReference> iter = list.iterator(); iter.hasNext();) {
093                answer.add(iter.next());
094            }
095        }
096    }
097
098    public int getSize() {
099        synchronized (lock) {
100            return size;
101        }
102    }
103
104    public int getPosition() {
105        return position;
106    }
107
108    public void setPosition(int position) {
109        this.position = position;
110    }
111
112    public void clear() {
113        synchronized (lock) {
114            list.clear();
115            size = 0;
116        }
117    }
118
119}