001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import org.apache.activemq.broker.BrokerContext;
020    import org.apache.activemq.broker.BrokerContextAware;
021    import org.apache.activemq.command.*;
022    import org.apache.activemq.util.ByteArrayOutputStream;
023    import org.apache.activemq.util.*;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    import javax.jms.JMSException;
028    import java.io.*;
029    import java.util.*;
030    import java.util.concurrent.ConcurrentHashMap;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    
033    /**
034     * @author <a href="http://hiramchirino.com">chirino</a>
035     */
036    public class ProtocolConverter {
037    
    /** Logger shared by all converter instances. */
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

    /** Generates the id string wrapped by each instance's {@code connectionId}. */
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

    /** Broker version, assigned once by the static initializer and reported in the CONNECTED frame's server header. */
    private static final String BROKER_VERSION;
    /** Pre-built frame carrying the KEEPALIVE command. */
    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
044    
045        static {
046            InputStream in = null;
047            String version = "5.6.0";
048            if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
049                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
050                try {
051                    version = reader.readLine();
052                } catch(Exception e) {
053                }
054            }
055            BROKER_VERSION = version;
056        }
057    
    // Identity of this STOMP connection as seen by the broker. The session and
    // producer ids are derived from the connection id.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    // Sequence sources: consumer ids (SUBSCRIBE), message ids (SEND) and
    // transaction ids (BEGIN). tempDestinationGenerator is not used in this part
    // of the file; presumably it numbers temporary destinations - confirm.
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();

    // Command id -> handler, registered by sendToActiveMQ when a handler is supplied.
    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    // Every subscription created by SUBSCRIBE, keyed by its broker consumer id.
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
    // Subscriptions keyed by the client-supplied id header; frames without an id are not registered here.
    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
    // NOTE(review): the two temp-destination maps are not touched in this part of the file;
    // presumably they map STOMP temp destination names to ActiveMQ destinations and back - confirm.
    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
    // STOMP transaction header value -> broker transaction id; added on BEGIN, removed on COMMIT/ABORT.
    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
    // Transport used to reach both the STOMP client and the broker.
    private final StompTransport stompTransport;

    // Guards lastCommandId (see generateCommandId).
    private final Object commnadIdMutex = new Object();
    private int lastCommandId;
    // Set to true once the broker has answered the ProducerInfo sent during CONNECT; cleared on DISCONNECT.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Default translator, used when no transformation header is given or the lookup fails.
    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
    // Locates translator implementations by transformation header value.
    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
    // Passed to translators that implement BrokerContextAware.
    private final BrokerContext brokerContext;
    // Protocol version in effect; replaced by the negotiated version during CONNECT.
    private String version = "1.0";
    // Reported in the CONNECTED frame's heart-beat header; assigned outside this
    // part of the file (presumably by configureInactivityMonitor - confirm).
    private long hbReadInterval;
    private long hbWriteInterval;
    // Heart-beat value used when the CONNECT frame does not carry one.
    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
