001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.command;
019    
020    import java.io.DataInputStream;
021    import java.io.DataOutputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.ObjectOutputStream;
025    import java.io.OutputStream;
026    import java.io.Serializable;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.JMSException;
031    import javax.jms.ObjectMessage;
032    
033    import org.apache.activemq.ActiveMQConnection;
034    import org.apache.activemq.util.ByteArrayInputStream;
035    import org.apache.activemq.util.ByteArrayOutputStream;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
038    import org.apache.activemq.util.JMSExceptionSupport;
039    import org.apache.activemq.wireformat.WireFormat;
040    
041    /**
042     * An <CODE>ObjectMessage</CODE> object is used to send a message that
043     * contains a serializable object in the Java programming language ("Java
044     * object"). It inherits from the <CODE>Message</CODE> interface and adds a
045     * body containing a single reference to an object. Only
046     * <CODE>Serializable</CODE> Java objects can be used. <p/>
047     * <P>
048     * If a collection of Java objects must be sent, one of the
049     * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
050     * <P>
051     * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
052     * mode. If a client attempts to write to the message at this point, a
053     * <CODE>MessageNotWriteableException</CODE> is thrown. If
054     * <CODE>clearBody</CODE> is called, the message can now be both read from and
055     * written to.
056     * 
057     * @openwire:marshaller code="26"
058     * @see javax.jms.Session#createObjectMessage()
059     * @see javax.jms.Session#createObjectMessage(Serializable)
060     * @see javax.jms.BytesMessage
061     * @see javax.jms.MapMessage
062     * @see javax.jms.Message
063     * @see javax.jms.StreamMessage
064     * @see javax.jms.TextMessage
065     */
066    public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
067        
068        // TODO: verify classloader
069        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
070        static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 
071    
072        protected transient Serializable object;
073    
074        public Message copy() {
075            ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
076            copy(copy);
077            return copy;
078        }
079    
080        private void copy(ActiveMQObjectMessage copy) {
081            ActiveMQConnection connection = getConnection();
082            if (connection == null || !connection.isObjectMessageSerializationDefered()) {
083                storeContent();
084                copy.object = null;
085            } else {
086                copy.object = object;
087            }
088            super.copy(copy);
089            
090        }
091    
092        public void storeContent() {
093            ByteSequence bodyAsBytes = getContent();
094            if (bodyAsBytes == null && object != null) {
095                try {
096                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
097                    OutputStream os = bytesOut;
098                    ActiveMQConnection connection = getConnection();
099                    if (connection != null && connection.isUseCompression()) {
100                        compressed = true;
101                        os = new DeflaterOutputStream(os);
102                    }
103                    DataOutputStream dataOut = new DataOutputStream(os);
104                    ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
105                    objOut.writeObject(object);
106                    objOut.flush();
107                    objOut.reset();
108                    objOut.close();
109                    setContent(bytesOut.toByteSequence());
110                } catch (IOException ioe) {
111                    throw new RuntimeException(ioe.getMessage(), ioe);
112                }
113            }
114        }
115    
116        public byte getDataStructureType() {
117            return DATA_STRUCTURE_TYPE;
118        }
119    
120        public String getJMSXMimeType() {
121            return "jms/object-message";
122        }
123    
124        /**
125         * Clears out the message body. Clearing a message's body does not clear its
126         * header values or property entries. <p/>
127         * <P>
128         * If this message body was read-only, calling this method leaves the
129         * message body in the same state as an empty body in a newly created
130         * message.
131         * 
132         * @throws JMSException if the JMS provider fails to clear the message body
133         *                 due to some internal error.
134         */
135    
136        public void clearBody() throws JMSException {
137            super.clearBody();
138            this.object = null;
139        }
140    
141        /**
142         * Sets the serializable object containing this message's data. It is
143         * important to note that an <CODE>ObjectMessage</CODE> contains a
144         * snapshot of the object at the time <CODE>setObject()</CODE> is called;
145         * subsequent modifications of the object will have no effect on the
146         * <CODE>ObjectMessage</CODE> body.
147         * 
148         * @param newObject the message's data
149         * @throws JMSException if the JMS provider fails to set the object due to
150         *                 some internal error.
151         * @throws javax.jms.MessageFormatException if object serialization fails.
152         * @throws javax.jms.MessageNotWriteableException if the message is in
153         *                 read-only mode.
154         */
155    
156        public void setObject(Serializable newObject) throws JMSException {
157            checkReadOnlyBody();
158            this.object = newObject;
159            setContent(null);
160            ActiveMQConnection connection = getConnection();
161            if (connection == null || !connection.isObjectMessageSerializationDefered()) {
162                storeContent();
163            }
164        }
165    
166        /**
167         * Gets the serializable object containing this message's data. The default
168         * value is null.
169         * 
170         * @return the serializable object containing this message's data
171         * @throws JMSException
172         */
173        public Serializable getObject() throws JMSException {
174            if (object == null && getContent() != null) {
175                try {
176                    ByteSequence content = getContent();
177                    InputStream is = new ByteArrayInputStream(content);
178                    if (isCompressed()) {
179                        is = new InflaterInputStream(is);
180                    }
181                    DataInputStream dataIn = new DataInputStream(is);
182                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
183                    try {
184                        object = (Serializable)objIn.readObject();
185                    } catch (ClassNotFoundException ce) {
186                        throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
187                    } finally {
188                        dataIn.close();
189                    }
190                } catch (IOException e) {
191                    throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
192                }
193            }
194            return this.object;
195        }
196    
197        @Override
198        public void beforeMarshall(WireFormat wireFormat) throws IOException {
199            super.beforeMarshall(wireFormat);
200            // may have initiated on vm transport with deferred marshalling
201            storeContent();
202        }
203    
204        public void clearMarshalledState() throws JMSException {
205            super.clearMarshalledState();
206            this.object = null;
207        }
208    
209        public void onMessageRolledBack() {
210            super.onMessageRolledBack();
211    
212            // lets force the object to be deserialized again - as we could have
213            // changed the object
214            object = null;
215        }
216    
217        public String toString() {
218            try {
219                getObject();
220            } catch (JMSException e) {
221            }
222            return super.toString();
223        }
224    }