001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.DataOutputStream;
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.Map;
023    
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.advisory.AdvisorySupport;
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.ActiveMQTextMessage;
032    import org.apache.activemq.command.DataStructure;
033    import org.apache.activemq.util.ByteArrayOutputStream;
034    import org.apache.activemq.util.ByteSequence;
035    
036    import com.thoughtworks.xstream.XStream;
037    import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
038    
039    /**
040     * Implements ActiveMQ 4.0 translations
041     */
042    public class LegacyFrameTranslator implements FrameTranslator {
043    
044    
045        public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
046            final Map<?, ?> headers = command.getHeaders();
047            final ActiveMQMessage msg;
048            /*
049             * To reduce the complexity of this method perhaps a Chain of Responsibility
050             * would be a better implementation
051             */
052            if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
053                String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
054                if(intendedType.equalsIgnoreCase("text")){
055                    ActiveMQTextMessage text = new ActiveMQTextMessage();
056                    try {
057                        //text.setText(new String(command.getContent(), "UTF-8"));
058                        ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
059                        DataOutputStream data = new DataOutputStream(bytes);
060                        data.writeInt(command.getContent().length);
061                        data.write(command.getContent());
062                        text.setContent(bytes.toByteSequence());
063                    } catch (Throwable e) {
064                        throw new ProtocolException("Text could not bet set: " + e, false, e);
065                    }
066                    msg = text;
067                } else if(intendedType.equalsIgnoreCase("bytes")) {
068                    ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();
069                    byteMessage.writeBytes(command.getContent());
070                    msg = byteMessage;
071                } else {
072                    throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
073                }
074            }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
075                headers.remove(Stomp.Headers.CONTENT_LENGTH);
076                ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
077                bm.writeBytes(command.getContent());
078                msg = bm;
079            } else {
080                ActiveMQTextMessage text = new ActiveMQTextMessage();
081                try {
082                    //text.setText(new String(command.getContent(), "UTF-8"));
083                    ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
084                    DataOutputStream data = new DataOutputStream(bytes);
085                    data.writeInt(command.getContent().length);
086                    data.write(command.getContent());
087                    text.setContent(bytes.toByteSequence());
088                } catch (Throwable e) {
089                    throw new ProtocolException("Text could not bet set: " + e, false, e);
090                }
091                msg = text;
092            }
093            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
094            return msg;
095        }
096    
097        public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
098            StompFrame command = new StompFrame();
099            command.setAction(Stomp.Responses.MESSAGE);
100            Map<String, String> headers = new HashMap<String, String>(25);
101            command.setHeaders(headers);
102    
103            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
104    
105            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
106    
107                if (!message.isCompressed() && message.getContent() != null) {
108                    ByteSequence msgContent = message.getContent();
109                    if (msgContent.getLength() > 4) {
110                        byte[] content = new byte[msgContent.getLength() - 4];
111                        System.arraycopy(msgContent.data, 4, content, 0, content.length);
112                        command.setContent(content);
113                    }
114                } else {
115                    ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
116                    String messageText = msg.getText();
117                    if (messageText != null) {
118                        command.setContent(msg.getText().getBytes("UTF-8"));
119                    }
120                }
121    
122            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
123    
124                ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
125                msg.setReadOnlyBody(true);
126                byte[] data = new byte[(int)msg.getBodyLength()];
127                msg.readBytes(data);
128    
129                headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
130                command.setContent(data);
131            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
132                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
133    
134                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
135                        converter, message, command, this);
136    
137                String body = marshallAdvisory(message.getDataStructure());
138                command.setContent(body.getBytes("UTF-8"));
139            }
140            return command;
141        }
142    
143        public String convertDestination(ProtocolConverter converter, Destination d) {
144            if (d == null) {
145                return null;
146            }
147            ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
148            String physicalName = activeMQDestination.getPhysicalName();
149    
150            String rc = converter.getCreatedTempDestinationName(activeMQDestination);
151            if( rc!=null ) {
152                return rc;
153            }
154    
155            StringBuilder buffer = new StringBuilder();
156            if (activeMQDestination.isQueue()) {
157                if (activeMQDestination.isTemporary()) {
158                    buffer.append("/remote-temp-queue/");
159                } else {
160                    buffer.append("/queue/");
161                }
162            } else {
163                if (activeMQDestination.isTemporary()) {
164                    buffer.append("/remote-temp-topic/");
165                } else {
166                    buffer.append("/topic/");
167                }
168            }
169            buffer.append(physicalName);
170            return buffer.toString();
171        }
172    
173        public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
174            if (name == null) {
175                return null;
176            } else if (name.startsWith("/queue/")) {
177                String qName = name.substring("/queue/".length(), name.length());
178                return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
179            } else if (name.startsWith("/topic/")) {
180                String tName = name.substring("/topic/".length(), name.length());
181                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
182            } else if (name.startsWith("/remote-temp-queue/")) {
183                String tName = name.substring("/remote-temp-queue/".length(), name.length());
184                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
185            } else if (name.startsWith("/remote-temp-topic/")) {
186                String tName = name.substring("/remote-temp-topic/".length(), name.length());
187                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
188            } else if (name.startsWith("/temp-queue/")) {
189                return converter.createTempDestination(name, false);
190            } else if (name.startsWith("/temp-topic/")) {
191                return converter.createTempDestination(name, true);
192            } else {
193                if (forceFallback) {
194                    try {
195                        ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name);
196                        if (fallback != null) {
197                            return fallback;
198                        }
199                    } catch (JMSException e) {
200                        throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
201                                + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
202                    }
203                }
204                throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
205                                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
206            }
207        }
208    
209        /**
210         * Return an Advisory message as a JSON formatted string
211         * @param ds
212         * @return
213         */
214        protected String marshallAdvisory(final DataStructure ds) {
215            XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
216            xstream.setMode(XStream.NO_REFERENCES);
217            xstream.aliasPackage("", "org.apache.activemq.command");
218            return xstream.toXML(ds);
219        }
220    }