001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Map;
024import javax.jms.IllegalStateException;
025import javax.jms.InvalidDestinationException;
026import javax.jms.JMSException;
027import org.apache.activemq.command.ActiveMQBytesMessage;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQMessage;
030import org.apache.activemq.command.CommandTypes;
031import org.apache.activemq.command.ConsumerId;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.ProducerId;
036import org.apache.activemq.selector.SelectorParser;
037import org.apache.activemq.util.IOExceptionSupport;
038import org.apache.activemq.util.IntrospectionSupport;
039import org.apache.activemq.util.JMSExceptionSupport;
040
041/**
042 * 
043 */
044public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
045
046    private final ActiveMQConnection connection;
047    private final ConsumerInfo info;
048    // These are the messages waiting to be delivered to the client
049    private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
050
051    private int deliveredCounter;
052    private MessageDispatch lastDelivered;
053    private boolean eosReached;
054    private byte buffer[];
055    private int pos;
056    private Map<String, Object> jmsProperties;
057
058    private ProducerId producerId;
059    private long nextSequenceId;
060    private long timeout;
061    private boolean firstReceived;
062
063    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
064        throws JMSException {
065        this.connection = connection;
066
067        if (dest == null) {
068            throw new InvalidDestinationException("Don't understand null destinations");
069        } else if (dest.isTemporary()) {
070            String physicalName = dest.getPhysicalName();
071
072            if (physicalName == null) {
073                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
074            }
075
076            String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
077
078            if (physicalName.indexOf(connectionID) < 0) {
079                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
080            }
081
082            if (connection.isDeleted(dest)) {
083                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
084            }
085        }
086
087        if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
088        this.timeout = timeout;
089        
090        this.info = new ConsumerInfo(consumerId);
091        this.info.setSubscriptionName(name);
092
093        if (selector != null && selector.trim().length() != 0) {
094            selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
095        } else {
096            selector = "JMSType='org.apache.activemq.Stream'";
097        }
098
099        SelectorParser.parse(selector);
100        this.info.setSelector(selector);
101
102        this.info.setPrefetchSize(prefetch);
103        this.info.setNoLocal(noLocal);
104        this.info.setBrowser(false);
105        this.info.setDispatchAsync(false);
106
107        // Allows the options on the destination to configure the consumerInfo
108        if (dest.getOptions() != null) {
109            Map<String, String> options = new HashMap<String, String>(dest.getOptions());
110            IntrospectionSupport.setProperties(this.info, options, "consumer.");
111        }
112
113        this.info.setDestination(dest);
114
115        this.connection.addInputStream(this);
116        this.connection.addDispatcher(info.getConsumerId(), this);
117        this.connection.syncSendPacket(info);
118        unconsumedMessages.start();
119    }
120
121    @Override
122    public void close() throws IOException {
123        if (!unconsumedMessages.isClosed()) {
124            try {
125                if (lastDelivered != null) {
126                    MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
127                    connection.asyncSendPacket(ack);
128                }
129                dispose();
130                this.connection.syncSendPacket(info.createRemoveCommand());
131            } catch (JMSException e) {
132                throw IOExceptionSupport.create(e);
133            }
134        }
135    }
136
137    public void dispose() {
138        if (!unconsumedMessages.isClosed()) {
139            unconsumedMessages.close();
140            this.connection.removeDispatcher(info.getConsumerId());
141            this.connection.removeInputStream(this);
142        }
143    }
144
145    /**
146     * Return the JMS Properties which where used to send the InputStream
147     *
148     * @return jmsProperties
149     * @throws IOException
150     */
151    public Map<String, Object> getJMSProperties() throws IOException {
152        if (jmsProperties == null) {
153            fillBuffer();
154        }
155        return jmsProperties;
156    }
157
158    public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
159        checkClosed();
160        MessageDispatch md;
161        try {
162            if (firstReceived || timeout == -1) {
163                md = unconsumedMessages.dequeue(-1);
164                firstReceived = true;
165            } else {
166                md = unconsumedMessages.dequeue(timeout);
167                if (md == null) throw new ReadTimeoutException();
168            }
169        } catch (InterruptedException e) {
170            Thread.currentThread().interrupt();
171            throw JMSExceptionSupport.create(e);
172        }
173
174        if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
175            return null;
176        }
177
178        deliveredCounter++;
179        if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
180            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
181            connection.asyncSendPacket(ack);
182            deliveredCounter = 0;
183            lastDelivered = null;
184        } else {
185            lastDelivered = md;
186        }
187
188        return (ActiveMQMessage)md.getMessage();
189    }
190
191    /**
192     * @throws IllegalStateException
193     */
194    protected void checkClosed() throws IllegalStateException {
195        if (unconsumedMessages.isClosed()) {
196            throw new IllegalStateException("The Consumer is closed");
197        }
198    }
199
200    /**
201     * 
202     * @see InputStream#read()
203     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
204     */
205    @Override
206    public int read() throws IOException {
207        fillBuffer();
208        if (eosReached || buffer.length == 0) {
209            return -1;
210        }
211
212        return buffer[pos++] & 0xff;
213    }
214    
215    /**
216     * 
217     * @see InputStream#read(byte[], int, int)
218     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
219     */
220    @Override
221    public int read(byte[] b, int off, int len) throws IOException {
222        fillBuffer();
223        if (eosReached || buffer.length == 0) {
224            return -1;
225        }
226
227        int max = Math.min(len, buffer.length - pos);
228        System.arraycopy(buffer, pos, b, off, max);
229
230        pos += max;
231        return max;
232    }
233
234    private void fillBuffer() throws IOException {
235        if (eosReached || (buffer != null && buffer.length > pos)) {
236            return;
237        }
238        try {
239            while (true) {
240                ActiveMQMessage m = receive();
241                if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
242                    // First message.
243                    long producerSequenceId = m.getMessageId().getProducerSequenceId();
244                    if (producerId == null) {
245                        // We have to start a stream at sequence id = 0
246                        if (producerSequenceId != 0) {
247                            continue;
248                        }
249                        nextSequenceId++;
250                        producerId = m.getMessageId().getProducerId();
251                    } else {
252                        // Verify it's the next message of the sequence.
253                        if (!m.getMessageId().getProducerId().equals(producerId)) {
254                            throw new IOException("Received an unexpected message: invalid producer: " + m);
255                        }
256                        if (producerSequenceId != nextSequenceId++) {
257                            throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
258                        }
259                    }
260
261                    // Read the buffer in.
262                    ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
263                    buffer = new byte[(int)bm.getBodyLength()];
264                    bm.readBytes(buffer);
265                    pos = 0;
266                    if (jmsProperties == null) {
267                        jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
268                    }
269                } else {
270                    eosReached = true;
271                    if (jmsProperties == null) {
272                        // no properties found
273                        jmsProperties = Collections.emptyMap();
274                    }
275                }
276                return;
277            }
278        } catch (JMSException e) {
279            eosReached = true;
280            if (jmsProperties == null) {
281                // no properties found
282                jmsProperties = Collections.emptyMap();
283            }
284            throw IOExceptionSupport.create(e);
285        }
286    }
287
288    public void dispatch(MessageDispatch md) {
289        unconsumedMessages.enqueue(md);
290    }
291
292    @Override
293    public String toString() {
294        return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
295    }
296
297
298    /**
299     * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
300     *
301     */
302    public class ReadTimeoutException extends IOException {
303        public ReadTimeoutException() {
304            super();
305        }
306    }
307}