/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.stomp;

import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerContextAware;
import org.apache.activemq.command.*;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Translates between STOMP frames and ActiveMQ commands for a single STOMP
 * connection.
 * <p>
 * Frames received from the client enter through
 * {@link #onStompCommand(StompFrame)} and are turned into ActiveMQ commands that
 * are handed to the {@link StompTransport}. Commands coming back from the broker
 * enter through {@link #onActiveMQCommand(Command)} and are either routed to the
 * {@link ResponseHandler} registered for their correlation id, dispatched to the
 * matching {@link StompSubscription}, or reported to the client as an ERROR frame.
 *
 * @author <a href="http://hiramchirino.com">chirino</a>
 */
public class ProtocolConverter {

    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

    /** Version reported in the "server" header of the CONNECTED frame. */
    private static final String BROKER_VERSION;
    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);

    static {
        // Fallback used when the version resource is missing, unreadable or empty.
        String version = "5.6.0";
        InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt");
        if (in != null) {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
                String line = reader.readLine();
                // readLine() returns null for an empty resource; keep the default then.
                if (line != null) {
                    version = line;
                }
            } catch (Exception e) {
                // Best effort only: the default version is good enough.
                LOG.debug("Could not read the broker version resource, using default " + version, e);
            } finally {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing a classpath resource fails.
                }
            }
        }
        BROKER_VERSION = version;
    }

    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();

    /** Handlers waiting for a broker response, keyed by the command id of the request. */
    private final ConcurrentHashMap<Integer, ResponseHandler> responseHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    /** Every active subscription, keyed by the ActiveMQ consumer id. */
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
    /** Subscriptions that were created with a STOMP subscription id, keyed by that id. */
    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
    /** Temporary destinations created on this connection, keyed by their STOMP name. */
    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
    /** Reverse lookup: qualified ActiveMQ temp destination name to STOMP name. */
    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
    /** Open transactions, keyed by the STOMP transaction header value. */
    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
    private final StompTransport stompTransport;

    private final Object commandIdMutex = new Object();
    private int lastCommandId;
    private final AtomicBoolean connected = new AtomicBoolean(false);
    /** Translator used when no (or an unresolvable) transformation is requested. */
    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
    private final BrokerContext brokerContext;
    private String version = "1.0";
    private long hbReadInterval;
    private long hbWriteInterval;
    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;

    /** Connection description that is populated and sent to the broker on CONNECT. */
    ConnectionInfo connectionInfo = new ConnectionInfo();

    /**
     * Creates a converter bound to one STOMP transport.
     *
     * @param stompTransport transport used to talk to both the STOMP client and the broker
     * @param brokerContext  context handed to frame translators that are {@link BrokerContextAware}
     */
    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
        this.stompTransport = stompTransport;
        this.brokerContext = brokerContext;
    }

    /**
     * Returns the next command id; ids start at 0 and increase by one per call.
     *
     * @return a command id not previously returned by this converter
     */
    protected int generateCommandId() {
        synchronized (commandIdMutex) {
            return lastCommandId++;
        }
    }

    /**
     * Builds a handler that answers the client once the broker has responded to
     * the command created from the given frame.
     *
     * @param command the STOMP frame being processed
     * @return a handler that sends a RECEIPT (or an ERROR on an exception response),
     *         or {@code null} when the frame did not carry a receipt request header
     */
    protected ResponseHandler createResponseHandler(final StompFrame command) {
        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (receiptId == null) {
            return null;
        }
        return new ResponseHandler() {
            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    // Generally a command can fail.. but that does not invalidate the connection.
                    // We report back the failure but we don't close the connection.
                    Throwable exception = ((ExceptionResponse)response).getException();
                    handleException(exception, command);
                } else {
                    StompFrame sc = new StompFrame();
                    sc.setAction(Stomp.Responses.RECEIPT);
                    sc.setHeaders(new HashMap<String, String>(1));
                    sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
                    stompTransport.sendToStomp(sc);
                }
            }
        };
    }

    /**
     * Assigns a fresh command id to the command and forwards it to the broker.
     *
     * @param command the command to send
     * @param handler invoked with the broker's response; when {@code null} no
     *                response is requested by this method
     */
    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
        command.setCommandId(generateCommandId());
        if (handler != null) {
            command.setResponseRequired(true);
            // Register before sending so the response can always find its handler.
            responseHandlers.put(Integer.valueOf(command.getCommandId()), handler);
        }
        stompTransport.sendToActiveMQ(command);
    }

    /**
     * Sends a frame to the STOMP client.
     *
     * @param command the frame to send
     * @throws IOException if the transport fails to send the frame
     */
    protected void sendToStomp(StompFrame command) throws IOException {
        stompTransport.sendToStomp(command);
    }

    /**
     * Looks up the frame translator for a transformation header value.
     *
     * @param header the transformation name, may be {@code null}
     * @return the translator created for {@code header}, or the default translator
     *         when {@code header} is {@code null} or the lookup fails
     */
    protected FrameTranslator findTranslator(String header) {
        FrameTranslator translator = frameTranslator;
        try {
            if (header != null) {
                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
                if (translator instanceof BrokerContextAware) {
                    ((BrokerContextAware)translator).setBrokerContext(brokerContext);
                }
            }
        } catch (Exception ignore) {
            // if anything goes wrong use the default translator
            LOG.debug("Falling back to the default frame translator for transformation: " + header, ignore);
        }
        return translator;
    }

    /**
     * Convert a stomp command
     * <p>
     * Dispatches the frame to the handler matching its action. A
     * {@link ProtocolException} raised while handling is reported to the client as
     * an ERROR frame; if it is fatal it is also passed to the transport's
     * exception handler.
     *
     * @param command the frame received from the STOMP client
     * @throws IOException  if reporting an error to the client fails, or a handler
     *                      raises an I/O error other than a {@link ProtocolException}
     * @throws JMSException if message conversion fails
     */
    public void onStompCommand(StompFrame command) throws IOException, JMSException {
        try {

            if (command.getClass() == StompFrameError.class) {
                throw ((StompFrameError)command).getException();
            }

            String action = command.getAction();
            if (action == null) {
                // Report a missing action as a protocol error instead of failing with an NPE below.
                throw new ProtocolException("Unknown STOMP action: " + action);
            }

            if (action.startsWith(Stomp.Commands.SEND)) {
                onStompSend(command);
            } else if (action.startsWith(Stomp.Commands.ACK)) {
                onStompAck(command);
            } else if (action.startsWith(Stomp.Commands.NACK)) {
                onStompNack(command);
            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
                onStompBegin(command);
            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
                onStompCommit(command);
            } else if (action.startsWith(Stomp.Commands.ABORT)) {
                onStompAbort(command);
            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
                onStompSubscribe(command);
            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
                onStompUnsubscribe(command);
            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
                       action.startsWith(Stomp.Commands.STOMP)) {
                onStompConnect(command);
            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
                onStompDisconnect(command);
            } else {
                throw new ProtocolException("Unknown STOMP action: " + action);
            }

        } catch (ProtocolException e) {
            handleException(e, command);
            // Some protocol errors can cause the connection to get closed.
            if (e.isFatal()) {
                getStompTransport().onException(e);
            }
        }
    }

    /**
     * Logs the exception and reports it to the client as an ERROR frame whose body
     * is the UTF-8 encoded stack trace.
     *
     * @param exception the failure to report
     * @param command   the frame that was being processed, or {@code null} when the
     *                  failure is not tied to a frame; when present, its receipt
     *                  request (if any) is echoed in the ERROR frame
     * @throws IOException if the ERROR frame cannot be sent
     */
    protected void handleException(Throwable exception, StompFrame command) throws IOException {
        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Exception detail", exception);
        }

        // Let the stomp client know about any protocol errors.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
        exception.printStackTrace(stream);
        stream.close();

        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");

        if (command != null) {
            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
            if (receiptId != null) {
                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
            }
        }

        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
        sendToStomp(errorMessage);
    }

    /**
     * Handles a SEND frame: converts it to an ActiveMQ message and sends it to the
     * broker, optionally inside a previously begun transaction.
     *
     * @param command the SEND frame
     * @throws IOException  if not connected, the destination header is missing, the
     *                      transaction is unknown, or conversion fails
     * @throws JMSException if message conversion fails
     */
    protected void onStompSend(StompFrame command) throws IOException, JMSException {
        checkConnected();

        Map<String, String> headers = command.getHeaders();
        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
        if (destination == null) {
            throw new ProtocolException("SEND received without a Destination specified!");
        }

        // Read the transaction before conversion and drop the header from the frame.
        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
        headers.remove("transaction");

        ActiveMQMessage message = convertMessage(command);

        message.setProducerId(producerId);
        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
        message.setMessageId(id);
        message.setJMSTimestamp(System.currentTimeMillis());

        if (stompTx != null) {
            TransactionId activemqTx = transactions.get(stompTx);
            if (activemqTx == null) {
                throw new ProtocolException("Invalid transaction id: " + stompTx);
            }
            message.setTransactionId(activemqTx);
        }

        message.onSend();
        sendToActiveMQ(message, createResponseHandler(command));
    }

    /**
     * Handles a NACK frame. Requires a protocol version other than 1.0, a
     * subscription id and a message id.
     *
     * @param command the NACK frame
     * @throws ProtocolException if not connected, the connection is in v1.0 mode, a
     *                           required header is missing, the transaction is
     *                           unknown, or the subscription does not accept the NACK
     */
    protected void onStompNack(StompFrame command) throws ProtocolException {

        checkConnected();

        if (this.version.equals(Stomp.V1_0)) {
            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
        }

        Map<String, String> headers = command.getHeaders();

        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
        if (subscriptionId == null) {
            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
        }

        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
        if (messageId == null) {
            throw new ProtocolException("NACK received without a message-id to acknowledge!");
        }

        TransactionId activemqTx = null;
        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx != null) {
            activemqTx = transactions.get(stompTx);
            if (activemqTx == null) {
                throw new ProtocolException("Invalid transaction id: " + stompTx);
            }
        }

        // NOTE(review): a NACK for an unknown subscription id is silently ignored here,
        // whereas onStompAck reports an error in the equivalent case - confirm this is intended.
        StompSubscription sub = this.subscriptions.get(subscriptionId);
        if (sub != null) {
            MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
            if (ack == null) {
                throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
            }
            sendToActiveMQ(ack, createResponseHandler(command));
        }
    }

    /**
     * Handles an ACK frame. With a subscription id the ACK is applied to that
     * subscription; without one, every subscription is tried until one accepts the
     * message id.
     *
     * @param command the ACK frame
     * @throws ProtocolException if not connected, the message id is missing, the
     *                           subscription id is missing on a v1.1 connection, the
     *                           transaction is unknown, or no subscription accepted
     *                           the ACK
     */
    protected void onStompAck(StompFrame command) throws ProtocolException {
        checkConnected();

        Map<String, String> headers = command.getHeaders();
        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
        if (messageId == null) {
            throw new ProtocolException("ACK received without a message-id to acknowledge!");
        }

        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
        }

        TransactionId activemqTx = null;
        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx != null) {
            activemqTx = transactions.get(stompTx);
            if (activemqTx == null) {
                throw new ProtocolException("Invalid transaction id: " + stompTx);
            }
        }

        boolean acked = false;

        if (subscriptionId != null) {

            StompSubscription sub = this.subscriptions.get(subscriptionId);
            if (sub != null) {
                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                if (ack != null) {
                    sendToActiveMQ(ack, createResponseHandler(command));
                    acked = true;
                }
            }

        } else {

            // TODO: acking with just a message id is very bogus since the same message id
            // could have been sent to 2 different subscriptions on the same Stomp connection.
            // For example, when 2 subs are created on the same topic.

            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                if (ack != null) {
                    sendToActiveMQ(ack, createResponseHandler(command));
                    acked = true;
                    break;
                }
            }
        }

        if (!acked) {
            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
        }
    }

    /**
     * Handles a BEGIN frame: registers a new local transaction under the STOMP
     * transaction name and tells the broker to begin it.
     *
     * @param command the BEGIN frame
     * @throws ProtocolException if not connected, the transaction header is missing
     *                           or has no value, or the transaction was already begun
     */
    protected void onStompBegin(StompFrame command) throws ProtocolException {
        checkConnected();

        Map<String, String> headers = command.getHeaders();

        String stompTx = headers.get(Stomp.Headers.TRANSACTION);

        // Check the value rather than just the key: the transaction map rejects null keys.
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are beginning");
        }

        if (transactions.get(stompTx) != null) {
            throw new ProtocolException("The transaction was allready started: " + stompTx);
        }

        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
        transactions.put(stompTx, activemqTx);

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.BEGIN);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles a COMMIT frame: forgets the STOMP transaction, notifies every
     * subscription and asks the broker for a one-phase commit.
     *
     * @param command the COMMIT frame
     * @throws ProtocolException if not connected, the transaction header is missing,
     *                           or the transaction is unknown
     */
    protected void onStompCommit(StompFrame command) throws ProtocolException {
        checkConnected();

        Map<String, String> headers = command.getHeaders();

        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }

        TransactionId activemqTx = transactions.remove(stompTx);
        if (activemqTx == null) {
            throw new ProtocolException("Invalid transaction id: " + stompTx);
        }

        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
            sub.onStompCommit(activemqTx);
        }

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles an ABORT frame: forgets the STOMP transaction, notifies every
     * subscription and asks the broker to roll the transaction back.
     *
     * @param command the ABORT frame
     * @throws ProtocolException if not connected, the transaction header is missing,
     *                           the transaction is unknown, or a subscription fails
     *                           to abort (reported as non-fatal)
     */
    protected void onStompAbort(StompFrame command) throws ProtocolException {
        checkConnected();
        Map<String, String> headers = command.getHeaders();

        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are aborting");
        }

        TransactionId activemqTx = transactions.remove(stompTx);
        if (activemqTx == null) {
            throw new ProtocolException("Invalid transaction id: " + stompTx);
        }
        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
            try {
                sub.onStompAbort(activemqTx);
            } catch (Exception e) {
                throw new ProtocolException("Transaction abort failed", false, e);
            }
        }

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.ROLLBACK);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles a SUBSCRIBE frame: creates a consumer for the requested destination,
     * registers the subscription and sends the consumer to the broker. Any
     * requested receipt is sent before the consumer is created on the broker.
     *
     * @param command the SUBSCRIBE frame
     * @throws ProtocolException if not connected, the subscription id is missing on
     *                           a v1.1 connection, the destination cannot be
     *                           converted, or a queue browser is requested on a
     *                           connection that is not v1.1
     */
    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
        checkConnected();
        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
        Map<String, String> headers = command.getHeaders();

        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);

        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
        }

        ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);

        if (actualDest == null) {
            throw new ProtocolException("Invalid Destination.");
        }

        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
        ConsumerInfo consumerInfo = new ConsumerInfo(id);
        consumerInfo.setPrefetchSize(1000);
        consumerInfo.setDispatchAsync(true);

        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
        if (browser != null && browser.equals(Stomp.TRUE)) {

            if (!this.version.equals(Stomp.V1_1)) {
                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
            }

            consumerInfo.setBrowser(true);
        }

        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
        consumerInfo.setSelector(selector);

        // Headers prefixed with "activemq." are applied as ConsumerInfo properties.
        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");

        // Reuse the destination converted and validated above instead of converting it again.
        consumerInfo.setDestination(actualDest);

        StompSubscription stompSubscription;
        if (!consumerInfo.isBrowser()) {
            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
        } else {
            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
        }
        stompSubscription.setDestination(actualDest);

        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
        } else {
            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
        }

        subscriptionsByConsumerId.put(id, stompSubscription);
        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
        if (subscriptionId != null) {
            subscriptions.put(subscriptionId, stompSubscription);
        }

        // dispatch can beat the receipt so send it early
        sendReceipt(command);
        sendToActiveMQ(consumerInfo, null);
    }

    /**
     * Handles an UNSUBSCRIBE frame. A frame carrying the
     * "activemq.subscriptionName" header removes that durable subscription;
     * otherwise the subscription is located by id, or failing that by destination,
     * removed from this converter's bookkeeping and its consumer removed from the
     * broker.
     *
     * @param command the UNSUBSCRIBE frame
     * @throws ProtocolException if not connected, the subscription id is missing on
     *                           a v1.1 connection, neither id nor destination is
     *                           given, or no subscription matched
     */
    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
        checkConnected();
        Map<String, String> headers = command.getHeaders();

        ActiveMQDestination destination = null;
        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
        if (o != null) {
            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
        }

        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
        }

        if (subscriptionId == null && destination == null) {
            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
        }

        // check if it is a durable subscription
        String durable = command.getHeaders().get("activemq.subscriptionName");
        if (durable != null) {
            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
            info.setClientId(durable);
            info.setSubscriptionName(durable);
            info.setConnectionId(connectionId);
            sendToActiveMQ(info, createResponseHandler(command));
            return;
        }

        if (subscriptionId != null) {

            StompSubscription sub = this.subscriptions.remove(subscriptionId);
            if (sub != null) {
                // Drop the consumer-id entry too, otherwise the removed subscription would
                // still be consulted for acks, commits, aborts and dispatches.
                subscriptionsByConsumerId.values().remove(sub);
                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                return;
            }

        } else {

            // Unsubscribing using a destination is a bit weird if multiple subscriptions
            // are created with the same destination.
            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
                StompSubscription sub = iter.next();
                if (destination != null && destination.equals(sub.getDestination())) {
                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                    iter.remove();
                    // Keep the id-keyed map consistent in case the subscription also had an id.
                    subscriptions.values().remove(sub);
                    return;
                }
            }
        }

        throw new ProtocolException("No subscription matched.");
    }

    /**
     * Handles a CONNECT/STOMP frame: negotiates the protocol version, configures
     * heart-beating and registers the connection, session and producer with the
     * broker. The CONNECTED frame is sent once the broker has acknowledged the
     * producer; if the broker rejects the connection or the producer, an ERROR
     * frame is sent and the failure is passed to the transport.
     *
     * @param command the CONNECT frame
     * @throws ProtocolException if already connected, none of the accepted versions
     *                           is supported (fatal), or the heart-beat header is
     *                           malformed (fatal)
     */
    protected void onStompConnect(final StompFrame command) throws ProtocolException {

        if (connected.get()) {
            throw new ProtocolException("Allready connected.");
        }

        final Map<String, String> headers = command.getHeaders();

        // allow anyone to login for now
        String login = headers.get(Stomp.Headers.Connect.LOGIN);
        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);

        if (accepts == null) {
            accepts = Stomp.DEFAULT_VERSION;
        }
        if (heartBeat == null) {
            heartBeat = defaultHeartBeat;
        }

        // Pick the greatest version that both the client accepts and this converter supports.
        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
        if (acceptsVersions.isEmpty()) {
            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
                                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
        } else {
            this.version = Collections.max(acceptsVersions);
        }

        configureInactivityMonitor(heartBeat);

        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
        connectionInfo.setConnectionId(connectionId);
        if (clientId != null) {
            connectionInfo.setClientId(clientId);
        } else {
            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
        }

        connectionInfo.setResponseRequired(true);
        connectionInfo.setUserName(login);
        connectionInfo.setPassword(passcode);
        connectionInfo.setTransportContext(stompTransport.getPeerCertificates());

        sendToActiveMQ(connectionInfo, new ResponseHandler() {
            public void onResponse(ProtocolConverter converter, Response response) throws IOException {

                if (response.isException()) {
                    // If the connection attempt fails we close the socket.
                    Throwable exception = ((ExceptionResponse)response).getException();
                    handleException(exception, command);
                    getStompTransport().onException(IOExceptionSupport.create(exception));
                    return;
                }

                final SessionInfo sessionInfo = new SessionInfo(sessionId);
                sendToActiveMQ(sessionInfo, null);

                final ProducerInfo producerInfo = new ProducerInfo(producerId);
                sendToActiveMQ(producerInfo, new ResponseHandler() {
                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {

                        if (response.isException()) {
                            // If the connection attempt fails we close the socket.
                            Throwable exception = ((ExceptionResponse)response).getException();
                            handleException(exception, command);
                            getStompTransport().onException(IOExceptionSupport.create(exception));
                            // Do not mark the connection as established or send CONNECTED after a failure.
                            return;
                        }

                        connected.set(true);
                        sendConnectedFrame(headers);

                        if (version.equals(Stomp.V1_1)) {
                            StompWireFormat format = stompTransport.getWireFormat();
                            if (format != null) {
                                format.setEncodingEnabled(true);
                            }
                        }
                    }
                });

            }
        });
    }

    /**
     * Builds the CONNECTED frame for a successful connection attempt and sends it
     * to the client.
     *
     * @param connectHeaders headers of the CONNECT frame being answered
     * @throws IOException if the frame cannot be sent
     */
    private void sendConnectedFrame(Map<String, String> connectHeaders) throws IOException {
        HashMap<String, String> responseHeaders = new HashMap<String, String>();

        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
        String requestId = connectHeaders.get(Stomp.Headers.Connect.REQUEST_ID);
        if (requestId == null) {
            // TODO legacy
            requestId = connectHeaders.get(Stomp.Headers.RECEIPT_REQUESTED);
        }
        if (requestId != null) {
            // TODO legacy
            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
        }

        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);

        StompFrame sc = new StompFrame();
        sc.setAction(Stomp.Responses.CONNECTED);
        sc.setHeaders(responseHeaders);
        sendToStomp(sc);
    }

    /**
     * Handles a DISCONNECT frame: removes the connection from the broker, sends a
     * shutdown command and marks this converter as disconnected.
     *
     * @param command the DISCONNECT frame
     * @throws ProtocolException if not connected
     */
    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
        checkConnected();
        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
        connected.set(false);
    }

    /**
     * Verifies that the CONNECT handshake has completed.
     *
     * @throws ProtocolException if the connection is not established
     */
    protected void checkConnected() throws ProtocolException {
        if (!connected.get()) {
            throw new ProtocolException("Not connected.");
        }
    }

    /**
     * Dispatch a ActiveMQ command
     * <p>
     * Responses go to the handler registered for their correlation id (exception
     * responses without a handler are reported to the client), message dispatches
     * go to the subscription of their consumer id, keep-alives are answered with a
     * STOMP keep-alive frame and connection errors are reported to the client.
     *
     * @param command the command received from the broker
     * @throws IOException  if sending to the STOMP client fails
     * @throws JMSException if converting a dispatched message fails
     */
    public void onActiveMQCommand(Command command) throws IOException, JMSException {
        if (command.isResponse()) {
            Response response = (Response)command;
            ResponseHandler rh = responseHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (rh != null) {
                rh.onResponse(this, response);
            } else {
                // Pass down any unexpected errors. Should this close the connection?
                if (response.isException()) {
                    Throwable exception = ((ExceptionResponse)response).getException();
                    handleException(exception, null);
                }
            }
        } else if (command.isMessageDispatch()) {
            MessageDispatch md = (MessageDispatch)command;
            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
            // Dispatches for consumers that are no longer registered are dropped.
            if (sub != null) {
                sub.onMessageDispatch(md);
            }
        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
            stompTransport.sendToStomp(ping);
        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
            // Pass down any unexpected async errors. Should this close the connection?
            Throwable exception = ((ConnectionError)command).getException();
            handleException(exception, null);
        }
    }

    /**
     * Converts a STOMP frame into an ActiveMQ message using the translator named
     * by the frame's transformation header (or the default translator).
     *
     * @param command the frame to convert
     * @return the converted message
     * @throws IOException  if conversion fails
     * @throws JMSException if conversion fails
     */
    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
        return translator.convertFrame(this, command);
    }

    /**
     * Converts an ActiveMQ message into a STOMP frame.
     *
     * @param message              the message to convert
     * @param ignoreTransformation when {@code true} the default translator is used;
     *                             otherwise the translator named by the message's
     *                             transformation property is looked up
     * @return the converted frame
     * @throws IOException  if conversion fails
     * @throws JMSException if conversion fails
     */
    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
        if (ignoreTransformation) {
            return frameTranslator.convertMessage(this, message);
        }
        return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
    }

    /**
     * @return the transport this converter sends through
     */
    public StompTransport getStompTransport() {
        return stompTransport;
    }

    /**
     * Returns the temporary destination registered under the given STOMP name,
     * creating it (and announcing it to the broker) on first use.
     *
     * @param name  the STOMP-side name of the temporary destination
     * @param topic {@code true} to create a temporary topic, {@code false} for a
     *              temporary queue; only consulted when the destination is created
     * @return the temporary destination for {@code name}
     */
    public ActiveMQDestination createTempDestination(String name, boolean topic) {
        ActiveMQDestination rc = tempDestinations.get(name);
        if (rc == null) {
            if (topic) {
                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
            } else {
                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
            }
            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
            tempDestinations.put(name, rc);
            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
        }
        return rc;
    }

    /**
     * Looks up the STOMP name under which a temporary destination was created.
     *
     * @param destination the ActiveMQ destination
     * @return the STOMP name, or {@code null} if the destination was not created
     *         through {@link #createTempDestination(String, boolean)}
     */
    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
    }

    /**
     * @return the heart-beat setting used when a CONNECT frame has no heart-beat header
     */
    public String getDefaultHeartBeat() {
        return defaultHeartBeat;
    }

    /**
     * @param defaultHeartBeat the heart-beat setting to use when a CONNECT frame
     *                         has no heart-beat header
     */
    public void setDefaultHeartBeat(String defaultHeartBeat) {
        this.defaultHeartBeat = defaultHeartBeat;
    }

    /**
     * Parses a heart-beat setting of the form "read,write" and starts the
     * transport's inactivity monitor with those intervals. If the monitor cannot
     * be configured or started, both intervals are reset to 0.
     *
     * @param heartBeatConfig the heart-beat header value
     * @throws ProtocolException (fatal) if the value does not consist of exactly
     *                           two comma separated integers
     */
    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {

        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);

        if (keepAliveOpts.length != 2) {
            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
        }

        try {
            hbReadInterval = Long.parseLong(keepAliveOpts[0]);
            hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
        } catch (NumberFormatException e) {
            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
        }

        try {

            StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();

            monitor.setReadCheckTime(hbReadInterval);
            monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
            monitor.setWriteCheckTime(hbWriteInterval);

            monitor.startMonitoring();

        } catch (Exception ex) {
            // Best effort: without a working monitor, advertise that heart-beating is off.
            hbReadInterval = 0;
            hbWriteInterval = 0;
            LOG.debug("Could not start the inactivity monitor, heart-beating disabled", ex);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
        }
    }

    /**
     * Immediately sends a RECEIPT frame if the given frame requested one. A
     * failure to send is logged and otherwise ignored.
     *
     * @param command the frame that may carry a receipt request
     */
    protected void sendReceipt(StompFrame command) {
        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (receiptId == null) {
            return;
        }
        StompFrame sc = new StompFrame();
        sc.setAction(Stomp.Responses.RECEIPT);
        sc.setHeaders(new HashMap<String, String>(1));
        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
        try {
            sendToStomp(sc);
        } catch (IOException e) {
            LOG.warn("Could not send a receipt for " + command, e);
        }
    }
}