001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.io.Serializable;
021import java.io.StringReader;
022import java.io.StringWriter;
023import java.util.HashMap;
024import java.util.Map;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.advisory.AdvisorySupport;
029import org.apache.activemq.broker.BrokerContext;
030import org.apache.activemq.broker.BrokerContextAware;
031import org.apache.activemq.command.ActiveMQMapMessage;
032import org.apache.activemq.command.ActiveMQMessage;
033import org.apache.activemq.command.ActiveMQObjectMessage;
034import org.apache.activemq.command.DataStructure;
035import org.codehaus.jettison.mapped.Configuration;
036
037import com.thoughtworks.xstream.XStream;
038import com.thoughtworks.xstream.io.HierarchicalStreamReader;
039import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
040import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
041import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
042import com.thoughtworks.xstream.io.xml.XppReader;
043import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
044
045/**
046 * Frame translator implementation that uses XStream to convert messages to and
047 * from XML and JSON
048 *
049 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
050 */
051public class JmsFrameTranslator extends LegacyFrameTranslator implements
052        BrokerContextAware {
053
054    XStream xStream = null;
055    BrokerContext brokerContext;
056
057    public ActiveMQMessage convertFrame(ProtocolConverter converter,
058            StompFrame command) throws JMSException, ProtocolException {
059        Map<String, String> headers = command.getHeaders();
060        ActiveMQMessage msg;
061        String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
062        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
063            msg = super.convertFrame(converter, command);
064        } else {
065            HierarchicalStreamReader in;
066
067            try {
068                String text = new String(command.getContent(), "UTF-8");
069                switch (Stomp.Transformations.getValue(transformation)) {
070                case JMS_OBJECT_XML:
071                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
072                    msg = createObjectMessage(in);
073                    break;
074                case JMS_OBJECT_JSON:
075                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
076                    msg = createObjectMessage(in);
077                    break;
078                case JMS_MAP_XML:
079                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
080                    msg = createMapMessage(in);
081                    break;
082                case JMS_MAP_JSON:
083                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
084                    msg = createMapMessage(in);
085                    break;
086                default:
087                    throw new Exception("Unknown transformation: " + transformation);
088                }
089            } catch (Throwable e) {
090                command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
091                msg = super.convertFrame(converter, command);
092            }
093        }
094        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
095        return msg;
096    }
097
098    public StompFrame convertMessage(ProtocolConverter converter,
099            ActiveMQMessage message) throws IOException, JMSException {
100        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
101            StompFrame command = new StompFrame();
102            command.setAction(Stomp.Responses.MESSAGE);
103            Map<String, String> headers = new HashMap<String, String>(25);
104            command.setHeaders(headers);
105
106            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
107                    converter, message, command, this);
108
109            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
110                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
111            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
112                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
113            }
114
115            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
116            command.setContent(marshall(msg.getObject(),
117                    headers.get(Stomp.Headers.TRANSFORMATION))
118                    .getBytes("UTF-8"));
119            return command;
120
121        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
122            StompFrame command = new StompFrame();
123            command.setAction(Stomp.Responses.MESSAGE);
124            Map<String, String> headers = new HashMap<String, String>(25);
125            command.setHeaders(headers);
126
127            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
128                    converter, message, command, this);
129
130            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
131                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
132            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
133                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
134            }
135
136            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
137            command.setContent(marshall((Serializable)msg.getContentMap(),
138                    headers.get(Stomp.Headers.TRANSFORMATION))
139                    .getBytes("UTF-8"));
140            return command;
141        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
142                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
143
144            StompFrame command = new StompFrame();
145            command.setAction(Stomp.Responses.MESSAGE);
146            Map<String, String> headers = new HashMap<String, String>(25);
147            command.setHeaders(headers);
148
149            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
150                    converter, message, command, this);
151
152            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
153                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
154            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
155                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
156            }
157
158            String body = marshallAdvisory(message.getDataStructure(),
159                    headers.get(Stomp.Headers.TRANSFORMATION));
160            command.setContent(body.getBytes("UTF-8"));
161            return command;
162        } else {
163            return super.convertMessage(converter, message);
164        }
165    }
166
167    /**
168     * Marshalls the Object to a string using XML or JSON encoding
169     */
170    protected String marshall(Serializable object, String transformation)
171            throws JMSException {
172        StringWriter buffer = new StringWriter();
173        HierarchicalStreamWriter out;
174        if (transformation.toLowerCase().endsWith("json")) {
175            out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
176        } else {
177            out = new PrettyPrintWriter(buffer);
178        }
179        getXStream().marshal(object, out);
180        return buffer.toString();
181    }
182
183    protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
184        ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
185        Object obj = getXStream().unmarshal(in);
186        objMsg.setObject((Serializable) obj);
187        return objMsg;
188    }
189
190    @SuppressWarnings("unchecked")
191    protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192        ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193        Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194        for (String key : map.keySet()) {
195            mapMsg.setObject(key, map.get(key));
196        }
197        return mapMsg;
198    }
199
200    protected String marshallAdvisory(final DataStructure ds, String transformation) {
201
202        StringWriter buffer = new StringWriter();
203        HierarchicalStreamWriter out;
204        if (transformation.toLowerCase().endsWith("json")) {
205            out = new JettisonMappedXmlDriver().createWriter(buffer);
206        } else {
207            out = new PrettyPrintWriter(buffer);
208        }
209
210        XStream xstream = getXStream();
211        xstream.setMode(XStream.NO_REFERENCES);
212        xstream.aliasPackage("", "org.apache.activemq.command");
213        xstream.marshal(ds, out);
214        return buffer.toString();
215    }
216
217    // Properties
218    // -------------------------------------------------------------------------
219    public XStream getXStream() {
220        if (xStream == null) {
221            xStream = createXStream();
222        }
223        return xStream;
224    }
225
226    public void setXStream(XStream xStream) {
227        this.xStream = xStream;
228    }
229
230    // Implementation methods
231    // -------------------------------------------------------------------------
232    @SuppressWarnings("unchecked")
233    protected XStream createXStream() {
234        XStream xstream = null;
235        if (brokerContext != null) {
236            Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
237            for (XStream bean : beans.values()) {
238                if (bean != null) {
239                    xstream = bean;
240                    break;
241                }
242            }
243        }
244
245        if (xstream == null) {
246            xstream = XStreamSupport.createXStream();
247            xstream.ignoreUnknownElements();
248        }
249        return xstream;
250
251    }
252
253    public void setBrokerContext(BrokerContext brokerContext) {
254        this.brokerContext = brokerContext;
255    }
256
257}