085    
086        public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
087            this.stompTransport = stompTransport;
088            this.brokerContext = brokerContext;
089        }
090    
091        protected int generateCommandId() {
092            synchronized (commnadIdMutex) {
093                return lastCommandId++;
094            }
095        }
096    
097        protected ResponseHandler createResponseHandler(final StompFrame command) {
098            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
099            if (receiptId != null) {
100                return new ResponseHandler() {
101                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
102                        if (response.isException()) {
103                            // Generally a command can fail.. but that does not invalidate the connection.
104                            // We report back the failure but we don't close the connection.
105                            Throwable exception = ((ExceptionResponse)response).getException();
106                            handleException(exception, command);
107                        } else {
108                            StompFrame sc = new StompFrame();
109                            sc.setAction(Stomp.Responses.RECEIPT);
110                            sc.setHeaders(new HashMap<String, String>(1));
111                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
112                            stompTransport.sendToStomp(sc);
113                        }
114                    }
115                };
116            }
117            return null;
118        }
119    
120        protected void sendToActiveMQ(Command command, ResponseHandler handler) {
121            command.setCommandId(generateCommandId());
122            if (handler != null) {
123                command.setResponseRequired(true);
124                resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
125            }
126            stompTransport.sendToActiveMQ(command);
127        }
128    
129        protected void sendToStomp(StompFrame command) throws IOException {
130            stompTransport.sendToStomp(command);
131        }
132    
133        protected FrameTranslator findTranslator(String header) {
134            FrameTranslator translator = frameTranslator;
135            try {
136                if (header != null) {
137                    translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
138                            .newInstance(header);
139                    if (translator instanceof BrokerContextAware) {
140                        ((BrokerContextAware)translator).setBrokerContext(brokerContext);
141                    }
142                }
143            } catch (Exception ignore) {
144                // if anything goes wrong use the default translator
145            }
146    
147            return translator;
148        }
149    
150        /**
151         * Convert a stomp command
152         *
153         * @param command
154         */
155        public void onStompCommand(StompFrame command) throws IOException, JMSException {
156            try {
157    
158                if (command.getClass() == StompFrameError.class) {
159                    throw ((StompFrameError)command).getException();
160                }
161    
162                String action = command.getAction();
163                if (action.startsWith(Stomp.Commands.SEND)) {
164                    onStompSend(command);
165                } else if (action.startsWith(Stomp.Commands.ACK)) {
166                    onStompAck(command);
167                } else if (action.startsWith(Stomp.Commands.NACK)) {
168                    onStompNack(command);
169                } else if (action.startsWith(Stomp.Commands.BEGIN)) {
170                    onStompBegin(command);
171                } else if (action.startsWith(Stomp.Commands.COMMIT)) {
172                    onStompCommit(command);
173                } else if (action.startsWith(Stomp.Commands.ABORT)) {
174                    onStompAbort(command);
175                } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
176                    onStompSubscribe(command);
177                } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
178                    onStompUnsubscribe(command);
179                } else if (action.startsWith(Stomp.Commands.CONNECT) ||
180                           action.startsWith(Stomp.Commands.STOMP)) {
181                    onStompConnect(command);
182                } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
183                    onStompDisconnect(command);
184                } else {
185                    throw new ProtocolException("Unknown STOMP action: " + action);
186                }
187    
188            } catch (ProtocolException e) {
189                handleException(e, command);
190                // Some protocol errors can cause the connection to get closed.
191                if (e.isFatal()) {
192                   getStompTransport().onException(e);
193                }
194            }
195        }
196    
197        protected void handleException(Throwable exception, StompFrame command) throws IOException {
198            LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
199            if (LOG.isDebugEnabled()) {
200                LOG.debug("Exception detail", exception);
201            }
202    
203            // Let the stomp client know about any protocol errors.
204            ByteArrayOutputStream baos = new ByteArrayOutputStream();
205            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
206            exception.printStackTrace(stream);
207            stream.close();
208    
209            HashMap<String, String> headers = new HashMap<String, String>();
210            headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
211            headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
212    
213            if (command != null) {
214                final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
215                if (receiptId != null) {
216                    headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
217                }
218            }
219    
220            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
221            sendToStomp(errorMessage);
222        }
223    
224        protected void onStompSend(StompFrame command) throws IOException, JMSException {
225            checkConnected();
226    
227            Map<String, String> headers = command.getHeaders();
228            String destination = headers.get(Stomp.Headers.Send.DESTINATION);
229            if (destination == null) {
230                throw new ProtocolException("SEND received without a Destination specified!");
231            }
232    
233            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
234            headers.remove("transaction");
235    
236            ActiveMQMessage message = convertMessage(command);
237    
238            message.setProducerId(producerId);
239            MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
240            message.setMessageId(id);
241            message.setJMSTimestamp(System.currentTimeMillis());
242    
243            if (stompTx != null) {
244                TransactionId activemqTx = transactions.get(stompTx);
245                if (activemqTx == null) {
246                    throw new ProtocolException("Invalid transaction id: " + stompTx);
247                }
248                message.setTransactionId(activemqTx);
249            }
250    
251            message.onSend();
252            sendToActiveMQ(message, createResponseHandler(command));
253        }
254    
255        protected void onStompNack(StompFrame command) throws ProtocolException {
256    
257            checkConnected();
258    
259            if (this.version.equals(Stomp.V1_0)) {
260                throw new ProtocolException("NACK received but connection is in v1.0 mode.");
261            }
262    
263            Map<String, String> headers = command.getHeaders();
264    
265            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
266            if (subscriptionId == null) {
267                throw new ProtocolException("NACK received without a subscription id for acknowledge!");
268            }
269    
270            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
271            if (messageId == null) {
272                throw new ProtocolException("NACK received without a message-id to acknowledge!");
273            }
274    
275            TransactionId activemqTx = null;
276            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
277            if (stompTx != null) {
278                activemqTx = transactions.get(stompTx);
279                if (activemqTx == null) {
280                    throw new ProtocolException("Invalid transaction id: " + stompTx);
281                }
282            }
283    
284            if (subscriptionId != null) {
285                StompSubscription sub = this.subscriptions.get(subscriptionId);
286                if (sub != null) {
287                    MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
288                    if (ack != null) {
289                        sendToActiveMQ(ack, createResponseHandler(command));
290                    } else {
291                        throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
292                    }
293                }
294            }
295        }
296    
297        protected void onStompAck(StompFrame command) throws ProtocolException {
298            checkConnected();
299    
300            Map<String, String> headers = command.getHeaders();
301            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
302            if (messageId == null) {
303                throw new ProtocolException("ACK received without a message-id to acknowledge!");
304            }
305    
306            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
307            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
308                throw new ProtocolException("ACK received without a subscription id for acknowledge!");
309            }
310    
311            TransactionId activemqTx = null;
312            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
313            if (stompTx != null) {
314                activemqTx = transactions.get(stompTx);
315                if (activemqTx == null) {
316                    throw new ProtocolException("Invalid transaction id: " + stompTx);
317                }
318            }
319    
320            boolean acked = false;
321    
322            if (subscriptionId != null) {
323    
324                StompSubscription sub = this.subscriptions.get(subscriptionId);
325                if (sub != null) {
326                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
327                    if (ack != null) {
328                        sendToActiveMQ(ack, createResponseHandler(command));
329                        acked = true;
330                    }
331                }
332    
333            } else {
334    
335                // TODO: acking with just a message id is very bogus since the same message id
336                // could have been sent to 2 different subscriptions on the same Stomp connection.
337                // For example, when 2 subs are created on the same topic.
338    
339                for (StompSubscription sub : subscriptionsByConsumerId.values()) {
340                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
341                    if (ack != null) {
342                        sendToActiveMQ(ack, createResponseHandler(command));
343                        acked = true;
344                        break;
345                    }
346                }
347            }
348    
349            if (!acked) {
350                throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
351            }
352        }
353    
354        protected void onStompBegin(StompFrame command) throws ProtocolException {
355            checkConnected();
356    
357            Map<String, String> headers = command.getHeaders();
358    
359            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
360    
361            if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
362                throw new ProtocolException("Must specify the transaction you are beginning");
363            }
364    
365            if (transactions.get(stompTx) != null) {
366                throw new ProtocolException("The transaction was allready started: " + stompTx);
367            }
368    
369            LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
370            transactions.put(stompTx, activemqTx);
371    
372            TransactionInfo tx = new TransactionInfo();
373            tx.setConnectionId(connectionId);
374            tx.setTransactionId(activemqTx);
375            tx.setType(TransactionInfo.BEGIN);
376    
377            sendToActiveMQ(tx, createResponseHandler(command));
378        }
379    
380        protected void onStompCommit(StompFrame command) throws ProtocolException {
381            checkConnected();
382    
383            Map<String, String> headers = command.getHeaders();
384    
385            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
386            if (stompTx == null) {
387                throw new ProtocolException("Must specify the transaction you are committing");
388            }
389    
390            TransactionId activemqTx = transactions.remove(stompTx);
391            if (activemqTx == null) {
392                throw new ProtocolException("Invalid transaction id: " + stompTx);
393            }
394    
395            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
396                sub.onStompCommit(activemqTx);
397            }
398    
399            TransactionInfo tx = new TransactionInfo();
400            tx.setConnectionId(connectionId);
401            tx.setTransactionId(activemqTx);
402            tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
403    
404            sendToActiveMQ(tx, createResponseHandler(command));
405        }
406    
407        protected void onStompAbort(StompFrame command) throws ProtocolException {
408            checkConnected();
409            Map<String, String> headers = command.getHeaders();
410    
411            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
412            if (stompTx == null) {
413                throw new ProtocolException("Must specify the transaction you are committing");
414            }
415    
416            TransactionId activemqTx = transactions.remove(stompTx);
417            if (activemqTx == null) {
418                throw new ProtocolException("Invalid transaction id: " + stompTx);
419            }
420            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
421                try {
422                    sub.onStompAbort(activemqTx);
423                } catch (Exception e) {
424                    throw new ProtocolException("Transaction abort failed", false, e);
425                }
426            }
427    
428            TransactionInfo tx = new TransactionInfo();
429            tx.setConnectionId(connectionId);
430            tx.setTransactionId(activemqTx);
431            tx.setType(TransactionInfo.ROLLBACK);
432    
433            sendToActiveMQ(tx, createResponseHandler(command));
434        }
435    
436        protected void onStompSubscribe(StompFrame command) throws ProtocolException {
437            checkConnected();
438            FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
439            Map<String, String> headers = command.getHeaders();
440    
441            String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
442            String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
443    
444            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
445                throw new ProtocolException("SUBSCRIBE received without a subscription id!");
446            }
447    
448            ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
449    
450            if (actualDest == null) {
451                throw new ProtocolException("Invalid Destination.");
452            }
453    
454            ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
455            ConsumerInfo consumerInfo = new ConsumerInfo(id);
456            consumerInfo.setPrefetchSize(1000);
457            consumerInfo.setDispatchAsync(true);
458    
459            String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
460            if (browser != null && browser.equals(Stomp.TRUE)) {
461    
462                if (!this.version.equals(Stomp.V1_1)) {
463                    throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
464                }
465    
466                consumerInfo.setBrowser(true);
467            }
468    
469            String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
470            consumerInfo.setSelector(selector);
471    
472            IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
473    
474            consumerInfo.setDestination(translator.convertDestination(this, destination, true));
475    
476            StompSubscription stompSubscription;
477            if (!consumerInfo.isBrowser()) {
478                stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
479            } else {
480                stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
481            }
482            stompSubscription.setDestination(actualDest);
483    
484            String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
485            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
486                stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
487            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
488                stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
489            } else {
490                stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
491            }
492    
493            subscriptionsByConsumerId.put(id, stompSubscription);
494            // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
495            if (subscriptionId != null) {
496                subscriptions.put(subscriptionId, stompSubscription);
497            }
498    
499            // dispatch can beat the receipt so send it early
500            sendReceipt(command);
501            sendToActiveMQ(consumerInfo, null);
502        }
503    
504        protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
505            checkConnected();
506            Map<String, String> headers = command.getHeaders();
507    
508            ActiveMQDestination destination = null;
509            Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
510            if (o != null) {
511                destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
512            }
513    
514            String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
515            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
516                throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
517            }
518    
519            if (subscriptionId == null && destination == null) {
520                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
521            }
522    
523            // check if it is a durable subscription
524            String durable = command.getHeaders().get("activemq.subscriptionName");
525            if (durable != null) {
526                RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
527                info.setClientId(durable);
528                info.setSubscriptionName(durable);
529                info.setConnectionId(connectionId);
530                sendToActiveMQ(info, createResponseHandler(command));
531                return;
532            }
533    
534            if (subscriptionId != null) {
535    
536                StompSubscription sub = this.subscriptions.remove(subscriptionId);
537                if (sub != null) {
538                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
539                    return;
540                }
541    
542            } else {
543    
544                // Unsubscribing using a destination is a bit weird if multiple subscriptions
545                // are created with the same destination.
546                for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
547                    StompSubscription sub = iter.next();
548                    if (destination != null && destination.equals(sub.getDestination())) {
549                        sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
550                        iter.remove();
551                        return;
552                    }
553                }
554            }
555    
556            throw new ProtocolException("No subscription matched.");
557        }
558    
    // Connection descriptor sent to the broker on CONNECT; its remove command is sent on DISCONNECT.
    ConnectionInfo connectionInfo = new ConnectionInfo();
