001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.command;
019
020import java.io.BufferedInputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.util.zip.DeflaterOutputStream;
028import java.util.zip.InflaterInputStream;
029
030import javax.jms.JMSException;
031import javax.jms.MessageEOFException;
032import javax.jms.MessageFormatException;
033import javax.jms.MessageNotReadableException;
034import javax.jms.MessageNotWriteableException;
035import javax.jms.StreamMessage;
036
037import org.apache.activemq.ActiveMQConnection;
038import org.apache.activemq.util.ByteArrayInputStream;
039import org.apache.activemq.util.ByteArrayOutputStream;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.MarshallingSupport;
043
044/**
045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046 * types in the Java programming language. It is filled and read sequentially.
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream
048 * message body. Its methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>. <p/>
051 * <P>
052 * The primitive types can be read or written explicitly using methods for each
053 * type. They may also be read or written generically as objects. For instance,
054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055 * <CODE>StreamMessage.writeObject(new
056 * Integer(6))</CODE>. Both forms are
057 * provided, because the explicit form is convenient for static programming, and
058 * the object form is needed when types are not known at compile time. <p/>
059 * <P>
060 * When the message is first created, and when <CODE>clearBody</CODE> is
061 * called, the body of the message is in write-only mode. After the first call
062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063 * After a message has been sent, the client that sent it can retain and modify
064 * it without affecting the message that has been sent. The same message object
065 * can be sent multiple times. When a message has been received, the provider
066 * has called <CODE>reset</CODE> so that the message body is in read-only mode
067 * for the client. <p/>
068 * <P>
069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070 * message body is cleared and the message body is in write-only mode. <p/>
071 * <P>
072 * If a client attempts to read a message in write-only mode, a
073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074 * <P>
075 * If a client attempts to write a message in read-only mode, a
076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077 * <P>
078 * <CODE>StreamMessage</CODE> objects support the following conversion table.
079 * The marked cases must be supported. The unmarked cases must throw a
080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081 * conversions may throw a runtime exception if the primitive's
082 * <CODE>valueOf()</CODE> method does not accept it as a valid
083 * <CODE>String</CODE> representation of the primitive. <p/>
084 * <P>
085 * A value written as the row type can be read as the column type. <p/>
086 * 
087 * <PRE>
088 *  | | boolean byte short char int long float double String byte[]
089 * |----------------------------------------------------------------------
090 * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
091 * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
092 * X |----------------------------------------------------------------------
093 * 
094 * </PRE>
095 * 
096 * <p/>
097 * <P>
098 * Attempting to read a null value as a primitive type must be treated as
099 * calling the primitive's corresponding <code>valueOf(String)</code>
100 * conversion method with a null value. Since <code>char</code> does not
101 * support a <code>String</code> conversion, attempting to read a null value
102 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 * 
104 * @openwire:marshaller code="27"
105 * @see javax.jms.Session#createStreamMessage()
106 * @see javax.jms.BytesMessage
107 * @see javax.jms.MapMessage
108 * @see javax.jms.Message
109 * @see javax.jms.ObjectMessage
110 * @see javax.jms.TextMessage
111 */
112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113
114    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
115
116    protected transient DataOutputStream dataOut;
117    protected transient ByteArrayOutputStream bytesOut;
118    protected transient DataInputStream dataIn;
119    protected transient int remainingBytes = -1;
120
121    public Message copy() {
122        ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
123        copy(copy);
124        return copy;
125    }
126
127    private void copy(ActiveMQStreamMessage copy) {
128        storeContent();
129        super.copy(copy);
130        copy.dataOut = null;
131        copy.bytesOut = null;
132        copy.dataIn = null;
133    }
134
135    public void onSend() throws JMSException {
136        super.onSend();
137        storeContent();
138    }
139
140    private void storeContent() {
141        if (dataOut != null) {
142            try {
143                dataOut.close();
144                setContent(bytesOut.toByteSequence());
145                bytesOut = null;
146                dataOut = null;
147            } catch (IOException ioe) {
148                throw new RuntimeException(ioe);
149            }
150        }
151    }
152
153    public byte getDataStructureType() {
154        return DATA_STRUCTURE_TYPE;
155    }
156
157    public String getJMSXMimeType() {
158        return "jms/stream-message";
159    }
160
161    /**
162     * Clears out the message body. Clearing a message's body does not clear its
163     * header values or property entries. <p/>
164     * <P>
165     * If this message body was read-only, calling this method leaves the
166     * message body in the same state as an empty body in a newly created
167     * message.
168     * 
169     * @throws JMSException if the JMS provider fails to clear the message body
170     *                 due to some internal error.
171     */
172
173    public void clearBody() throws JMSException {
174        super.clearBody();
175        this.dataOut = null;
176        this.dataIn = null;
177        this.bytesOut = null;
178        this.remainingBytes = -1;
179    }
180
181    /**
182     * Reads a <code>boolean</code> from the stream message.
183     * 
184     * @return the <code>boolean</code> value read
185     * @throws JMSException if the JMS provider fails to read the message due to
186     *                 some internal error.
187     * @throws MessageEOFException if unexpected end of message stream has been
188     *                 reached.
189     * @throws MessageFormatException if this type conversion is invalid.
190     * @throws MessageNotReadableException if the message is in write-only mode.
191     */
192
193    public boolean readBoolean() throws JMSException {
194        initializeReading();
195        try {
196
197            this.dataIn.mark(10);
198            int type = this.dataIn.read();
199            if (type == -1) {
200                throw new MessageEOFException("reached end of data");
201            }
202            if (type == MarshallingSupport.BOOLEAN_TYPE) {
203                return this.dataIn.readBoolean();
204            }
205            if (type == MarshallingSupport.STRING_TYPE) {
206                return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
207            }
208            if (type == MarshallingSupport.NULL) {
209                this.dataIn.reset();
210                throw new NullPointerException("Cannot convert NULL value to boolean.");
211            } else {
212                this.dataIn.reset();
213                throw new MessageFormatException(" not a boolean type");
214            }
215        } catch (EOFException e) {
216            throw JMSExceptionSupport.createMessageEOFException(e);
217        } catch (IOException e) {
218            throw JMSExceptionSupport.createMessageFormatException(e);
219        }
220    }
221
222    /**
223     * Reads a <code>byte</code> value from the stream message.
224     * 
225     * @return the next byte from the stream message as a 8-bit
226     *         <code>byte</code>
227     * @throws JMSException if the JMS provider fails to read the message due to
228     *                 some internal error.
229     * @throws MessageEOFException if unexpected end of message stream has been
230     *                 reached.
231     * @throws MessageFormatException if this type conversion is invalid.
232     * @throws MessageNotReadableException if the message is in write-only mode.
233     */
234
235    public byte readByte() throws JMSException {
236        initializeReading();
237        try {
238
239            this.dataIn.mark(10);
240            int type = this.dataIn.read();
241            if (type == -1) {
242                throw new MessageEOFException("reached end of data");
243            }
244            if (type == MarshallingSupport.BYTE_TYPE) {
245                return this.dataIn.readByte();
246            }
247            if (type == MarshallingSupport.STRING_TYPE) {
248                return Byte.valueOf(this.dataIn.readUTF()).byteValue();
249            }
250            if (type == MarshallingSupport.NULL) {
251                this.dataIn.reset();
252                throw new NullPointerException("Cannot convert NULL value to byte.");
253            } else {
254                this.dataIn.reset();
255                throw new MessageFormatException(" not a byte type");
256            }
257        } catch (NumberFormatException mfe) {
258            try {
259                this.dataIn.reset();
260            } catch (IOException ioe) {
261                throw JMSExceptionSupport.create(ioe);
262            }
263            throw mfe;
264
265        } catch (EOFException e) {
266            throw JMSExceptionSupport.createMessageEOFException(e);
267        } catch (IOException e) {
268            throw JMSExceptionSupport.createMessageFormatException(e);
269        }
270    }
271
272    /**
273     * Reads a 16-bit integer from the stream message.
274     * 
275     * @return a 16-bit integer from the stream message
276     * @throws JMSException if the JMS provider fails to read the message due to
277     *                 some internal error.
278     * @throws MessageEOFException if unexpected end of message stream has been
279     *                 reached.
280     * @throws MessageFormatException if this type conversion is invalid.
281     * @throws MessageNotReadableException if the message is in write-only mode.
282     */
283
284    public short readShort() throws JMSException {
285        initializeReading();
286        try {
287
288            this.dataIn.mark(17);
289            int type = this.dataIn.read();
290            if (type == -1) {
291                throw new MessageEOFException("reached end of data");
292            }
293            if (type == MarshallingSupport.SHORT_TYPE) {
294                return this.dataIn.readShort();
295            }
296            if (type == MarshallingSupport.BYTE_TYPE) {
297                return this.dataIn.readByte();
298            }
299            if (type == MarshallingSupport.STRING_TYPE) {
300                return Short.valueOf(this.dataIn.readUTF()).shortValue();
301            }
302            if (type == MarshallingSupport.NULL) {
303                this.dataIn.reset();
304                throw new NullPointerException("Cannot convert NULL value to short.");
305            } else {
306                this.dataIn.reset();
307                throw new MessageFormatException(" not a short type");
308            }
309        } catch (NumberFormatException mfe) {
310            try {
311                this.dataIn.reset();
312            } catch (IOException ioe) {
313                throw JMSExceptionSupport.create(ioe);
314            }
315            throw mfe;
316
317        } catch (EOFException e) {
318            throw JMSExceptionSupport.createMessageEOFException(e);
319        } catch (IOException e) {
320            throw JMSExceptionSupport.createMessageFormatException(e);
321        }
322
323    }
324
325    /**
326     * Reads a Unicode character value from the stream message.
327     * 
328     * @return a Unicode character from the stream message
329     * @throws JMSException if the JMS provider fails to read the message due to
330     *                 some internal error.
331     * @throws MessageEOFException if unexpected end of message stream has been
332     *                 reached.
333     * @throws MessageFormatException if this type conversion is invalid
334     * @throws MessageNotReadableException if the message is in write-only mode.
335     */
336
337    public char readChar() throws JMSException {
338        initializeReading();
339        try {
340
341            this.dataIn.mark(17);
342            int type = this.dataIn.read();
343            if (type == -1) {
344                throw new MessageEOFException("reached end of data");
345            }
346            if (type == MarshallingSupport.CHAR_TYPE) {
347                return this.dataIn.readChar();
348            }
349            if (type == MarshallingSupport.NULL) {
350                this.dataIn.reset();
351                throw new NullPointerException("Cannot convert NULL value to char.");
352            } else {
353                this.dataIn.reset();
354                throw new MessageFormatException(" not a char type");
355            }
356        } catch (NumberFormatException mfe) {
357            try {
358                this.dataIn.reset();
359            } catch (IOException ioe) {
360                throw JMSExceptionSupport.create(ioe);
361            }
362            throw mfe;
363
364        } catch (EOFException e) {
365            throw JMSExceptionSupport.createMessageEOFException(e);
366        } catch (IOException e) {
367            throw JMSExceptionSupport.createMessageFormatException(e);
368        }
369    }
370
371    /**
372     * Reads a 32-bit integer from the stream message.
373     * 
374     * @return a 32-bit integer value from the stream message, interpreted as an
375     *         <code>int</code>
376     * @throws JMSException if the JMS provider fails to read the message due to
377     *                 some internal error.
378     * @throws MessageEOFException if unexpected end of message stream has been
379     *                 reached.
380     * @throws MessageFormatException if this type conversion is invalid.
381     * @throws MessageNotReadableException if the message is in write-only mode.
382     */
383
384    public int readInt() throws JMSException {
385        initializeReading();
386        try {
387
388            this.dataIn.mark(33);
389            int type = this.dataIn.read();
390            if (type == -1) {
391                throw new MessageEOFException("reached end of data");
392            }
393            if (type == MarshallingSupport.INTEGER_TYPE) {
394                return this.dataIn.readInt();
395            }
396            if (type == MarshallingSupport.SHORT_TYPE) {
397                return this.dataIn.readShort();
398            }
399            if (type == MarshallingSupport.BYTE_TYPE) {
400                return this.dataIn.readByte();
401            }
402            if (type == MarshallingSupport.STRING_TYPE) {
403                return Integer.valueOf(this.dataIn.readUTF()).intValue();
404            }
405            if (type == MarshallingSupport.NULL) {
406                this.dataIn.reset();
407                throw new NullPointerException("Cannot convert NULL value to int.");
408            } else {
409                this.dataIn.reset();
410                throw new MessageFormatException(" not an int type");
411            }
412        } catch (NumberFormatException mfe) {
413            try {
414                this.dataIn.reset();
415            } catch (IOException ioe) {
416                throw JMSExceptionSupport.create(ioe);
417            }
418            throw mfe;
419
420        } catch (EOFException e) {
421            throw JMSExceptionSupport.createMessageEOFException(e);
422        } catch (IOException e) {
423            throw JMSExceptionSupport.createMessageFormatException(e);
424        }
425    }
426
427    /**
428     * Reads a 64-bit integer from the stream message.
429     * 
430     * @return a 64-bit integer value from the stream message, interpreted as a
431     *         <code>long</code>
432     * @throws JMSException if the JMS provider fails to read the message due to
433     *                 some internal error.
434     * @throws MessageEOFException if unexpected end of message stream has been
435     *                 reached.
436     * @throws MessageFormatException if this type conversion is invalid.
437     * @throws MessageNotReadableException if the message is in write-only mode.
438     */
439
440    public long readLong() throws JMSException {
441        initializeReading();
442        try {
443
444            this.dataIn.mark(65);
445            int type = this.dataIn.read();
446            if (type == -1) {
447                throw new MessageEOFException("reached end of data");
448            }
449            if (type == MarshallingSupport.LONG_TYPE) {
450                return this.dataIn.readLong();
451            }
452            if (type == MarshallingSupport.INTEGER_TYPE) {
453                return this.dataIn.readInt();
454            }
455            if (type == MarshallingSupport.SHORT_TYPE) {
456                return this.dataIn.readShort();
457            }
458            if (type == MarshallingSupport.BYTE_TYPE) {
459                return this.dataIn.readByte();
460            }
461            if (type == MarshallingSupport.STRING_TYPE) {
462                return Long.valueOf(this.dataIn.readUTF()).longValue();
463            }
464            if (type == MarshallingSupport.NULL) {
465                this.dataIn.reset();
466                throw new NullPointerException("Cannot convert NULL value to long.");
467            } else {
468                this.dataIn.reset();
469                throw new MessageFormatException(" not a long type");
470            }
471        } catch (NumberFormatException mfe) {
472            try {
473                this.dataIn.reset();
474            } catch (IOException ioe) {
475                throw JMSExceptionSupport.create(ioe);
476            }
477            throw mfe;
478
479        } catch (EOFException e) {
480            throw JMSExceptionSupport.createMessageEOFException(e);
481        } catch (IOException e) {
482            throw JMSExceptionSupport.createMessageFormatException(e);
483        }
484    }
485
486    /**
487     * Reads a <code>float</code> from the stream message.
488     * 
489     * @return a <code>float</code> value from the stream message
490     * @throws JMSException if the JMS provider fails to read the message due to
491     *                 some internal error.
492     * @throws MessageEOFException if unexpected end of message stream has been
493     *                 reached.
494     * @throws MessageFormatException if this type conversion is invalid.
495     * @throws MessageNotReadableException if the message is in write-only mode.
496     */
497
498    public float readFloat() throws JMSException {
499        initializeReading();
500        try {
501            this.dataIn.mark(33);
502            int type = this.dataIn.read();
503            if (type == -1) {
504                throw new MessageEOFException("reached end of data");
505            }
506            if (type == MarshallingSupport.FLOAT_TYPE) {
507                return this.dataIn.readFloat();
508            }
509            if (type == MarshallingSupport.STRING_TYPE) {
510                return Float.valueOf(this.dataIn.readUTF()).floatValue();
511            }
512            if (type == MarshallingSupport.NULL) {
513                this.dataIn.reset();
514                throw new NullPointerException("Cannot convert NULL value to float.");
515            } else {
516                this.dataIn.reset();
517                throw new MessageFormatException(" not a float type");
518            }
519        } catch (NumberFormatException mfe) {
520            try {
521                this.dataIn.reset();
522            } catch (IOException ioe) {
523                throw JMSExceptionSupport.create(ioe);
524            }
525            throw mfe;
526
527        } catch (EOFException e) {
528            throw JMSExceptionSupport.createMessageEOFException(e);
529        } catch (IOException e) {
530            throw JMSExceptionSupport.createMessageFormatException(e);
531        }
532    }
533
534    /**
535     * Reads a <code>double</code> from the stream message.
536     * 
537     * @return a <code>double</code> value from the stream message
538     * @throws JMSException if the JMS provider fails to read the message due to
539     *                 some internal error.
540     * @throws MessageEOFException if unexpected end of message stream has been
541     *                 reached.
542     * @throws MessageFormatException if this type conversion is invalid.
543     * @throws MessageNotReadableException if the message is in write-only mode.
544     */
545
546    public double readDouble() throws JMSException {
547        initializeReading();
548        try {
549
550            this.dataIn.mark(65);
551            int type = this.dataIn.read();
552            if (type == -1) {
553                throw new MessageEOFException("reached end of data");
554            }
555            if (type == MarshallingSupport.DOUBLE_TYPE) {
556                return this.dataIn.readDouble();
557            }
558            if (type == MarshallingSupport.FLOAT_TYPE) {
559                return this.dataIn.readFloat();
560            }
561            if (type == MarshallingSupport.STRING_TYPE) {
562                return Double.valueOf(this.dataIn.readUTF()).doubleValue();
563            }
564            if (type == MarshallingSupport.NULL) {
565                this.dataIn.reset();
566                throw new NullPointerException("Cannot convert NULL value to double.");
567            } else {
568                this.dataIn.reset();
569                throw new MessageFormatException(" not a double type");
570            }
571        } catch (NumberFormatException mfe) {
572            try {
573                this.dataIn.reset();
574            } catch (IOException ioe) {
575                throw JMSExceptionSupport.create(ioe);
576            }
577            throw mfe;
578
579        } catch (EOFException e) {
580            throw JMSExceptionSupport.createMessageEOFException(e);
581        } catch (IOException e) {
582            throw JMSExceptionSupport.createMessageFormatException(e);
583        }
584    }
585
586    /**
587     * Reads a <CODE>String</CODE> from the stream message.
588     * 
589     * @return a Unicode string from the stream message
590     * @throws JMSException if the JMS provider fails to read the message due to
591     *                 some internal error.
592     * @throws MessageEOFException if unexpected end of message stream has been
593     *                 reached.
594     * @throws MessageFormatException if this type conversion is invalid.
595     * @throws MessageNotReadableException if the message is in write-only mode.
596     */
597
598    public String readString() throws JMSException {
599        initializeReading();
600        try {
601
602            this.dataIn.mark(65);
603            int type = this.dataIn.read();
604            if (type == -1) {
605                throw new MessageEOFException("reached end of data");
606            }
607            if (type == MarshallingSupport.NULL) {
608                return null;
609            }
610            if (type == MarshallingSupport.BIG_STRING_TYPE) {
611                return MarshallingSupport.readUTF8(dataIn);
612            }
613            if (type == MarshallingSupport.STRING_TYPE) {
614                return this.dataIn.readUTF();
615            }
616            if (type == MarshallingSupport.LONG_TYPE) {
617                return new Long(this.dataIn.readLong()).toString();
618            }
619            if (type == MarshallingSupport.INTEGER_TYPE) {
620                return new Integer(this.dataIn.readInt()).toString();
621            }
622            if (type == MarshallingSupport.SHORT_TYPE) {
623                return new Short(this.dataIn.readShort()).toString();
624            }
625            if (type == MarshallingSupport.BYTE_TYPE) {
626                return new Byte(this.dataIn.readByte()).toString();
627            }
628            if (type == MarshallingSupport.FLOAT_TYPE) {
629                return new Float(this.dataIn.readFloat()).toString();
630            }
631            if (type == MarshallingSupport.DOUBLE_TYPE) {
632                return new Double(this.dataIn.readDouble()).toString();
633            }
634            if (type == MarshallingSupport.BOOLEAN_TYPE) {
635                return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
636            }
637            if (type == MarshallingSupport.CHAR_TYPE) {
638                return new Character(this.dataIn.readChar()).toString();
639            } else {
640                this.dataIn.reset();
641                throw new MessageFormatException(" not a String type");
642            }
643        } catch (NumberFormatException mfe) {
644            try {
645                this.dataIn.reset();
646            } catch (IOException ioe) {
647                throw JMSExceptionSupport.create(ioe);
648            }
649            throw mfe;
650
651        } catch (EOFException e) {
652            throw JMSExceptionSupport.createMessageEOFException(e);
653        } catch (IOException e) {
654            throw JMSExceptionSupport.createMessageFormatException(e);
655        }
656    }
657
658    /**
659     * Reads a byte array field from the stream message into the specified
660     * <CODE>byte[]</CODE> object (the read buffer). <p/>
661     * <P>
662     * To read the field value, <CODE>readBytes</CODE> should be successively
663     * called until it returns a value less than the length of the read buffer.
664     * The value of the bytes in the buffer following the last byte read is
665     * undefined. <p/>
666     * <P>
667     * If <CODE>readBytes</CODE> returns a value equal to the length of the
668     * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
669     * are no more bytes to be read, this call returns -1. <p/>
670     * <P>
671     * If the byte array field value is null, <CODE>readBytes</CODE> returns
672     * -1. <p/>
673     * <P>
674     * If the byte array field value is empty, <CODE>readBytes</CODE> returns
675     * 0. <p/>
676     * <P>
677     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
678     * field value has been made, the full value of the field must be read
679     * before it is valid to read the next field. An attempt to read the next
680     * field before that has been done will throw a
681     * <CODE>MessageFormatException</CODE>. <p/>
682     * <P>
683     * To read the byte field value into a new <CODE>byte[]</CODE> object, use
684     * the <CODE>readObject</CODE> method.
685     * 
686     * @param value the buffer into which the data is read
687     * @return the total number of bytes read into the buffer, or -1 if there is
688     *         no more data because the end of the byte field has been reached
689     * @throws JMSException if the JMS provider fails to read the message due to
690     *                 some internal error.
691     * @throws MessageEOFException if unexpected end of message stream has been
692     *                 reached.
693     * @throws MessageFormatException if this type conversion is invalid.
694     * @throws MessageNotReadableException if the message is in write-only mode.
695     * @see #readObject()
696     */
697
698    public int readBytes(byte[] value) throws JMSException {
699
700        initializeReading();
701        try {
702            if (value == null) {
703                throw new NullPointerException();
704            }
705
706            if (remainingBytes == -1) {
707                this.dataIn.mark(value.length + 1);
708                int type = this.dataIn.read();
709                if (type == -1) {
710                    throw new MessageEOFException("reached end of data");
711                }
712                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
713                    throw new MessageFormatException("Not a byte array");
714                }
715                remainingBytes = this.dataIn.readInt();
716            } else if (remainingBytes == 0) {
717                remainingBytes = -1;
718                return -1;
719            }
720
721            if (value.length <= remainingBytes) {
722                // small buffer
723                remainingBytes -= value.length;
724                this.dataIn.readFully(value);
725                return value.length;
726            } else {
727                // big buffer
728                int rc = this.dataIn.read(value, 0, remainingBytes);
729                remainingBytes = 0;
730                return rc;
731            }
732
733        } catch (EOFException e) {
734            JMSException jmsEx = new MessageEOFException(e.getMessage());
735            jmsEx.setLinkedException(e);
736            throw jmsEx;
737        } catch (IOException e) {
738            JMSException jmsEx = new MessageFormatException(e.getMessage());
739            jmsEx.setLinkedException(e);
740            throw jmsEx;
741        }
742    }
743
744    /**
745     * Reads an object from the stream message. <p/>
746     * <P>
747     * This method can be used to return, in objectified format, an object in
748     * the Java programming language ("Java object") that has been written to
749     * the stream with the equivalent <CODE>writeObject</CODE> method call, or
750     * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
751     * <P>
752     * Note that byte values are returned as <CODE>byte[]</CODE>, not
753     * <CODE>Byte[]</CODE>. <p/>
754     * <P>
755     * An attempt to call <CODE>readObject</CODE> to read a byte field value
756     * into a new <CODE>byte[]</CODE> object before the full value of the byte
757     * field has been read will throw a <CODE>MessageFormatException</CODE>.
758     * 
759     * @return a Java object from the stream message, in objectified format (for
760     *         example, if the object was written as an <CODE>int</CODE>, an
761     *         <CODE>Integer</CODE> is returned)
762     * @throws JMSException if the JMS provider fails to read the message due to
763     *                 some internal error.
764     * @throws MessageEOFException if unexpected end of message stream has been
765     *                 reached.
766     * @throws MessageFormatException if this type conversion is invalid.
767     * @throws MessageNotReadableException if the message is in write-only mode.
768     * @see #readBytes(byte[] value)
769     */
770
771    public Object readObject() throws JMSException {
772        initializeReading();
773        try {
774            this.dataIn.mark(65);
775            int type = this.dataIn.read();
776            if (type == -1) {
777                throw new MessageEOFException("reached end of data");
778            }
779            if (type == MarshallingSupport.NULL) {
780                return null;
781            }
782            if (type == MarshallingSupport.BIG_STRING_TYPE) {
783                return MarshallingSupport.readUTF8(dataIn);
784            }
785            if (type == MarshallingSupport.STRING_TYPE) {
786                return this.dataIn.readUTF();
787            }
788            if (type == MarshallingSupport.LONG_TYPE) {
789                return Long.valueOf(this.dataIn.readLong());
790            }
791            if (type == MarshallingSupport.INTEGER_TYPE) {
792                return Integer.valueOf(this.dataIn.readInt());
793            }
794            if (type == MarshallingSupport.SHORT_TYPE) {
795                return Short.valueOf(this.dataIn.readShort());
796            }
797            if (type == MarshallingSupport.BYTE_TYPE) {
798                return Byte.valueOf(this.dataIn.readByte());
799            }
800            if (type == MarshallingSupport.FLOAT_TYPE) {
801                return new Float(this.dataIn.readFloat());
802            }
803            if (type == MarshallingSupport.DOUBLE_TYPE) {
804                return new Double(this.dataIn.readDouble());
805            }
806            if (type == MarshallingSupport.BOOLEAN_TYPE) {
807                return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
808            }
809            if (type == MarshallingSupport.CHAR_TYPE) {
810                return Character.valueOf(this.dataIn.readChar());
811            }
812            if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
813                int len = this.dataIn.readInt();
814                byte[] value = new byte[len];
815                this.dataIn.readFully(value);
816                return value;
817            } else {
818                this.dataIn.reset();
819                throw new MessageFormatException("unknown type");
820            }
821        } catch (NumberFormatException mfe) {
822            try {
823                this.dataIn.reset();
824            } catch (IOException ioe) {
825                throw JMSExceptionSupport.create(ioe);
826            }
827            throw mfe;
828
829        } catch (EOFException e) {
830            JMSException jmsEx = new MessageEOFException(e.getMessage());
831            jmsEx.setLinkedException(e);
832            throw jmsEx;
833        } catch (IOException e) {
834            JMSException jmsEx = new MessageFormatException(e.getMessage());
835            jmsEx.setLinkedException(e);
836            throw jmsEx;
837        }
838    }
839
840    /**
841     * Writes a <code>boolean</code> to the stream message. The value
842     * <code>true</code> is written as the value <code>(byte)1</code>; the
843     * value <code>false</code> is written as the value <code>(byte)0</code>.
844     * 
845     * @param value the <code>boolean</code> value to be written
846     * @throws JMSException if the JMS provider fails to write the message due
847     *                 to some internal error.
848     * @throws MessageNotWriteableException if the message is in read-only mode.
849     */
850
851    public void writeBoolean(boolean value) throws JMSException {
852        initializeWriting();
853        try {
854            MarshallingSupport.marshalBoolean(dataOut, value);
855        } catch (IOException ioe) {
856            throw JMSExceptionSupport.create(ioe);
857        }
858    }
859
860    /**
861     * Writes a <code>byte</code> to the stream message.
862     * 
863     * @param value the <code>byte</code> value to be written
864     * @throws JMSException if the JMS provider fails to write the message due
865     *                 to some internal error.
866     * @throws MessageNotWriteableException if the message is in read-only mode.
867     */
868
869    public void writeByte(byte value) throws JMSException {
870        initializeWriting();
871        try {
872            MarshallingSupport.marshalByte(dataOut, value);
873        } catch (IOException ioe) {
874            throw JMSExceptionSupport.create(ioe);
875        }
876    }
877
878    /**
879     * Writes a <code>short</code> to the stream message.
880     * 
881     * @param value the <code>short</code> value to be written
882     * @throws JMSException if the JMS provider fails to write the message due
883     *                 to some internal error.
884     * @throws MessageNotWriteableException if the message is in read-only mode.
885     */
886
887    public void writeShort(short value) throws JMSException {
888        initializeWriting();
889        try {
890            MarshallingSupport.marshalShort(dataOut, value);
891        } catch (IOException ioe) {
892            throw JMSExceptionSupport.create(ioe);
893        }
894    }
895
896    /**
897     * Writes a <code>char</code> to the stream message.
898     * 
899     * @param value the <code>char</code> value to be written
900     * @throws JMSException if the JMS provider fails to write the message due
901     *                 to some internal error.
902     * @throws MessageNotWriteableException if the message is in read-only mode.
903     */
904
905    public void writeChar(char value) throws JMSException {
906        initializeWriting();
907        try {
908            MarshallingSupport.marshalChar(dataOut, value);
909        } catch (IOException ioe) {
910            throw JMSExceptionSupport.create(ioe);
911        }
912    }
913
914    /**
915     * Writes an <code>int</code> to the stream message.
916     * 
917     * @param value the <code>int</code> value to be written
918     * @throws JMSException if the JMS provider fails to write the message due
919     *                 to some internal error.
920     * @throws MessageNotWriteableException if the message is in read-only mode.
921     */
922
923    public void writeInt(int value) throws JMSException {
924        initializeWriting();
925        try {
926            MarshallingSupport.marshalInt(dataOut, value);
927        } catch (IOException ioe) {
928            throw JMSExceptionSupport.create(ioe);
929        }
930    }
931
932    /**
933     * Writes a <code>long</code> to the stream message.
934     * 
935     * @param value the <code>long</code> value to be written
936     * @throws JMSException if the JMS provider fails to write the message due
937     *                 to some internal error.
938     * @throws MessageNotWriteableException if the message is in read-only mode.
939     */
940
941    public void writeLong(long value) throws JMSException {
942        initializeWriting();
943        try {
944            MarshallingSupport.marshalLong(dataOut, value);
945        } catch (IOException ioe) {
946            throw JMSExceptionSupport.create(ioe);
947        }
948    }
949
950    /**
951     * Writes a <code>float</code> to the stream message.
952     * 
953     * @param value the <code>float</code> value to be written
954     * @throws JMSException if the JMS provider fails to write the message due
955     *                 to some internal error.
956     * @throws MessageNotWriteableException if the message is in read-only mode.
957     */
958
959    public void writeFloat(float value) throws JMSException {
960        initializeWriting();
961        try {
962            MarshallingSupport.marshalFloat(dataOut, value);
963        } catch (IOException ioe) {
964            throw JMSExceptionSupport.create(ioe);
965        }
966    }
967
968    /**
969     * Writes a <code>double</code> to the stream message.
970     * 
971     * @param value the <code>double</code> value to be written
972     * @throws JMSException if the JMS provider fails to write the message due
973     *                 to some internal error.
974     * @throws MessageNotWriteableException if the message is in read-only mode.
975     */
976
977    public void writeDouble(double value) throws JMSException {
978        initializeWriting();
979        try {
980            MarshallingSupport.marshalDouble(dataOut, value);
981        } catch (IOException ioe) {
982            throw JMSExceptionSupport.create(ioe);
983        }
984    }
985
986    /**
987     * Writes a <code>String</code> to the stream message.
988     * 
989     * @param value the <code>String</code> value to be written
990     * @throws JMSException if the JMS provider fails to write the message due
991     *                 to some internal error.
992     * @throws MessageNotWriteableException if the message is in read-only mode.
993     */
994
995    public void writeString(String value) throws JMSException {
996        initializeWriting();
997        try {
998            if (value == null) {
999                MarshallingSupport.marshalNull(dataOut);
1000            } else {
1001                MarshallingSupport.marshalString(dataOut, value);
1002            }
1003        } catch (IOException ioe) {
1004            throw JMSExceptionSupport.create(ioe);
1005        }
1006    }
1007
1008    /**
1009     * Writes a byte array field to the stream message. <p/>
1010     * <P>
1011     * The byte array <code>value</code> is written to the message as a byte
1012     * array field. Consecutively written byte array fields are treated as two
1013     * distinct fields when the fields are read.
1014     * 
1015     * @param value the byte array value to be written
1016     * @throws JMSException if the JMS provider fails to write the message due
1017     *                 to some internal error.
1018     * @throws MessageNotWriteableException if the message is in read-only mode.
1019     */
1020
1021    public void writeBytes(byte[] value) throws JMSException {
1022        writeBytes(value, 0, value.length);
1023    }
1024
1025    /**
1026     * Writes a portion of a byte array as a byte array field to the stream
1027     * message. <p/>
1028     * <P>
1029     * The a portion of the byte array <code>value</code> is written to the
1030     * message as a byte array field. Consecutively written byte array fields
1031     * are treated as two distinct fields when the fields are read.
1032     * 
1033     * @param value the byte array value to be written
1034     * @param offset the initial offset within the byte array
1035     * @param length the number of bytes to use
1036     * @throws JMSException if the JMS provider fails to write the message due
1037     *                 to some internal error.
1038     * @throws MessageNotWriteableException if the message is in read-only mode.
1039     */
1040
1041    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1042        initializeWriting();
1043        try {
1044            MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1045        } catch (IOException ioe) {
1046            throw JMSExceptionSupport.create(ioe);
1047        }
1048    }
1049
1050    /**
1051     * Writes an object to the stream message. <p/>
1052     * <P>
1053     * This method works only for the objectified primitive object types (<code>Integer</code>,
1054     * <code>Double</code>, <code>Long</code>&nbsp;...),
1055     * <code>String</code> objects, and byte arrays.
1056     * 
1057     * @param value the Java object to be written
1058     * @throws JMSException if the JMS provider fails to write the message due
1059     *                 to some internal error.
1060     * @throws MessageFormatException if the object is invalid.
1061     * @throws MessageNotWriteableException if the message is in read-only mode.
1062     */
1063
1064    public void writeObject(Object value) throws JMSException {
1065        initializeWriting();
1066        if (value == null) {
1067            try {
1068                MarshallingSupport.marshalNull(dataOut);
1069            } catch (IOException ioe) {
1070                throw JMSExceptionSupport.create(ioe);
1071            }
1072        } else if (value instanceof String) {
1073            writeString(value.toString());
1074        } else if (value instanceof Character) {
1075            writeChar(((Character)value).charValue());
1076        } else if (value instanceof Boolean) {
1077            writeBoolean(((Boolean)value).booleanValue());
1078        } else if (value instanceof Byte) {
1079            writeByte(((Byte)value).byteValue());
1080        } else if (value instanceof Short) {
1081            writeShort(((Short)value).shortValue());
1082        } else if (value instanceof Integer) {
1083            writeInt(((Integer)value).intValue());
1084        } else if (value instanceof Float) {
1085            writeFloat(((Float)value).floatValue());
1086        } else if (value instanceof Double) {
1087            writeDouble(((Double)value).doubleValue());
1088        } else if (value instanceof byte[]) {
1089            writeBytes((byte[])value);
1090        }else if (value instanceof Long) {
1091            writeLong(((Long)value).longValue());
1092        }else {
1093            throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1094        }
1095    }
1096
1097    /**
1098     * Puts the message body in read-only mode and repositions the stream of
1099     * bytes to the beginning.
1100     * 
1101     * @throws JMSException if an internal error occurs
1102     */
1103
1104    public void reset() throws JMSException {
1105        storeContent();
1106        this.bytesOut = null;
1107        this.dataIn = null;
1108        this.dataOut = null;
1109        this.remainingBytes = -1;
1110        setReadOnlyBody(true);
1111    }
1112
1113    private void initializeWriting() throws MessageNotWriteableException {
1114        checkReadOnlyBody();
1115        if (this.dataOut == null) {
1116            this.bytesOut = new ByteArrayOutputStream();
1117            OutputStream os = bytesOut;
1118            ActiveMQConnection connection = getConnection();
1119            if (connection != null && connection.isUseCompression()) {
1120                compressed = true;
1121                os = new DeflaterOutputStream(os);
1122            }
1123            this.dataOut = new DataOutputStream(os);
1124        }
1125    }
1126
1127    protected void checkWriteOnlyBody() throws MessageNotReadableException {
1128        if (!readOnlyBody) {
1129            throw new MessageNotReadableException("Message body is write-only");
1130        }
1131    }
1132
1133    private void initializeReading() throws MessageNotReadableException {
1134        checkWriteOnlyBody();
1135        if (this.dataIn == null) {
1136            ByteSequence data = getContent();
1137            if (data == null) {
1138                data = new ByteSequence(new byte[] {}, 0, 0);
1139            }
1140            InputStream is = new ByteArrayInputStream(data);
1141            if (isCompressed()) {
1142                is = new InflaterInputStream(is);
1143                is = new BufferedInputStream(is);
1144            }
1145            this.dataIn = new DataInputStream(is);
1146        }
1147    }
1148
1149    public String toString() {
1150        return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1151    }
1152}