001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.Serializable;
021    import java.io.StringReader;
022    import java.io.StringWriter;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.advisory.AdvisorySupport;
029    import org.apache.activemq.broker.BrokerContext;
030    import org.apache.activemq.broker.BrokerContextAware;
031    import org.apache.activemq.command.ActiveMQMapMessage;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ActiveMQObjectMessage;
034    import org.apache.activemq.command.DataStructure;
035    import org.codehaus.jettison.mapped.Configuration;
036    
037    import com.thoughtworks.xstream.XStream;
038    import com.thoughtworks.xstream.io.HierarchicalStreamReader;
039    import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
040    import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
041    import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
042    import com.thoughtworks.xstream.io.xml.XppReader;
043    import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
044    
045    /**
046     * Frame translator implementation that uses XStream to convert messages to and
047     * from XML and JSON
048     *
049     * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
050     */
051    public class JmsFrameTranslator extends LegacyFrameTranslator implements
052            BrokerContextAware {
053    
054        XStream xStream = null;
055        BrokerContext brokerContext;
056    
057        public ActiveMQMessage convertFrame(ProtocolConverter converter,
058                StompFrame command) throws JMSException, ProtocolException {
059            Map<String, String> headers = command.getHeaders();
060            ActiveMQMessage msg;
061            String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
062            if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
063                msg = super.convertFrame(converter, command);
064            } else {
065                HierarchicalStreamReader in;
066    
067                try {
068                    String text = new String(command.getContent(), "UTF-8");
069                    switch (Stomp.Transformations.getValue(transformation)) {
070                    case JMS_OBJECT_XML:
071                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
072                        msg = createObjectMessage(in);
073                        break;
074                    case JMS_OBJECT_JSON:
075                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
076                        msg = createObjectMessage(in);
077                        break;
078                    case JMS_MAP_XML:
079                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
080                        msg = createMapMessage(in);
081                        break;
082                    case JMS_MAP_JSON:
083                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
084                        msg = createMapMessage(in);
085                        break;
086                    default:
087                        throw new Exception("Unknown transformation: " + transformation);
088                    }
089                } catch (Throwable e) {
090                    command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
091                    msg = super.convertFrame(converter, command);
092                }
093            }
094            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
095            return msg;
096        }
097    
098        public StompFrame convertMessage(ProtocolConverter converter,
099                ActiveMQMessage message) throws IOException, JMSException {
100            if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
101                StompFrame command = new StompFrame();
102                command.setAction(Stomp.Responses.MESSAGE);
103                Map<String, String> headers = new HashMap<String, String>(25);
104                command.setHeaders(headers);
105    
106                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
107                        converter, message, command, this);
108    
109                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
110                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
111                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
112                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
113                }
114    
115                ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
116                command.setContent(marshall(msg.getObject(),
117                        headers.get(Stomp.Headers.TRANSFORMATION))
118                        .getBytes("UTF-8"));
119                return command;
120    
121            } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
122                StompFrame command = new StompFrame();
123                command.setAction(Stomp.Responses.MESSAGE);
124                Map<String, String> headers = new HashMap<String, String>(25);
125                command.setHeaders(headers);
126    
127                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
128                        converter, message, command, this);
129    
130                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
131                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
132                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
133                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
134                }
135    
136                ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
137                command.setContent(marshall((Serializable)msg.getContentMap(),
138                        headers.get(Stomp.Headers.TRANSFORMATION))
139                        .getBytes("UTF-8"));
140                return command;
141            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
142                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
143    
144                StompFrame command = new StompFrame();
145                command.setAction(Stomp.Responses.MESSAGE);
146                Map<String, String> headers = new HashMap<String, String>(25);
147                command.setHeaders(headers);
148    
149                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
150                        converter, message, command, this);
151    
152                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
153                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
154                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
155                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
156                }
157    
158                String body = marshallAdvisory(message.getDataStructure(),
159                        headers.get(Stomp.Headers.TRANSFORMATION));
160                command.setContent(body.getBytes("UTF-8"));
161                return command;
162            } else {
163                return super.convertMessage(converter, message);
164            }
165        }
166    
167        /**
168         * Marshalls the Object to a string using XML or JSON encoding
169         */
170        protected String marshall(Serializable object, String transformation)
171                throws JMSException {
172            StringWriter buffer = new StringWriter();
173            HierarchicalStreamWriter out;
174            if (transformation.toLowerCase().endsWith("json")) {
175                out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
176            } else {
177                out = new PrettyPrintWriter(buffer);
178            }
179            getXStream().marshal(object, out);
180            return buffer.toString();
181        }
182    
183        protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
184            ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
185            Object obj = getXStream().unmarshal(in);
186            objMsg.setObject((Serializable) obj);
187            return objMsg;
188        }
189    
190        @SuppressWarnings("unchecked")
191        protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192            ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193            Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194            for (String key : map.keySet()) {
195                mapMsg.setObject(key, map.get(key));
196            }
197            return mapMsg;
198        }
199    
200        protected String marshallAdvisory(final DataStructure ds, String transformation) {
201    
202            StringWriter buffer = new StringWriter();
203            HierarchicalStreamWriter out;
204            if (transformation.toLowerCase().endsWith("json")) {
205                out = new JettisonMappedXmlDriver().createWriter(buffer);
206            } else {
207                out = new PrettyPrintWriter(buffer);
208            }
209    
210            XStream xstream = getXStream();
211            xstream.setMode(XStream.NO_REFERENCES);
212            xstream.aliasPackage("", "org.apache.activemq.command");
213            xstream.marshal(ds, out);
214            return buffer.toString();
215        }
216    
217        // Properties
218        // -------------------------------------------------------------------------
219        public XStream getXStream() {
220            if (xStream == null) {
221                xStream = createXStream();
222            }
223            return xStream;
224        }
225    
226        public void setXStream(XStream xStream) {
227            this.xStream = xStream;
228        }
229    
230        // Implementation methods
231        // -------------------------------------------------------------------------
232        @SuppressWarnings("unchecked")
233        protected XStream createXStream() {
234            XStream xstream = null;
235            if (brokerContext != null) {
236                Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
237                for (XStream bean : beans.values()) {
238                    if (bean != null) {
239                        xstream = bean;
240                        break;
241                    }
242                }
243            }
244    
245            if (xstream == null) {
246                xstream = XStreamSupport.createXStream();
247                xstream.ignoreUnknownElements();
248            }
249            return xstream;
250    
251        }
252    
253        public void setBrokerContext(BrokerContext brokerContext) {
254            this.brokerContext = brokerContext;
255        }
256    
257    }