001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.stomp;
019
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.HashMap;
028
029public class StompConnection {
030
031    public static final long RECEIVE_TIMEOUT = 10000;
032
033    private Socket stompSocket;
034    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035
036    public void open(String host, int port) throws IOException, UnknownHostException {
037        open(new Socket(host, port));
038    }
039
040    public void open(Socket socket) {
041        stompSocket = socket;
042    }
043
044    public void close() throws IOException {
045        if (stompSocket != null) {
046            stompSocket.close();
047            stompSocket = null;
048        }
049    }
050
051    public void sendFrame(String data) throws Exception {
052        byte[] bytes = data.getBytes("UTF-8");
053        OutputStream outputStream = stompSocket.getOutputStream();
054        outputStream.write(bytes);
055        outputStream.flush();
056    }
057
058    public void sendFrame(String frame, byte[] data) throws Exception {
059        byte[] bytes = frame.getBytes("UTF-8");
060        OutputStream outputStream = stompSocket.getOutputStream();
061        outputStream.write(bytes);
062        outputStream.write(data);
063        outputStream.flush();
064    }
065
066    public StompFrame receive() throws Exception {
067        return receive(RECEIVE_TIMEOUT);
068    }
069
070    public StompFrame receive(long timeOut) throws Exception {
071        stompSocket.setSoTimeout((int)timeOut);
072        InputStream is = stompSocket.getInputStream();
073        StompWireFormat wf = new StompWireFormat();
074        DataInputStream dis = new DataInputStream(is);
075        return (StompFrame)wf.unmarshal(dis);
076    }
077
078    public String receiveFrame() throws Exception {
079        return receiveFrame(RECEIVE_TIMEOUT);
080    }
081
082    public String receiveFrame(long timeOut) throws Exception {
083        stompSocket.setSoTimeout((int)timeOut);
084        InputStream is = stompSocket.getInputStream();
085        int c = 0;
086        for (;;) {
087            c = is.read();
088            if (c < 0) {
089                throw new IOException("socket closed.");
090            } else if (c == 0) {
091                c = is.read();
092                if (c == '\n') {
093                    // end of frame
094                    return stringFromBuffer(inputBuffer);
095                } else {
096                    inputBuffer.write(0);
097                    inputBuffer.write(c);
098                }
099            } else {
100                inputBuffer.write(c);
101            }
102        }
103    }
104
105    private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
106        byte[] ba = inputBuffer.toByteArray();
107        inputBuffer.reset();
108        return new String(ba, "UTF-8");
109    }
110
111    public Socket getStompSocket() {
112        return stompSocket;
113    }
114
115    public void setStompSocket(Socket stompSocket) {
116        this.stompSocket = stompSocket;
117    }
118
119    public void connect(String username, String password) throws Exception {
120        connect(username, password, null);
121    }
122
123    public void connect(String username, String password, String client) throws Exception {
124        HashMap<String, String> headers = new HashMap<String, String>();
125        headers.put("login", username);
126        headers.put("passcode", password);
127        if (client != null) {
128            headers.put("client-id", client);
129        }
130        StompFrame frame = new StompFrame("CONNECT", headers);
131        sendFrame(frame.format());
132
133        StompFrame connect = receive();
134        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
135            throw new Exception ("Not connected: " + connect.getBody());
136        }
137    }
138
139    public void disconnect() throws Exception {
140        StompFrame frame = new StompFrame("DISCONNECT");
141        sendFrame(frame.format());
142    }
143
144    public void send(String destination, String message) throws Exception {
145        send(destination, message, null, null);
146    }
147
148    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
149        if (headers == null) {
150            headers = new HashMap<String, String>();
151        }
152        headers.put("destination", destination);
153        if (transaction != null) {
154            headers.put("transaction", transaction);
155        }
156        StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
157        sendFrame(frame.format());
158    }
159
160    public void subscribe(String destination) throws Exception {
161        subscribe(destination, null, null);
162    }
163
164    public void subscribe(String destination, String ack) throws Exception {
165        subscribe(destination, ack, new HashMap<String, String>());
166    }
167
168    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
169        if (headers == null) {
170            headers = new HashMap<String, String>();
171        }
172        headers.put("destination", destination);
173        if (ack != null) {
174            headers.put("ack", ack);
175        }
176        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
177        sendFrame(frame.format());
178    }
179
180    public void unsubscribe(String destination) throws Exception {
181        unsubscribe(destination, null);
182    }
183
184    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
185        if (headers == null) {
186            headers = new HashMap<String, String>();
187        }
188        headers.put("destination", destination);
189        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
190        sendFrame(frame.format());
191    }
192
193    public void begin(String transaction) throws Exception {
194        HashMap<String, String> headers = new HashMap<String, String>();
195        headers.put("transaction", transaction);
196        StompFrame frame = new StompFrame("BEGIN", headers);
197        sendFrame(frame.format());
198    }
199
200    public void abort(String transaction) throws Exception {
201        HashMap<String, String> headers = new HashMap<String, String>();
202        headers.put("transaction", transaction);
203        StompFrame frame = new StompFrame("ABORT", headers);
204        sendFrame(frame.format());
205    }
206
207    public void commit(String transaction) throws Exception {
208        HashMap<String, String> headers = new HashMap<String, String>();
209        headers.put("transaction", transaction);
210        StompFrame frame = new StompFrame("COMMIT", headers);
211        sendFrame(frame.format());
212    }
213
214    public void ack(StompFrame frame) throws Exception {
215        ack(frame.getHeaders().get("message-id"), null);
216    }
217
218    public void ack(StompFrame frame, String transaction) throws Exception {
219        ack(frame.getHeaders().get("message-id"), transaction);
220    }
221
222    public void ack(String messageId) throws Exception {
223        ack(messageId, null);
224    }
225
226    public void ack(String messageId, String transaction) throws Exception {
227        HashMap<String, String> headers = new HashMap<String, String>();
228        headers.put("message-id", messageId);
229        if (transaction != null)
230            headers.put("transaction", transaction);
231        StompFrame frame = new StompFrame("ACK", headers);
232        sendFrame(frame.format());
233    }
234
235    protected String appendHeaders(HashMap<String, Object> headers) {
236        StringBuilder result = new StringBuilder();
237        for (String key : headers.keySet()) {
238            result.append(key + ":" + headers.get(key) + "\n");
239        }
240        result.append("\n");
241        return result.toString();
242    }
243
244}