001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import org.apache.activemq.broker.BrokerContext;
020import org.apache.activemq.broker.BrokerContextAware;
021import org.apache.activemq.command.*;
022import org.apache.activemq.util.ByteArrayOutputStream;
023import org.apache.activemq.util.*;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import javax.jms.JMSException;
028import java.io.*;
029import java.util.*;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.atomic.AtomicBoolean;
032
033/**
034 * @author <a href="http://hiramchirino.com">chirino</a>
035 */
036public class ProtocolConverter {
037
038    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
039
040    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
041
042    private static final String BROKER_VERSION;
043    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
044
045    static {
046        InputStream in = null;
047        String version = "5.6.0";
048        if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
049            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
050            try {
051                version = reader.readLine();
052            } catch(Exception e) {
053            }
054        }
055        BROKER_VERSION = version;
056    }
057
058    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
059    private final SessionId sessionId = new SessionId(connectionId, -1);
060    private final ProducerId producerId = new ProducerId(sessionId, 1);
061
062    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
063    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
064    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
065    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
066
067    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
068    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
069    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
070    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
071    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
072    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
073    private final StompTransport stompTransport;
074
075    private final Object commnadIdMutex = new Object();
076    private int lastCommandId;
077    private final AtomicBoolean connected = new AtomicBoolean(false);
078    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
079    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
080    private final BrokerContext brokerContext;
081    private String version = "1.0";
082    private long hbReadInterval;
083    private long hbWriteInterval;
084    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
085
086    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
087        this.stompTransport = stompTransport;
088        this.brokerContext = brokerContext;
089    }
090
091    protected int generateCommandId() {
092        synchronized (commnadIdMutex) {
093            return lastCommandId++;
094        }
095    }
096
097    protected ResponseHandler createResponseHandler(final StompFrame command) {
098        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
099        if (receiptId != null) {
100            return new ResponseHandler() {
101                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
102                    if (response.isException()) {
103                        // Generally a command can fail.. but that does not invalidate the connection.
104                        // We report back the failure but we don't close the connection.
105                        Throwable exception = ((ExceptionResponse)response).getException();
106                        handleException(exception, command);
107                    } else {
108                        StompFrame sc = new StompFrame();
109                        sc.setAction(Stomp.Responses.RECEIPT);
110                        sc.setHeaders(new HashMap<String, String>(1));
111                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
112                        stompTransport.sendToStomp(sc);
113                    }
114                }
115            };
116        }
117        return null;
118    }
119
120    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
121        command.setCommandId(generateCommandId());
122        if (handler != null) {
123            command.setResponseRequired(true);
124            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
125        }
126        stompTransport.sendToActiveMQ(command);
127    }
128
129    protected void sendToStomp(StompFrame command) throws IOException {
130        stompTransport.sendToStomp(command);
131    }
132
133    protected FrameTranslator findTranslator(String header) {
134        FrameTranslator translator = frameTranslator;
135        try {
136            if (header != null) {
137                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
138                        .newInstance(header);
139                if (translator instanceof BrokerContextAware) {
140                    ((BrokerContextAware)translator).setBrokerContext(brokerContext);
141                }
142            }
143        } catch (Exception ignore) {
144            // if anything goes wrong use the default translator
145        }
146
147        return translator;
148    }
149
150    /**
151     * Convert a stomp command
152     *
153     * @param command
154     */
155    public void onStompCommand(StompFrame command) throws IOException, JMSException {
156        try {
157
158            if (command.getClass() == StompFrameError.class) {
159                throw ((StompFrameError)command).getException();
160            }
161
162            String action = command.getAction();
163            if (action.startsWith(Stomp.Commands.SEND)) {
164                onStompSend(command);
165            } else if (action.startsWith(Stomp.Commands.ACK)) {
166                onStompAck(command);
167            } else if (action.startsWith(Stomp.Commands.NACK)) {
168                onStompNack(command);
169            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
170                onStompBegin(command);
171            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
172                onStompCommit(command);
173            } else if (action.startsWith(Stomp.Commands.ABORT)) {
174                onStompAbort(command);
175            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
176                onStompSubscribe(command);
177            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
178                onStompUnsubscribe(command);
179            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
180                       action.startsWith(Stomp.Commands.STOMP)) {
181                onStompConnect(command);
182            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
183                onStompDisconnect(command);
184            } else {
185                throw new ProtocolException("Unknown STOMP action: " + action);
186            }
187
188        } catch (ProtocolException e) {
189            handleException(e, command);
190            // Some protocol errors can cause the connection to get closed.
191            if (e.isFatal()) {
192               getStompTransport().onException(e);
193            }
194        }
195    }
196
197    protected void handleException(Throwable exception, StompFrame command) throws IOException {
198        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
199        if (LOG.isDebugEnabled()) {
200            LOG.debug("Exception detail", exception);
201        }
202
203        // Let the stomp client know about any protocol errors.
204        ByteArrayOutputStream baos = new ByteArrayOutputStream();
205        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
206        exception.printStackTrace(stream);
207        stream.close();
208
209        HashMap<String, String> headers = new HashMap<String, String>();
210        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
211        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
212
213        if (command != null) {
214            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
215            if (receiptId != null) {
216                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
217            }
218        }
219
220        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
221        sendToStomp(errorMessage);
222    }
223
224    protected void onStompSend(StompFrame command) throws IOException, JMSException {
225        checkConnected();
226
227        Map<String, String> headers = command.getHeaders();
228        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
229        if (destination == null) {
230            throw new ProtocolException("SEND received without a Destination specified!");
231        }
232
233        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
234        headers.remove("transaction");
235
236        ActiveMQMessage message = convertMessage(command);
237
238        message.setProducerId(producerId);
239        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
240        message.setMessageId(id);
241        message.setJMSTimestamp(System.currentTimeMillis());
242
243        if (stompTx != null) {
244            TransactionId activemqTx = transactions.get(stompTx);
245            if (activemqTx == null) {
246                throw new ProtocolException("Invalid transaction id: " + stompTx);
247            }
248            message.setTransactionId(activemqTx);
249        }
250
251        message.onSend();
252        sendToActiveMQ(message, createResponseHandler(command));
253    }
254
255    protected void onStompNack(StompFrame command) throws ProtocolException {
256
257        checkConnected();
258
259        if (this.version.equals(Stomp.V1_0)) {
260            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
261        }
262
263        Map<String, String> headers = command.getHeaders();
264
265        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
266        if (subscriptionId == null) {
267            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
268        }
269
270        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
271        if (messageId == null) {
272            throw new ProtocolException("NACK received without a message-id to acknowledge!");
273        }
274
275        TransactionId activemqTx = null;
276        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
277        if (stompTx != null) {
278            activemqTx = transactions.get(stompTx);
279            if (activemqTx == null) {
280                throw new ProtocolException("Invalid transaction id: " + stompTx);
281            }
282        }
283
284        if (subscriptionId != null) {
285            StompSubscription sub = this.subscriptions.get(subscriptionId);
286            if (sub != null) {
287                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
288                if (ack != null) {
289                    sendToActiveMQ(ack, createResponseHandler(command));
290                } else {
291                    throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
292                }
293            }
294        }
295    }
296
297    protected void onStompAck(StompFrame command) throws ProtocolException {
298        checkConnected();
299
300        Map<String, String> headers = command.getHeaders();
301        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
302        if (messageId == null) {
303            throw new ProtocolException("ACK received without a message-id to acknowledge!");
304        }
305
306        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
307        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
308            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
309        }
310
311        TransactionId activemqTx = null;
312        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
313        if (stompTx != null) {
314            activemqTx = transactions.get(stompTx);
315            if (activemqTx == null) {
316                throw new ProtocolException("Invalid transaction id: " + stompTx);
317            }
318        }
319
320        boolean acked = false;
321
322        if (subscriptionId != null) {
323
324            StompSubscription sub = this.subscriptions.get(subscriptionId);
325            if (sub != null) {
326                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
327                if (ack != null) {
328                    sendToActiveMQ(ack, createResponseHandler(command));
329                    acked = true;
330                }
331            }
332
333        } else {
334
335            // TODO: acking with just a message id is very bogus since the same message id
336            // could have been sent to 2 different subscriptions on the same Stomp connection.
337            // For example, when 2 subs are created on the same topic.
338
339            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
340                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
341                if (ack != null) {
342                    sendToActiveMQ(ack, createResponseHandler(command));
343                    acked = true;
344                    break;
345                }
346            }
347        }
348
349        if (!acked) {
350            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
351        }
352    }
353
354    protected void onStompBegin(StompFrame command) throws ProtocolException {
355        checkConnected();
356
357        Map<String, String> headers = command.getHeaders();
358
359        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
360
361        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
362            throw new ProtocolException("Must specify the transaction you are beginning");
363        }
364
365        if (transactions.get(stompTx) != null) {
366            throw new ProtocolException("The transaction was allready started: " + stompTx);
367        }
368
369        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
370        transactions.put(stompTx, activemqTx);
371
372        TransactionInfo tx = new TransactionInfo();
373        tx.setConnectionId(connectionId);
374        tx.setTransactionId(activemqTx);
375        tx.setType(TransactionInfo.BEGIN);
376
377        sendToActiveMQ(tx, createResponseHandler(command));
378    }
379
380    protected void onStompCommit(StompFrame command) throws ProtocolException {
381        checkConnected();
382
383        Map<String, String> headers = command.getHeaders();
384
385        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
386        if (stompTx == null) {
387            throw new ProtocolException("Must specify the transaction you are committing");
388        }
389
390        TransactionId activemqTx = transactions.remove(stompTx);
391        if (activemqTx == null) {
392            throw new ProtocolException("Invalid transaction id: " + stompTx);
393        }
394
395        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
396            sub.onStompCommit(activemqTx);
397        }
398
399        TransactionInfo tx = new TransactionInfo();
400        tx.setConnectionId(connectionId);
401        tx.setTransactionId(activemqTx);
402        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
403
404        sendToActiveMQ(tx, createResponseHandler(command));
405    }
406
407    protected void onStompAbort(StompFrame command) throws ProtocolException {
408        checkConnected();
409        Map<String, String> headers = command.getHeaders();
410
411        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
412        if (stompTx == null) {
413            throw new ProtocolException("Must specify the transaction you are committing");
414        }
415
416        TransactionId activemqTx = transactions.remove(stompTx);
417        if (activemqTx == null) {
418            throw new ProtocolException("Invalid transaction id: " + stompTx);
419        }
420        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
421            try {
422                sub.onStompAbort(activemqTx);
423            } catch (Exception e) {
424                throw new ProtocolException("Transaction abort failed", false, e);
425            }
426        }
427
428        TransactionInfo tx = new TransactionInfo();
429        tx.setConnectionId(connectionId);
430        tx.setTransactionId(activemqTx);
431        tx.setType(TransactionInfo.ROLLBACK);
432
433        sendToActiveMQ(tx, createResponseHandler(command));
434    }
435
436    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
437        checkConnected();
438        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
439        Map<String, String> headers = command.getHeaders();
440
441        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
442        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
443
444        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
445            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
446        }
447
448        ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
449
450        if (actualDest == null) {
451            throw new ProtocolException("Invalid Destination.");
452        }
453
454        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
455        ConsumerInfo consumerInfo = new ConsumerInfo(id);
456        consumerInfo.setPrefetchSize(1000);
457        consumerInfo.setDispatchAsync(true);
458
459        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
460        if (browser != null && browser.equals(Stomp.TRUE)) {
461
462            if (!this.version.equals(Stomp.V1_1)) {
463                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
464            }
465
466            consumerInfo.setBrowser(true);
467        }
468
469        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
470        consumerInfo.setSelector(selector);
471
472        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
473
474        consumerInfo.setDestination(translator.convertDestination(this, destination, true));
475
476        StompSubscription stompSubscription;
477        if (!consumerInfo.isBrowser()) {
478            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
479        } else {
480            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
481        }
482        stompSubscription.setDestination(actualDest);
483
484        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
485        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
486            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
487        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
488            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
489        } else {
490            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
491        }
492
493        subscriptionsByConsumerId.put(id, stompSubscription);
494        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
495        if (subscriptionId != null) {
496            subscriptions.put(subscriptionId, stompSubscription);
497        }
498
499        // dispatch can beat the receipt so send it early
500        sendReceipt(command);
501        sendToActiveMQ(consumerInfo, null);
502    }
503
504    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
505        checkConnected();
506        Map<String, String> headers = command.getHeaders();
507
508        ActiveMQDestination destination = null;
509        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
510        if (o != null) {
511            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
512        }
513
514        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
515        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
516            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
517        }
518
519        if (subscriptionId == null && destination == null) {
520            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
521        }
522
523        // check if it is a durable subscription
524        String durable = command.getHeaders().get("activemq.subscriptionName");
525        if (durable != null) {
526            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
527            info.setClientId(durable);
528            info.setSubscriptionName(durable);
529            info.setConnectionId(connectionId);
530            sendToActiveMQ(info, createResponseHandler(command));
531            return;
532        }
533
534        if (subscriptionId != null) {
535
536            StompSubscription sub = this.subscriptions.remove(subscriptionId);
537            if (sub != null) {
538                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
539                return;
540            }
541
542        } else {
543
544            // Unsubscribing using a destination is a bit weird if multiple subscriptions
545            // are created with the same destination.
546            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
547                StompSubscription sub = iter.next();
548                if (destination != null && destination.equals(sub.getDestination())) {
549                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
550                    iter.remove();
551                    return;
552                }
553            }
554        }
555
556        throw new ProtocolException("No subscription matched.");
557    }
558
559    ConnectionInfo connectionInfo = new ConnectionInfo();
560
561    protected void onStompConnect(final StompFrame command) throws ProtocolException {
562
563        if (connected.get()) {
564            throw new ProtocolException("Allready connected.");
565        }
566
567        final Map<String, String> headers = command.getHeaders();
568
569        // allow anyone to login for now
570        String login = headers.get(Stomp.Headers.Connect.LOGIN);
571        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
572        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
573        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
574        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
575
576        if (accepts == null) {
577            accepts = Stomp.DEFAULT_VERSION;
578        }
579        if (heartBeat == null) {
580            heartBeat = defaultHeartBeat;
581        }
582
583        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
584        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
585        if (acceptsVersions.isEmpty()) {
586            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
587                                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
588        } else {
589            this.version = Collections.max(acceptsVersions);
590        }
591
592        configureInactivityMonitor(heartBeat);
593
594        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
595        connectionInfo.setConnectionId(connectionId);
596        if (clientId != null) {
597            connectionInfo.setClientId(clientId);
598        } else {
599            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
600        }
601
602        connectionInfo.setResponseRequired(true);
603        connectionInfo.setUserName(login);
604        connectionInfo.setPassword(passcode);
605        connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
606
607        sendToActiveMQ(connectionInfo, new ResponseHandler() {
608            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
609
610                if (response.isException()) {
611                    // If the connection attempt fails we close the socket.
612                    Throwable exception = ((ExceptionResponse)response).getException();
613                    handleException(exception, command);
614                    getStompTransport().onException(IOExceptionSupport.create(exception));
615                    return;
616                }
617
618                final SessionInfo sessionInfo = new SessionInfo(sessionId);
619                sendToActiveMQ(sessionInfo, null);
620
621                final ProducerInfo producerInfo = new ProducerInfo(producerId);
622                sendToActiveMQ(producerInfo, new ResponseHandler() {
623                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
624
625                        if (response.isException()) {
626                            // If the connection attempt fails we close the socket.
627                            Throwable exception = ((ExceptionResponse)response).getException();
628                            handleException(exception, command);
629                            getStompTransport().onException(IOExceptionSupport.create(exception));
630                        }
631
632                        connected.set(true);
633                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
634
635                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
636                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
637                        if (requestId == null) {
638                            // TODO legacy
639                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
640                        }
641                        if (requestId != null) {
642                            // TODO legacy
643                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
644                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
645                        }
646
647                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
648                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
649                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
650                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
651
652                        StompFrame sc = new StompFrame();
653                        sc.setAction(Stomp.Responses.CONNECTED);
654                        sc.setHeaders(responseHeaders);
655                        sendToStomp(sc);
656
657                        if (version.equals(Stomp.V1_1)) {
658                            StompWireFormat format = stompTransport.getWireFormat();
659                            if (format != null) {
660                                format.setEncodingEnabled(true);
661                            }
662                        }
663                    }
664                });
665
666            }
667        });
668    }
669
670    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
671        checkConnected();
672        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
673        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
674        connected.set(false);
675    }
676
677    protected void checkConnected() throws ProtocolException {
678        if (!connected.get()) {
679            throw new ProtocolException("Not connected.");
680        }
681    }
682
683    /**
684     * Dispatch a ActiveMQ command
685     *
686     * @param command
687     * @throws IOException
688     */
689    public void onActiveMQCommand(Command command) throws IOException, JMSException {
690        if (command.isResponse()) {
691            Response response = (Response)command;
692            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
693            if (rh != null) {
694                rh.onResponse(this, response);
695            } else {
696                // Pass down any unexpected errors. Should this close the connection?
697                if (response.isException()) {
698                    Throwable exception = ((ExceptionResponse)response).getException();
699                    handleException(exception, null);
700                }
701            }
702        } else if (command.isMessageDispatch()) {
703            MessageDispatch md = (MessageDispatch)command;
704            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
705            if (sub != null) {
706                sub.onMessageDispatch(md);
707            }
708        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
709            stompTransport.sendToStomp(ping);
710        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
711            // Pass down any unexpected async errors. Should this close the connection?
712            Throwable exception = ((ConnectionError)command).getException();
713            handleException(exception, null);
714        }
715    }
716
717    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
718        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
719        return msg;
720    }
721
722    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
723        if (ignoreTransformation == true) {
724            return frameTranslator.convertMessage(this, message);
725        } else {
726            return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
727        }
728    }
729
730    public StompTransport getStompTransport() {
731        return stompTransport;
732    }
733
734    public ActiveMQDestination createTempDestination(String name, boolean topic) {
735        ActiveMQDestination rc = tempDestinations.get(name);
736        if( rc == null ) {
737            if (topic) {
738                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
739            } else {
740                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
741            }
742            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
743            tempDestinations.put(name, rc);
744            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
745        }
746        return rc;
747    }
748
749    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
750        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
751    }
752
753    public String getDefaultHeartBeat() {
754        return defaultHeartBeat;
755    }
756
757    public void setDefaultHeartBeat(String defaultHeartBeat) {
758        this.defaultHeartBeat = defaultHeartBeat;
759    }
760
761    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
762
763        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
764
765        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
766            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
767        } else {
768
769            try {
770                hbReadInterval = Long.parseLong(keepAliveOpts[0]);
771                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
772            } catch(NumberFormatException e) {
773                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
774            }
775
776            try {
777
778                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
779
780                monitor.setReadCheckTime(hbReadInterval);
781                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
782                monitor.setWriteCheckTime(hbWriteInterval);
783
784                monitor.startMonitoring();
785
786            } catch(Exception ex) {
787                hbReadInterval = 0;
788                hbWriteInterval = 0;
789            }
790
791            if (LOG.isDebugEnabled()) {
792                LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
793            }
794        }
795    }
796
797    protected void sendReceipt(StompFrame command) {
798        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
799        if (receiptId != null) {
800            StompFrame sc = new StompFrame();
801            sc.setAction(Stomp.Responses.RECEIPT);
802            sc.setHeaders(new HashMap<String, String>(1));
803            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
804            try {
805                sendToStomp(sc);
806            } catch (IOException e) {
807                LOG.warn("Could not send a receipt for " + command, e);
808            }
809        }
810    }
811}