560    
561        protected void onStompConnect(final StompFrame command) throws ProtocolException {
562    
563            if (connected.get()) {
564                throw new ProtocolException("Allready connected.");
565            }
566    
567            final Map<String, String> headers = command.getHeaders();
568    
569            // allow anyone to login for now
570            String login = headers.get(Stomp.Headers.Connect.LOGIN);
571            String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
572            String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
573            String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
574            String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
575    
576            if (accepts == null) {
577                accepts = Stomp.DEFAULT_VERSION;
578            }
579            if (heartBeat == null) {
580                heartBeat = defaultHeartBeat;
581            }
582    
583            HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
584            acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
585            if (acceptsVersions.isEmpty()) {
586                throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
587                                            Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
588            } else {
589                this.version = Collections.max(acceptsVersions);
590            }
591    
592            configureInactivityMonitor(heartBeat);
593    
594            IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
595            connectionInfo.setConnectionId(connectionId);
596            if (clientId != null) {
597                connectionInfo.setClientId(clientId);
598            } else {
599                connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
600            }
601    
602            connectionInfo.setResponseRequired(true);
603            connectionInfo.setUserName(login);
604            connectionInfo.setPassword(passcode);
605            connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
606    
607            sendToActiveMQ(connectionInfo, new ResponseHandler() {
608                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
609    
610                    if (response.isException()) {
611                        // If the connection attempt fails we close the socket.
612                        Throwable exception = ((ExceptionResponse)response).getException();
613                        handleException(exception, command);
614                        getStompTransport().onException(IOExceptionSupport.create(exception));
615                        return;
616                    }
617    
618                    final SessionInfo sessionInfo = new SessionInfo(sessionId);
619                    sendToActiveMQ(sessionInfo, null);
620    
621                    final ProducerInfo producerInfo = new ProducerInfo(producerId);
622                    sendToActiveMQ(producerInfo, new ResponseHandler() {
623                        public void onResponse(ProtocolConverter converter, Response response) throws IOException {
624    
625                            if (response.isException()) {
626                                // If the connection attempt fails we close the socket.
627                                Throwable exception = ((ExceptionResponse)response).getException();
628                                handleException(exception, command);
629                                getStompTransport().onException(IOExceptionSupport.create(exception));
630                            }
631    
632                            connected.set(true);
633                            HashMap<String, String> responseHeaders = new HashMap<String, String>();
634    
635                            responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
636                            String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
637                            if (requestId == null) {
638                                // TODO legacy
639                                requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
640                            }
641                            if (requestId != null) {
642                                // TODO legacy
643                                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
644                                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
645                            }
646    
647                            responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
648                            responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
649                                                String.format("%d,%d", hbWriteInterval, hbReadInterval));
650                            responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
651    
652                            StompFrame sc = new StompFrame();
653                            sc.setAction(Stomp.Responses.CONNECTED);
654                            sc.setHeaders(responseHeaders);
655                            sendToStomp(sc);
656    
657                            if (version.equals(Stomp.V1_1)) {
658                                StompWireFormat format = stompTransport.getWireFormat();
659                                if (format != null) {
660                                    format.setEncodingEnabled(true);
661                                }
662                            }
663                        }
664                    });
665    
666                }
667            });
668        }
669    
670        protected void onStompDisconnect(StompFrame command) throws ProtocolException {
671            checkConnected();
672            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
673            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
674            connected.set(false);
675        }
676    
677        protected void checkConnected() throws ProtocolException {
678            if (!connected.get()) {
679                throw new ProtocolException("Not connected.");
680            }
681        }
682    
683        /**
684         * Dispatch a ActiveMQ command
685         *
686         * @param command
687         * @throws IOException
688         */
689        public void onActiveMQCommand(Command command) throws IOException, JMSException {
690            if (command.isResponse()) {
691                Response response = (Response)command;
692                ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
693                if (rh != null) {
694                    rh.onResponse(this, response);
695                } else {
696                    // Pass down any unexpected errors. Should this close the connection?
697                    if (response.isException()) {
698                        Throwable exception = ((ExceptionResponse)response).getException();
699                        handleException(exception, null);
700                    }
701                }
702            } else if (command.isMessageDispatch()) {
703                MessageDispatch md = (MessageDispatch)command;
704                StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
705                if (sub != null) {
706                    sub.onMessageDispatch(md);
707                }
708            } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
709                stompTransport.sendToStomp(ping);
710            } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
711                // Pass down any unexpected async errors. Should this close the connection?
712                Throwable exception = ((ConnectionError)command).getException();
713                handleException(exception, null);
714            }
715        }
716    
717        public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
718            ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
719            return msg;
720        }
721    
722        public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
723            if (ignoreTransformation == true) {
724                return frameTranslator.convertMessage(this, message);
725            } else {
726                return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
727            }
728        }
729    
730        public StompTransport getStompTransport() {
731            return stompTransport;
732        }
733    
734        public ActiveMQDestination createTempDestination(String name, boolean topic) {
735            ActiveMQDestination rc = tempDestinations.get(name);
736            if( rc == null ) {
737                if (topic) {
738                    rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
739                } else {
740                    rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
741                }
742                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
743                tempDestinations.put(name, rc);
744                tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
745            }
746            return rc;
747        }
748    
749        public String getCreatedTempDestinationName(ActiveMQDestination destination) {
750            return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
751        }
752    
753        public String getDefaultHeartBeat() {
754            return defaultHeartBeat;
755        }
756    
757        public void setDefaultHeartBeat(String defaultHeartBeat) {
758            this.defaultHeartBeat = defaultHeartBeat;
759        }
760    
761        protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
762    
763            String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
764    
765            if (keepAliveOpts == null || keepAliveOpts.length != 2) {
766                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
767            } else {
768    
769                try {
770                    hbReadInterval = Long.parseLong(keepAliveOpts[0]);
771                    hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
772                } catch(NumberFormatException e) {
773                    throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
774                }
775    
776                try {
777    
778                    StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
779    
780                    monitor.setReadCheckTime(hbReadInterval);
781                    monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
782                    monitor.setWriteCheckTime(hbWriteInterval);
783    
784                    monitor.startMonitoring();
785    
786                } catch(Exception ex) {
787                    hbReadInterval = 0;
788                    hbWriteInterval = 0;
789                }
790    
791                if (LOG.isDebugEnabled()) {
792                    LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
793                }
794            }
795        }
796    
797        protected void sendReceipt(StompFrame command) {
798            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
799            if (receiptId != null) {
800                StompFrame sc = new StompFrame();
801                sc.setAction(Stomp.Responses.RECEIPT);
802                sc.setHeaders(new HashMap<String, String>(1));
803                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
804                try {
805                    sendToStomp(sc);
806                } catch (IOException e) {
807                    LOG.warn("Could not send a receipt for " + command, e);
808                }
809            }
810        }
811    }