001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.util.Collections; 022 import java.util.HashMap; 023 import java.util.Map; 024 import javax.jms.IllegalStateException; 025 import javax.jms.InvalidDestinationException; 026 import javax.jms.JMSException; 027 import org.apache.activemq.command.ActiveMQBytesMessage; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQMessage; 030 import org.apache.activemq.command.CommandTypes; 031 import org.apache.activemq.command.ConsumerId; 032 import org.apache.activemq.command.ConsumerInfo; 033 import org.apache.activemq.command.MessageAck; 034 import org.apache.activemq.command.MessageDispatch; 035 import org.apache.activemq.command.ProducerId; 036 import org.apache.activemq.selector.SelectorParser; 037 import org.apache.activemq.util.IOExceptionSupport; 038 import org.apache.activemq.util.IntrospectionSupport; 039 import org.apache.activemq.util.JMSExceptionSupport; 040 041 /** 042 * 043 */ 044 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { 045 046 private final ActiveMQConnection connection; 047 private final ConsumerInfo info; 048 // These are the messages waiting to be delivered to the client 049 private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel(); 050 051 private int deliveredCounter; 052 private MessageDispatch lastDelivered; 053 private boolean eosReached; 054 private byte buffer[]; 055 private int pos; 056 private Map<String, Object> jmsProperties; 057 058 private ProducerId producerId; 059 private long nextSequenceId; 060 private long timeout; 061 private boolean firstReceived; 062 063 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout) 064 throws JMSException { 065 this.connection = connection; 066 067 if (dest == null) { 068 throw new InvalidDestinationException("Don't understand null destinations"); 069 } else if (dest.isTemporary()) { 070 String physicalName = dest.getPhysicalName(); 071 072 if (physicalName == null) { 073 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 074 } 075 076 String connectionID = connection.getConnectionInfo().getConnectionId().getValue(); 077 078 if (physicalName.indexOf(connectionID) < 0) { 079 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 080 } 081 082 if (connection.isDeleted(dest)) { 083 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 084 } 085 } 086 087 if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1"); 088 this.timeout = timeout; 089 090 this.info = new ConsumerInfo(consumerId); 091 this.info.setSubscriptionName(name); 092 093 if (selector != null && selector.trim().length() != 0) { 094 selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) "; 095 } else { 096 selector = "JMSType='org.apache.activemq.Stream'"; 097 } 098 099 SelectorParser.parse(selector); 100 this.info.setSelector(selector); 101 102 this.info.setPrefetchSize(prefetch); 103 this.info.setNoLocal(noLocal); 104 this.info.setBrowser(false); 105 this.info.setDispatchAsync(false); 106 107 // Allows the options on the destination to configure the consumerInfo 108 if (dest.getOptions() != null) { 109 Map<String, String> options = new HashMap<String, String>(dest.getOptions()); 110 IntrospectionSupport.setProperties(this.info, options, "consumer."); 111 } 112 113 this.info.setDestination(dest); 114 115 this.connection.addInputStream(this); 116 this.connection.addDispatcher(info.getConsumerId(), this); 117 this.connection.syncSendPacket(info); 118 unconsumedMessages.start(); 119 } 120 121 @Override 122 public void close() throws IOException { 123 if (!unconsumedMessages.isClosed()) { 124 try { 125 if (lastDelivered != null) { 126 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 127 connection.asyncSendPacket(ack); 128 } 129 dispose(); 130 this.connection.syncSendPacket(info.createRemoveCommand()); 131 } catch (JMSException e) { 132 throw IOExceptionSupport.create(e); 133 } 134 } 135 } 136 137 public void dispose() { 138 if (!unconsumedMessages.isClosed()) { 139 unconsumedMessages.close(); 140 this.connection.removeDispatcher(info.getConsumerId()); 141 this.connection.removeInputStream(this); 142 } 143 } 144 145 /** 146 * Return the JMS Properties which where used to send the InputStream 147 * 148 * @return jmsProperties 149 * @throws IOException 150 */ 151 public Map<String, Object> getJMSProperties() throws IOException { 152 if (jmsProperties == null) { 153 fillBuffer(); 154 } 155 return jmsProperties; 156 } 157 158 public ActiveMQMessage receive() throws JMSException, ReadTimeoutException { 159 checkClosed(); 160 MessageDispatch md; 161 try { 162 if (firstReceived || timeout == -1) { 163 md = unconsumedMessages.dequeue(-1); 164 firstReceived = true; 165 } else { 166 md = unconsumedMessages.dequeue(timeout); 167 if (md == null) throw new ReadTimeoutException(); 168 } 169 } catch (InterruptedException e) { 170 Thread.currentThread().interrupt(); 171 throw JMSExceptionSupport.create(e); 172 } 173 174 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) { 175 return null; 176 } 177 178 deliveredCounter++; 179 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { 180 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 181 connection.asyncSendPacket(ack); 182 deliveredCounter = 0; 183 lastDelivered = null; 184 } else { 185 lastDelivered = md; 186 } 187 188 return (ActiveMQMessage)md.getMessage(); 189 } 190 191 /** 192 * @throws IllegalStateException 193 */ 194 protected void checkClosed() throws IllegalStateException { 195 if (unconsumedMessages.isClosed()) { 196 throw new IllegalStateException("The Consumer is closed"); 197 } 198 } 199 200 /** 201 * 202 * @see InputStream#read() 203 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout 204 */ 205 @Override 206 public int read() throws IOException { 207 fillBuffer(); 208 if (eosReached || buffer.length == 0) { 209 return -1; 210 } 211 212 return buffer[pos++] & 0xff; 213 } 214 215 /** 216 * 217 * @see InputStream#read(byte[], int, int) 218 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout 219 */ 220 @Override 221 public int read(byte[] b, int off, int len) throws IOException { 222 fillBuffer(); 223 if (eosReached || buffer.length == 0) { 224 return -1; 225 } 226 227 int max = Math.min(len, buffer.length - pos); 228 System.arraycopy(buffer, pos, b, off, max); 229 230 pos += max; 231 return max; 232 } 233 234 private void fillBuffer() throws IOException { 235 if (eosReached || (buffer != null && buffer.length > pos)) { 236 return; 237 } 238 try { 239 while (true) { 240 ActiveMQMessage m = receive(); 241 if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { 242 // First message. 243 long producerSequenceId = m.getMessageId().getProducerSequenceId(); 244 if (producerId == null) { 245 // We have to start a stream at sequence id = 0 246 if (producerSequenceId != 0) { 247 continue; 248 } 249 nextSequenceId++; 250 producerId = m.getMessageId().getProducerId(); 251 } else { 252 // Verify it's the next message of the sequence. 253 if (!m.getMessageId().getProducerId().equals(producerId)) { 254 throw new IOException("Received an unexpected message: invalid producer: " + m); 255 } 256 if (producerSequenceId != nextSequenceId++) { 257 throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m); 258 } 259 } 260 261 // Read the buffer in. 262 ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m; 263 buffer = new byte[(int)bm.getBodyLength()]; 264 bm.readBytes(buffer); 265 pos = 0; 266 if (jmsProperties == null) { 267 jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties())); 268 } 269 } else { 270 eosReached = true; 271 if (jmsProperties == null) { 272 // no properties found 273 jmsProperties = Collections.emptyMap(); 274 } 275 } 276 return; 277 } 278 } catch (JMSException e) { 279 eosReached = true; 280 if (jmsProperties == null) { 281 // no properties found 282 jmsProperties = Collections.emptyMap(); 283 } 284 throw IOExceptionSupport.create(e); 285 } 286 } 287 288 public void dispatch(MessageDispatch md) { 289 unconsumedMessages.enqueue(md); 290 } 291 292 @Override 293 public String toString() { 294 return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; 295 } 296 297 298 /** 299 * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout 300 * 301 */ 302 public class ReadTimeoutException extends IOException { 303 public ReadTimeoutException() { 304 super(); 305 } 306 } 307 }