001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.EOFException; 020 import java.io.IOException; 021 import java.net.SocketException; 022 import java.net.URI; 023 import java.util.HashMap; 024 import java.util.Iterator; 025 import java.util.LinkedList; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Properties; 029 import java.util.concurrent.ConcurrentHashMap; 030 import java.util.concurrent.CopyOnWriteArrayList; 031 import java.util.concurrent.CountDownLatch; 032 import java.util.concurrent.TimeUnit; 033 import java.util.concurrent.atomic.AtomicBoolean; 034 import java.util.concurrent.atomic.AtomicInteger; 035 import java.util.concurrent.atomic.AtomicReference; 036 import java.util.concurrent.locks.ReentrantReadWriteLock; 037 038 import javax.transaction.xa.XAResource; 039 import org.apache.activemq.advisory.AdvisorySupport; 040 import org.apache.activemq.broker.ft.MasterBroker; 041 import org.apache.activemq.broker.region.ConnectionStatistics; 042 import org.apache.activemq.broker.region.RegionBroker; 043 import org.apache.activemq.command.*; 044 import org.apache.activemq.network.DemandForwardingBridge; 045 import org.apache.activemq.network.MBeanNetworkListener; 046 import org.apache.activemq.network.NetworkBridgeConfiguration; 047 import org.apache.activemq.network.NetworkBridgeFactory; 048 import org.apache.activemq.security.MessageAuthorizationPolicy; 049 import org.apache.activemq.state.CommandVisitor; 050 import org.apache.activemq.state.ConnectionState; 051 import org.apache.activemq.state.ConsumerState; 052 import org.apache.activemq.state.ProducerState; 053 import org.apache.activemq.state.SessionState; 054 import org.apache.activemq.state.TransactionState; 055 import org.apache.activemq.thread.DefaultThreadPools; 056 import org.apache.activemq.thread.Task; 057 import org.apache.activemq.thread.TaskRunner; 058 import org.apache.activemq.thread.TaskRunnerFactory; 059 import org.apache.activemq.transaction.Transaction; 060 import org.apache.activemq.transport.DefaultTransportListener; 061 import org.apache.activemq.transport.ResponseCorrelator; 062 import org.apache.activemq.transport.Transport; 063 import org.apache.activemq.transport.TransportDisposedIOException; 064 import org.apache.activemq.transport.TransportFactory; 065 import org.apache.activemq.util.IntrospectionSupport; 066 import org.apache.activemq.util.MarshallingSupport; 067 import org.apache.activemq.util.ServiceSupport; 068 import org.apache.activemq.util.URISupport; 069 import org.slf4j.Logger; 070 import org.slf4j.LoggerFactory; 071 import org.slf4j.MDC; 072 073 public class TransportConnection implements Connection, Task, CommandVisitor { 074 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 075 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 076 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 077 // Keeps track of the broker and connector that created this connection. 078 protected final Broker broker; 079 protected final TransportConnector connector; 080 // Keeps track of the state of the connections. 081 // protected final ConcurrentHashMap localConnectionStates=new 082 // ConcurrentHashMap(); 083 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 084 // The broker and wireformat info that was exchanged. 085 protected BrokerInfo brokerInfo; 086 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 087 protected TaskRunner taskRunner; 088 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); 089 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 090 private MasterBroker masterBroker; 091 private final Transport transport; 092 private MessageAuthorizationPolicy messageAuthorizationPolicy; 093 private WireFormatInfo wireFormatInfo; 094 // Used to do async dispatch.. this should perhaps be pushed down into the 095 // transport layer.. 096 private boolean inServiceException; 097 private final ConnectionStatistics statistics = new ConnectionStatistics(); 098 private boolean manageable; 099 private boolean slow; 100 private boolean markedCandidate; 101 private boolean blockedCandidate; 102 private boolean blocked; 103 private boolean connected; 104 private boolean active; 105 private boolean starting; 106 private boolean pendingStop; 107 private long timeStamp; 108 private final AtomicBoolean stopping = new AtomicBoolean(false); 109 private final CountDownLatch stopped = new CountDownLatch(1); 110 private final AtomicBoolean asyncException = new AtomicBoolean(false); 111 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 112 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 113 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 114 private ConnectionContext context; 115 private boolean networkConnection; 116 private boolean faultTolerantConnection; 117 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 118 private DemandForwardingBridge duplexBridge; 119 private final TaskRunnerFactory taskRunnerFactory; 120 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 121 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 122 private String duplexNetworkConnectorId; 123 private Throwable stopError = null; 124 125 /** 126 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 127 * else commands are sent async. 128 */ 129 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 130 TaskRunnerFactory taskRunnerFactory) { 131 this.connector = connector; 132 this.broker = broker; 133 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 134 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 135 brokerConnectionStates = rb.getConnectionStates(); 136 if (connector != null) { 137 this.statistics.setParent(connector.getStatistics()); 138 } 139 this.taskRunnerFactory = taskRunnerFactory; 140 this.transport = transport; 141 this.transport.setTransportListener(new DefaultTransportListener() { 142 @Override 143 public void onCommand(Object o) { 144 serviceLock.readLock().lock(); 145 try { 146 if (!(o instanceof Command)) { 147 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 148 } 149 Command command = (Command) o; 150 Response response = service(command); 151 if (response != null) { 152 dispatchSync(response); 153 } 154 } finally { 155 serviceLock.readLock().unlock(); 156 } 157 } 158 159 @Override 160 public void onException(IOException exception) { 161 serviceLock.readLock().lock(); 162 try { 163 serviceTransportException(exception); 164 } finally { 165 serviceLock.readLock().unlock(); 166 } 167 } 168 }); 169 connected = true; 170 } 171 172 /** 173 * Returns the number of messages to be dispatched to this connection 174 * 175 * @return size of dispatch queue 176 */ 177 public int getDispatchQueueSize() { 178 synchronized (dispatchQueue) { 179 return dispatchQueue.size(); 180 } 181 } 182 183 public void serviceTransportException(IOException e) { 184 BrokerService bService = connector.getBrokerService(); 185 if (bService.isShutdownOnSlaveFailure()) { 186 if (brokerInfo != null) { 187 if (brokerInfo.isSlaveBroker()) { 188 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e); 189 try { 190 doStop(); 191 bService.stop(); 192 } catch (Exception ex) { 193 LOG.warn("Failed to stop the master", ex); 194 } 195 } 196 } 197 } 198 if (!stopping.get()) { 199 transportException.set(e); 200 if (TRANSPORTLOG.isDebugEnabled()) { 201 TRANSPORTLOG.debug(this + " failed: " + e, e); 202 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 203 TRANSPORTLOG.warn(this + " failed: " + e); 204 } 205 stopAsync(); 206 } 207 } 208 209 private boolean expected(IOException e) { 210 return isStomp() && 211 ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 212 } 213 214 private boolean isStomp() { 215 URI uri = connector.getUri(); 216 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 217 } 218 219 /** 220 * Calls the serviceException method in an async thread. Since handling a 221 * service exception closes a socket, we should not tie up broker threads 222 * since client sockets may hang or cause deadlocks. 223 */ 224 public void serviceExceptionAsync(final IOException e) { 225 if (asyncException.compareAndSet(false, true)) { 226 new Thread("Async Exception Handler") { 227 @Override 228 public void run() { 229 serviceException(e); 230 } 231 }.start(); 232 } 233 } 234 235 /** 236 * Closes a clients connection due to a detected error. Errors are ignored 237 * if: the client is closing or broker is closing. Otherwise, the connection 238 * error transmitted to the client before stopping it's transport. 239 */ 240 public void serviceException(Throwable e) { 241 // are we a transport exception such as not being able to dispatch 242 // synchronously to a transport 243 if (e instanceof IOException) { 244 serviceTransportException((IOException) e); 245 } else if (e.getClass() == BrokerStoppedException.class) { 246 // Handle the case where the broker is stopped 247 // But the client is still connected. 248 if (!stopping.get()) { 249 if (SERVICELOG.isDebugEnabled()) { 250 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 251 } 252 ConnectionError ce = new ConnectionError(); 253 ce.setException(e); 254 dispatchSync(ce); 255 // Record the error that caused the transport to stop 256 this.stopError = e; 257 // Wait a little bit to try to get the output buffer to flush 258 // the exption notification to the client. 259 try { 260 Thread.sleep(500); 261 } catch (InterruptedException ie) { 262 Thread.currentThread().interrupt(); 263 } 264 // Worst case is we just kill the connection before the 265 // notification gets to him. 266 stopAsync(); 267 } 268 } else if (!stopping.get() && !inServiceException) { 269 inServiceException = true; 270 try { 271 SERVICELOG.warn("Async error occurred: " + e, e); 272 ConnectionError ce = new ConnectionError(); 273 ce.setException(e); 274 if (pendingStop) { 275 dispatchSync(ce); 276 } else { 277 dispatchAsync(ce); 278 } 279 } finally { 280 inServiceException = false; 281 } 282 } 283 } 284 285 public Response service(Command command) { 286 MDC.put("activemq.connector", connector.getUri().toString()); 287 Response response = null; 288 boolean responseRequired = command.isResponseRequired(); 289 int commandId = command.getCommandId(); 290 try { 291 if (!pendingStop) { 292 response = command.visit(this); 293 } else { 294 response = new ExceptionResponse(this.stopError); 295 } 296 } catch (Throwable e) { 297 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 298 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 299 + " command: " + command + ", exception: " + e, e); 300 } 301 302 if (e instanceof java.lang.SecurityException) { 303 // still need to close this down - in case the peer of this transport doesn't play nice 304 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 305 } 306 307 if (responseRequired) { 308 response = new ExceptionResponse(e); 309 } else { 310 serviceException(e); 311 } 312 } 313 if (responseRequired) { 314 if (response == null) { 315 response = new Response(); 316 } 317 response.setCorrelationId(commandId); 318 } 319 // The context may have been flagged so that the response is not 320 // sent. 321 if (context != null) { 322 if (context.isDontSendReponse()) { 323 context.setDontSendReponse(false); 324 response = null; 325 } 326 context = null; 327 } 328 MDC.remove("activemq.connector"); 329 return response; 330 } 331 332 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 333 return null; 334 } 335 336 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 337 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 338 return null; 339 } 340 341 public Response processWireFormat(WireFormatInfo info) throws Exception { 342 wireFormatInfo = info; 343 protocolVersion.set(info.getVersion()); 344 return null; 345 } 346 347 public Response processShutdown(ShutdownInfo info) throws Exception { 348 stopAsync(); 349 return null; 350 } 351 352 public Response processFlush(FlushCommand command) throws Exception { 353 return null; 354 } 355 356 public Response processBeginTransaction(TransactionInfo info) throws Exception { 357 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 358 context = null; 359 if (cs != null) { 360 context = cs.getContext(); 361 } 362 if (cs == null) { 363 throw new NullPointerException("Context is null"); 364 } 365 // Avoid replaying dup commands 366 if (cs.getTransactionState(info.getTransactionId()) == null) { 367 cs.addTransactionState(info.getTransactionId()); 368 broker.beginTransaction(context, info.getTransactionId()); 369 } 370 return null; 371 } 372 373 public Response processEndTransaction(TransactionInfo info) throws Exception { 374 // No need to do anything. This packet is just sent by the client 375 // make sure he is synced with the server as commit command could 376 // come from a different connection. 377 return null; 378 } 379 380 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 381 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 382 context = null; 383 if (cs != null) { 384 context = cs.getContext(); 385 } 386 if (cs == null) { 387 throw new NullPointerException("Context is null"); 388 } 389 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 390 if (transactionState == null) { 391 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 392 + info.getTransactionId()); 393 } 394 // Avoid dups. 395 if (!transactionState.isPrepared()) { 396 transactionState.setPrepared(true); 397 int result = broker.prepareTransaction(context, info.getTransactionId()); 398 transactionState.setPreparedResult(result); 399 if (result == XAResource.XA_RDONLY) { 400 // we are done, no further rollback or commit from TM 401 cs.removeTransactionState(info.getTransactionId()); 402 } 403 IntegerResponse response = new IntegerResponse(result); 404 return response; 405 } else { 406 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 407 return response; 408 } 409 } 410 411 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 412 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 413 context = cs.getContext(); 414 cs.removeTransactionState(info.getTransactionId()); 415 broker.commitTransaction(context, info.getTransactionId(), true); 416 return null; 417 } 418 419 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 420 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 421 context = cs.getContext(); 422 cs.removeTransactionState(info.getTransactionId()); 423 broker.commitTransaction(context, info.getTransactionId(), false); 424 return null; 425 } 426 427 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 428 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 429 context = cs.getContext(); 430 cs.removeTransactionState(info.getTransactionId()); 431 broker.rollbackTransaction(context, info.getTransactionId()); 432 return null; 433 } 434 435 public Response processForgetTransaction(TransactionInfo info) throws Exception { 436 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 437 context = cs.getContext(); 438 broker.forgetTransaction(context, info.getTransactionId()); 439 return null; 440 } 441 442 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 443 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 444 context = cs.getContext(); 445 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 446 return new DataArrayResponse(preparedTransactions); 447 } 448 449 public Response processMessage(Message messageSend) throws Exception { 450 ProducerId producerId = messageSend.getProducerId(); 451 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 452 if (producerExchange.canDispatch(messageSend)) { 453 broker.send(producerExchange, messageSend); 454 } 455 return null; 456 } 457 458 public Response processMessageAck(MessageAck ack) throws Exception { 459 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 460 if (consumerExchange != null) { 461 broker.acknowledge(consumerExchange, ack); 462 } 463 return null; 464 } 465 466 public Response processMessagePull(MessagePull pull) throws Exception { 467 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 468 } 469 470 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 471 broker.processDispatchNotification(notification); 472 return null; 473 } 474 475 public Response processAddDestination(DestinationInfo info) throws Exception { 476 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 477 broker.addDestinationInfo(cs.getContext(), info); 478 if (info.getDestination().isTemporary()) { 479 cs.addTempDestination(info); 480 } 481 return null; 482 } 483 484 public Response processRemoveDestination(DestinationInfo info) throws Exception { 485 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 486 broker.removeDestinationInfo(cs.getContext(), info); 487 if (info.getDestination().isTemporary()) { 488 cs.removeTempDestination(info.getDestination()); 489 } 490 return null; 491 } 492 493 public Response processAddProducer(ProducerInfo info) throws Exception { 494 SessionId sessionId = info.getProducerId().getParentId(); 495 ConnectionId connectionId = sessionId.getParentId(); 496 TransportConnectionState cs = lookupConnectionState(connectionId); 497 if (cs == null) { 498 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 499 + connectionId); 500 } 501 SessionState ss = cs.getSessionState(sessionId); 502 if (ss == null) { 503 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 504 + sessionId); 505 } 506 // Avoid replaying dup commands 507 if (!ss.getProducerIds().contains(info.getProducerId())) { 508 ActiveMQDestination destination = info.getDestination(); 509 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 510 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 511 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 512 } 513 } 514 broker.addProducer(cs.getContext(), info); 515 try { 516 ss.addProducer(info); 517 } catch (IllegalStateException e) { 518 broker.removeProducer(cs.getContext(), info); 519 } 520 521 } 522 return null; 523 } 524 525 public Response processRemoveProducer(ProducerId id) throws Exception { 526 SessionId sessionId = id.getParentId(); 527 ConnectionId connectionId = sessionId.getParentId(); 528 TransportConnectionState cs = lookupConnectionState(connectionId); 529 SessionState ss = cs.getSessionState(sessionId); 530 if (ss == null) { 531 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 532 + sessionId); 533 } 534 ProducerState ps = ss.removeProducer(id); 535 if (ps == null) { 536 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 537 } 538 removeProducerBrokerExchange(id); 539 broker.removeProducer(cs.getContext(), ps.getInfo()); 540 return null; 541 } 542 543 public Response processAddConsumer(ConsumerInfo info) throws Exception { 544 SessionId sessionId = info.getConsumerId().getParentId(); 545 ConnectionId connectionId = sessionId.getParentId(); 546 TransportConnectionState cs = lookupConnectionState(connectionId); 547 if (cs == null) { 548 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 549 + connectionId); 550 } 551 SessionState ss = cs.getSessionState(sessionId); 552 if (ss == null) { 553 throw new IllegalStateException(broker.getBrokerName() 554 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 555 } 556 // Avoid replaying dup commands 557 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 558 ActiveMQDestination destination = info.getDestination(); 559 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 560 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 561 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 562 } 563 } 564 565 broker.addConsumer(cs.getContext(), info); 566 try { 567 ss.addConsumer(info); 568 addConsumerBrokerExchange(info.getConsumerId()); 569 } catch (IllegalStateException e) { 570 broker.removeConsumer(cs.getContext(), info); 571 } 572 573 } 574 return null; 575 } 576 577 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 578 SessionId sessionId = id.getParentId(); 579 ConnectionId connectionId = sessionId.getParentId(); 580 TransportConnectionState cs = lookupConnectionState(connectionId); 581 if (cs == null) { 582 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 583 + connectionId); 584 } 585 SessionState ss = cs.getSessionState(sessionId); 586 if (ss == null) { 587 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 588 + sessionId); 589 } 590 ConsumerState consumerState = ss.removeConsumer(id); 591 if (consumerState == null) { 592 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 593 } 594 ConsumerInfo info = consumerState.getInfo(); 595 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 596 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 597 removeConsumerBrokerExchange(id); 598 return null; 599 } 600 601 public Response processAddSession(SessionInfo info) throws Exception { 602 ConnectionId connectionId = info.getSessionId().getParentId(); 603 TransportConnectionState cs = lookupConnectionState(connectionId); 604 // Avoid replaying dup commands 605 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 606 broker.addSession(cs.getContext(), info); 607 try { 608 cs.addSession(info); 609 } catch (IllegalStateException e) { 610 e.printStackTrace(); 611 broker.removeSession(cs.getContext(), info); 612 } 613 } 614 return null; 615 } 616 617 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 618 ConnectionId connectionId = id.getParentId(); 619 TransportConnectionState cs = lookupConnectionState(connectionId); 620 if (cs == null) { 621 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 622 } 623 SessionState session = cs.getSessionState(id); 624 if (session == null) { 625 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 626 } 627 // Don't let new consumers or producers get added while we are closing 628 // this down. 629 session.shutdown(); 630 // Cascade the connection stop to the consumers and producers. 631 for (ConsumerId consumerId : session.getConsumerIds()) { 632 try { 633 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 634 } catch (Throwable e) { 635 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); 636 } 637 } 638 for (ProducerId producerId : session.getProducerIds()) { 639 try { 640 processRemoveProducer(producerId); 641 } catch (Throwable e) { 642 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e); 643 } 644 } 645 cs.removeSession(id); 646 broker.removeSession(cs.getContext(), session.getInfo()); 647 return null; 648 } 649 650 public Response processAddConnection(ConnectionInfo info) throws Exception { 651 // if the broker service has slave attached, wait for the slave to be 652 // attached to allow client connection. slave connection is fine 653 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave() 654 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) { 655 ServiceSupport.dispose(transport); 656 return new ExceptionResponse(new Exception("Master's slave not attached yet.")); 657 } 658 // Older clients should have been defaulting this field to true.. but 659 // they were not. 660 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 661 info.setClientMaster(true); 662 } 663 TransportConnectionState state; 664 // Make sure 2 concurrent connections by the same ID only generate 1 665 // TransportConnectionState object. 666 synchronized (brokerConnectionStates) { 667 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 668 if (state == null) { 669 state = new TransportConnectionState(info, this); 670 brokerConnectionStates.put(info.getConnectionId(), state); 671 } 672 state.incrementReference(); 673 } 674 // If there are 2 concurrent connections for the same connection id, 675 // then last one in wins, we need to sync here 676 // to figure out the winner. 677 synchronized (state.getConnectionMutex()) { 678 if (state.getConnection() != this) { 679 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); 680 state.getConnection().stop(); 681 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " 682 + state.getConnection().getRemoteAddress()); 683 state.setConnection(this); 684 state.reset(info); 685 } 686 } 687 registerConnectionState(info.getConnectionId(), state); 688 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info); 689 this.faultTolerantConnection = info.isFaultTolerant(); 690 // Setup the context. 691 String clientId = info.getClientId(); 692 context = new ConnectionContext(); 693 context.setBroker(broker); 694 context.setClientId(clientId); 695 context.setClientMaster(info.isClientMaster()); 696 context.setConnection(this); 697 context.setConnectionId(info.getConnectionId()); 698 context.setConnector(connector); 699 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 700 context.setNetworkConnection(networkConnection); 701 context.setFaultTolerant(faultTolerantConnection); 702 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 703 context.setUserName(info.getUserName()); 704 context.setWireFormatInfo(wireFormatInfo); 705 context.setReconnect(info.isFailoverReconnect()); 706 this.manageable = info.isManageable(); 707 context.setConnectionState(state); 708 state.setContext(context); 709 state.setConnection(this); 710 if (info.getClientIp() == null) { 711 info.setClientIp(getRemoteAddress()); 712 } 713 714 try { 715 broker.addConnection(context, info); 716 } catch (Exception e) { 717 synchronized (brokerConnectionStates) { 718 brokerConnectionStates.remove(info.getConnectionId()); 719 } 720 unregisterConnectionState(info.getConnectionId()); 721 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString()); 722 if (LOG.isDebugEnabled()) { 723 LOG.debug("Exception detail:", e); 724 } 725 throw e; 726 } 727 if (info.isManageable()) { 728 // send ConnectionCommand 729 ConnectionControl command = this.connector.getConnectionControl(); 730 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 731 if (info.isFailoverReconnect()) { 732 command.setRebalanceConnection(false); 733 } 734 dispatchAsync(command); 735 } 736 return null; 737 } 738 739 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 740 throws InterruptedException { 741 LOG.debug("remove connection id: " + id); 742 TransportConnectionState cs = lookupConnectionState(id); 743 if (cs != null) { 744 // Don't allow things to be added to the connection state while we 745 // are shutting down. 746 cs.shutdown(); 747 // Cascade the connection stop to the sessions. 748 for (SessionId sessionId : cs.getSessionIds()) { 749 try { 750 processRemoveSession(sessionId, lastDeliveredSequenceId); 751 } catch (Throwable e) { 752 SERVICELOG.warn("Failed to remove session " + sessionId, e); 753 } 754 } 755 // Cascade the connection stop to temp destinations. 756 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 757 DestinationInfo di = iter.next(); 758 try { 759 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 760 } catch (Throwable e) { 761 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); 762 } 763 iter.remove(); 764 } 765 try { 766 broker.removeConnection(cs.getContext(), cs.getInfo(), null); 767 } catch (Throwable e) { 768 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString()); 769 if (LOG.isDebugEnabled()) { 770 SERVICELOG.debug("Exception detail:", e); 771 } 772 } 773 TransportConnectionState state = unregisterConnectionState(id); 774 if (state != null) { 775 synchronized (brokerConnectionStates) { 776 // If we are the last reference, we should remove the state 777 // from the broker. 778 if (state.decrementReference() == 0) { 779 brokerConnectionStates.remove(id); 780 } 781 } 782 } 783 } 784 return null; 785 } 786 787 public Response processProducerAck(ProducerAck ack) throws Exception { 788 // A broker should not get ProducerAck messages. 789 return null; 790 } 791 792 public Connector getConnector() { 793 return connector; 794 } 795 796 public void dispatchSync(Command message) { 797 try { 798 processDispatch(message); 799 } catch (IOException e) { 800 serviceExceptionAsync(e); 801 } 802 } 803 804 public void dispatchAsync(Command message) { 805 if (!stopping.get()) { 806 if (taskRunner == null) { 807 dispatchSync(message); 808 } else { 809 synchronized (dispatchQueue) { 810 dispatchQueue.add(message); 811 } 812 try { 813 taskRunner.wakeup(); 814 } catch (InterruptedException e) { 815 Thread.currentThread().interrupt(); 816 } 817 } 818 } else { 819 if (message.isMessageDispatch()) { 820 MessageDispatch md = (MessageDispatch) message; 821 Runnable sub = md.getTransmitCallback(); 822 broker.postProcessDispatch(md); 823 if (sub != null) { 824 sub.run(); 825 } 826 } 827 } 828 } 829 830 protected void processDispatch(Command command) throws IOException { 831 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 832 try { 833 if (!stopping.get()) { 834 if (messageDispatch != null) { 835 broker.preProcessDispatch(messageDispatch); 836 } 837 dispatch(command); 838 } 839 } finally { 840 if (messageDispatch != null) { 841 Runnable sub = messageDispatch.getTransmitCallback(); 842 broker.postProcessDispatch(messageDispatch); 843 if (sub != null) { 844 sub.run(); 845 } 846 } 847 } 848 } 849 850 public boolean iterate() { 851 try { 852 if (pendingStop || stopping.get()) { 853 if (dispatchStopped.compareAndSet(false, true)) { 854 if (transportException.get() == null) { 855 try { 856 dispatch(new ShutdownInfo()); 857 } catch (Throwable ignore) { 858 } 859 } 860 dispatchStoppedLatch.countDown(); 861 } 862 return false; 863 } 864 if (!dispatchStopped.get()) { 865 Command command = null; 866 synchronized (dispatchQueue) { 867 if (dispatchQueue.isEmpty()) { 868 return false; 869 } 870 command = dispatchQueue.remove(0); 871 } 872 processDispatch(command); 873 return true; 874 } 875 return false; 876 } catch (IOException e) { 877 if (dispatchStopped.compareAndSet(false, true)) { 878 dispatchStoppedLatch.countDown(); 879 } 880 serviceExceptionAsync(e); 881 return false; 882 } 883 } 884 885 /** 886 * Returns the statistics for this connection 887 */ 888 public ConnectionStatistics getStatistics() { 889 return statistics; 890 } 891 892 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 893 return messageAuthorizationPolicy; 894 } 895 896 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 897 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 898 } 899 900 public boolean isManageable() { 901 return manageable; 902 } 903 904 public void start() throws Exception { 905 try { 906 synchronized (this) { 907 starting = true; 908 if (taskRunnerFactory != null) { 909 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 910 + getRemoteAddress()); 911 } else { 912 taskRunner = null; 913 } 914 transport.start(); 915 active = true; 916 BrokerInfo info = connector.getBrokerInfo().copy(); 917 if (connector.isUpdateClusterClients()) { 918 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 919 } else { 920 info.setPeerBrokerInfos(null); 921 } 922 dispatchAsync(info); 923 924 connector.onStarted(this); 925 } 926 } catch (Exception e) { 927 // Force clean up on an error starting up. 928 pendingStop = true; 929 throw e; 930 } finally { 931 // stop() can be called from within the above block, 932 // but we want to be sure start() completes before 933 // stop() runs, so queue the stop until right now: 934 setStarting(false); 935 if (isPendingStop()) { 936 LOG.debug("Calling the delayed stop() after start() " + this); 937 stop(); 938 } 939 } 940 } 941 942 public void stop() throws Exception { 943 stopAsync(); 944 while (!stopped.await(5, TimeUnit.SECONDS)) { 945 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); 946 } 947 } 948 949 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 950 if (waitTime > 0) { 951 synchronized (this) { 952 pendingStop = true; 953 stopError = cause; 954 } 955 try { 956 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 957 public void run() { 958 try { 959 Thread.sleep(waitTime); 960 stopAsync(); 961 LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason); 962 } catch (InterruptedException e) { 963 } 964 } 965 }, "delayedStop:" + transport.getRemoteAddress()); 966 } catch (Throwable t) { 967 LOG.warn("cannot create stopAsync :", t); 968 } 969 } 970 } 971 972 public void stopAsync() { 973 // If we're in the middle of starting then go no further... for now. 974 synchronized (this) { 975 pendingStop = true; 976 if (starting) { 977 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 978 return; 979 } 980 } 981 if (stopping.compareAndSet(false, true)) { 982 // Let all the connection contexts know we are shutting down 983 // so that in progress operations can notice and unblock. 984 List<TransportConnectionState> connectionStates = listConnectionStates(); 985 for (TransportConnectionState cs : connectionStates) { 986 ConnectionContext connectionContext = cs.getContext(); 987 if (connectionContext != null) { 988 connectionContext.getStopping().set(true); 989 } 990 } 991 try { 992 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 993 public void run() { 994 serviceLock.writeLock().lock(); 995 try { 996 doStop(); 997 } catch (Throwable e) { 998 LOG.debug("Error occurred while shutting down a connection " + this, e); 999 } finally { 1000 stopped.countDown(); 1001 serviceLock.writeLock().unlock(); 1002 } 1003 } 1004 }, "StopAsync:" + transport.getRemoteAddress()); 1005 } catch (Throwable t) { 1006 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); 1007 stopped.countDown(); 1008 } 1009 } 1010 } 1011 1012 @Override 1013 public String toString() { 1014 return "Transport Connection to: " + transport.getRemoteAddress(); 1015 } 1016 1017 protected void doStop() throws Exception, InterruptedException { 1018 LOG.debug("Stopping connection: " + transport.getRemoteAddress()); 1019 connector.onStopped(this); 1020 try { 1021 synchronized (this) { 1022 if (masterBroker != null) { 1023 masterBroker.stop(); 1024 } 1025 if (duplexBridge != null) { 1026 duplexBridge.stop(); 1027 } 1028 } 1029 } catch (Exception ignore) { 1030 LOG.trace("Exception caught stopping", ignore); 1031 } 1032 try { 1033 transport.stop(); 1034 LOG.debug("Stopped transport: " + transport.getRemoteAddress()); 1035 } catch (Exception e) { 1036 LOG.debug("Could not stop transport: " + e, e); 1037 } 1038 if (taskRunner != null) { 1039 taskRunner.shutdown(1); 1040 } 1041 active = false; 1042 // Run the MessageDispatch callbacks so that message references get 1043 // cleaned up. 1044 synchronized (dispatchQueue) { 1045 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1046 Command command = iter.next(); 1047 if (command.isMessageDispatch()) { 1048 MessageDispatch md = (MessageDispatch) command; 1049 Runnable sub = md.getTransmitCallback(); 1050 broker.postProcessDispatch(md); 1051 if (sub != null) { 1052 sub.run(); 1053 } 1054 } 1055 } 1056 dispatchQueue.clear(); 1057 } 1058 // 1059 // Remove all logical connection associated with this connection 1060 // from the broker. 1061 if (!broker.isStopped()) { 1062 List<TransportConnectionState> connectionStates = listConnectionStates(); 1063 connectionStates = listConnectionStates(); 1064 for (TransportConnectionState cs : connectionStates) { 1065 cs.getContext().getStopping().set(true); 1066 try { 1067 LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); 1068 processRemoveConnection(cs.getInfo().getConnectionId(), 0l); 1069 } catch (Throwable ignore) { 1070 ignore.printStackTrace(); 1071 } 1072 } 1073 } 1074 LOG.debug("Connection Stopped: " + getRemoteAddress()); 1075 } 1076 1077 /** 1078 * @return Returns the blockedCandidate. 1079 */ 1080 public boolean isBlockedCandidate() { 1081 return blockedCandidate; 1082 } 1083 1084 /** 1085 * @param blockedCandidate The blockedCandidate to set. 1086 */ 1087 public void setBlockedCandidate(boolean blockedCandidate) { 1088 this.blockedCandidate = blockedCandidate; 1089 } 1090 1091 /** 1092 * @return Returns the markedCandidate. 1093 */ 1094 public boolean isMarkedCandidate() { 1095 return markedCandidate; 1096 } 1097 1098 /** 1099 * @param markedCandidate The markedCandidate to set. 1100 */ 1101 public void setMarkedCandidate(boolean markedCandidate) { 1102 this.markedCandidate = markedCandidate; 1103 if (!markedCandidate) { 1104 timeStamp = 0; 1105 blockedCandidate = false; 1106 } 1107 } 1108 1109 /** 1110 * @param slow The slow to set. 1111 */ 1112 public void setSlow(boolean slow) { 1113 this.slow = slow; 1114 } 1115 1116 /** 1117 * @return true if the Connection is slow 1118 */ 1119 public boolean isSlow() { 1120 return slow; 1121 } 1122 1123 /** 1124 * @return true if the Connection is potentially blocked 1125 */ 1126 public boolean isMarkedBlockedCandidate() { 1127 return markedCandidate; 1128 } 1129 1130 /** 1131 * Mark the Connection, so we can deem if it's collectable on the next sweep 1132 */ 1133 public void doMark() { 1134 if (timeStamp == 0) { 1135 timeStamp = System.currentTimeMillis(); 1136 } 1137 } 1138 1139 /** 1140 * @return if after being marked, the Connection is still writing 1141 */ 1142 public boolean isBlocked() { 1143 return blocked; 1144 } 1145 1146 /** 1147 * @return true if the Connection is connected 1148 */ 1149 public boolean isConnected() { 1150 return connected; 1151 } 1152 1153 /** 1154 * @param blocked The blocked to set. 1155 */ 1156 public void setBlocked(boolean blocked) { 1157 this.blocked = blocked; 1158 } 1159 1160 /** 1161 * @param connected The connected to set. 1162 */ 1163 public void setConnected(boolean connected) { 1164 this.connected = connected; 1165 } 1166 1167 /** 1168 * @return true if the Connection is active 1169 */ 1170 public boolean isActive() { 1171 return active; 1172 } 1173 1174 /** 1175 * @param active The active to set. 1176 */ 1177 public void setActive(boolean active) { 1178 this.active = active; 1179 } 1180 1181 /** 1182 * @return true if the Connection is starting 1183 */ 1184 public synchronized boolean isStarting() { 1185 return starting; 1186 } 1187 1188 public synchronized boolean isNetworkConnection() { 1189 return networkConnection; 1190 } 1191 1192 public boolean isFaultTolerantConnection() { 1193 return this.faultTolerantConnection; 1194 } 1195 1196 protected synchronized void setStarting(boolean starting) { 1197 this.starting = starting; 1198 } 1199 1200 /** 1201 * @return true if the Connection needs to stop 1202 */ 1203 public synchronized boolean isPendingStop() { 1204 return pendingStop; 1205 } 1206 1207 protected synchronized void setPendingStop(boolean pendingStop) { 1208 this.pendingStop = pendingStop; 1209 } 1210 1211 public Response processBrokerInfo(BrokerInfo info) { 1212 if (info.isSlaveBroker()) { 1213 BrokerService bService = connector.getBrokerService(); 1214 // Do we only support passive slaves - or does the slave want to be 1215 // passive ? 1216 boolean passive = bService.isPassiveSlave() || info.isPassiveSlave(); 1217 if (passive == false) { 1218 1219 // stream messages from this broker (the master) to 1220 // the slave 1221 MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class); 1222 masterBroker = new MasterBroker(parent, transport); 1223 masterBroker.startProcessing(); 1224 } 1225 LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached"); 1226 bService.slaveConnectionEstablished(); 1227 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1228 // so this TransportConnection is the rear end of a network bridge 1229 // We have been requested to create a two way pipe ... 1230 try { 1231 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1232 Map<String, String> props = createMap(properties); 1233 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1234 IntrospectionSupport.setProperties(config, props, ""); 1235 config.setBrokerName(broker.getBrokerName()); 1236 1237 // check for existing duplex connection hanging about 1238 1239 // We first look if existing network connection already exists for the same broker Id and network connector name 1240 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1241 // and the duplex network connector side wanting to open a new one 1242 // In this case, the old connection must be broken 1243 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1244 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1245 synchronized (connections) { 1246 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1247 TransportConnection c = iter.next(); 1248 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1249 LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ")."); 1250 c.stopAsync(); 1251 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1252 c.getStopped().await(1, TimeUnit.SECONDS); 1253 } 1254 } 1255 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1256 } 1257 URI uri = broker.getVmConnectorURI(); 1258 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 1259 map.put("network", "true"); 1260 map.put("async", "false"); 1261 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 1262 Transport localTransport = TransportFactory.connect(uri); 1263 Transport remoteBridgeTransport = new ResponseCorrelator(transport); 1264 String duplexName = localTransport.toString(); 1265 if (duplexName.contains("#")) { 1266 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1267 } 1268 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); 1269 listener.setCreatedByDuplex(true); 1270 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1271 duplexBridge.setBrokerService(broker.getBrokerService()); 1272 // now turn duplex off this side 1273 info.setDuplexConnection(false); 1274 duplexBridge.setCreatedByDuplex(true); 1275 duplexBridge.duplexStart(this, brokerInfo, info); 1276 LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId); 1277 return null; 1278 } catch (TransportDisposedIOException e) { 1279 LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started."); 1280 return null; 1281 } catch (Exception e) { 1282 LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e); 1283 return null; 1284 } 1285 } 1286 // We only expect to get one broker info command per connection 1287 if (this.brokerInfo != null) { 1288 LOG.warn("Unexpected extra broker info command received: " + info); 1289 } 1290 this.brokerInfo = info; 1291 networkConnection = true; 1292 List<TransportConnectionState> connectionStates = listConnectionStates(); 1293 for (TransportConnectionState cs : connectionStates) { 1294 cs.getContext().setNetworkConnection(true); 1295 } 1296 return null; 1297 } 1298 1299 @SuppressWarnings({"unchecked", "rawtypes"}) 1300 private HashMap<String, String> createMap(Properties properties) { 1301 return new HashMap(properties); 1302 } 1303 1304 protected void dispatch(Command command) throws IOException { 1305 try { 1306 setMarkedCandidate(true); 1307 transport.oneway(command); 1308 } finally { 1309 setMarkedCandidate(false); 1310 } 1311 } 1312 1313 public String getRemoteAddress() { 1314 return transport.getRemoteAddress(); 1315 } 1316 1317 public String getConnectionId() { 1318 List<TransportConnectionState> connectionStates = listConnectionStates(); 1319 for (TransportConnectionState cs : connectionStates) { 1320 if (cs.getInfo().getClientId() != null) { 1321 return cs.getInfo().getClientId(); 1322 } 1323 return cs.getInfo().getConnectionId().toString(); 1324 } 1325 return null; 1326 } 1327 1328 public void updateClient(ConnectionControl control) { 1329 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1330 && this.wireFormatInfo.getVersion() >= 6) { 1331 dispatchAsync(control); 1332 } 1333 } 1334 1335 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1336 ProducerBrokerExchange result = producerExchanges.get(id); 1337 if (result == null) { 1338 synchronized (producerExchanges) { 1339 result = new ProducerBrokerExchange(); 1340 TransportConnectionState state = lookupConnectionState(id); 1341 context = state.getContext(); 1342 result.setConnectionContext(context); 1343 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1344 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); 1345 } 1346 SessionState ss = state.getSessionState(id.getParentId()); 1347 if (ss != null) { 1348 result.setProducerState(ss.getProducerState(id)); 1349 ProducerState producerState = ss.getProducerState(id); 1350 if (producerState != null && producerState.getInfo() != null) { 1351 ProducerInfo info = producerState.getInfo(); 1352 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1353 } 1354 } 1355 producerExchanges.put(id, result); 1356 } 1357 } else { 1358 context = result.getConnectionContext(); 1359 } 1360 return result; 1361 } 1362 1363 private void removeProducerBrokerExchange(ProducerId id) { 1364 synchronized (producerExchanges) { 1365 producerExchanges.remove(id); 1366 } 1367 } 1368 1369 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1370 ConsumerBrokerExchange result = consumerExchanges.get(id); 1371 return result; 1372 } 1373 1374 private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) { 1375 ConsumerBrokerExchange result = consumerExchanges.get(id); 1376 if (result == null) { 1377 synchronized (consumerExchanges) { 1378 result = new ConsumerBrokerExchange(); 1379 TransportConnectionState state = lookupConnectionState(id); 1380 context = state.getContext(); 1381 result.setConnectionContext(context); 1382 SessionState ss = state.getSessionState(id.getParentId()); 1383 if (ss != null) { 1384 ConsumerState cs = ss.getConsumerState(id); 1385 if (cs != null) { 1386 ConsumerInfo info = cs.getInfo(); 1387 if (info != null) { 1388 if (info.getDestination() != null && info.getDestination().isPattern()) { 1389 result.setWildcard(true); 1390 } 1391 } 1392 } 1393 } 1394 consumerExchanges.put(id, result); 1395 } 1396 } 1397 return result; 1398 } 1399 1400 private void removeConsumerBrokerExchange(ConsumerId id) { 1401 synchronized (consumerExchanges) { 1402 consumerExchanges.remove(id); 1403 } 1404 } 1405 1406 public int getProtocolVersion() { 1407 return protocolVersion.get(); 1408 } 1409 1410 public Response processControlCommand(ControlCommand command) throws Exception { 1411 String control = command.getCommand(); 1412 if (control != null && control.equals("shutdown")) { 1413 System.exit(0); 1414 } 1415 return null; 1416 } 1417 1418 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1419 return null; 1420 } 1421 1422 public Response processConnectionControl(ConnectionControl control) throws Exception { 1423 if (control != null) { 1424 faultTolerantConnection = control.isFaultTolerant(); 1425 } 1426 return null; 1427 } 1428 1429 public Response processConnectionError(ConnectionError error) throws Exception { 1430 return null; 1431 } 1432 1433 public Response processConsumerControl(ConsumerControl control) throws Exception { 1434 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1435 broker.processConsumerControl(consumerExchange, control); 1436 return null; 1437 } 1438 1439 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1440 TransportConnectionState state) { 1441 TransportConnectionState cs = null; 1442 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1443 // swap implementations 1444 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1445 newRegister.intialize(connectionStateRegister); 1446 connectionStateRegister = newRegister; 1447 } 1448 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1449 return cs; 1450 } 1451 1452 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1453 return connectionStateRegister.unregisterConnectionState(connectionId); 1454 } 1455 1456 protected synchronized List<TransportConnectionState> listConnectionStates() { 1457 return connectionStateRegister.listConnectionStates(); 1458 } 1459 1460 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1461 return connectionStateRegister.lookupConnectionState(connectionId); 1462 } 1463 1464 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1465 return connectionStateRegister.lookupConnectionState(id); 1466 } 1467 1468 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1469 return connectionStateRegister.lookupConnectionState(id); 1470 } 1471 1472 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1473 return connectionStateRegister.lookupConnectionState(id); 1474 } 1475 1476 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1477 return connectionStateRegister.lookupConnectionState(connectionId); 1478 } 1479 1480 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1481 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1482 } 1483 1484 protected synchronized String getDuplexNetworkConnectorId() { 1485 return this.duplexNetworkConnectorId; 1486 } 1487 1488 public boolean isStopping() { 1489 return stopping.get(); 1490 } 1491 1492 protected CountDownLatch getStopped() { 1493 return stopped; 1494 } 1495 1496 private int getProducerCount(ConnectionId connectionId) { 1497 int result = 0; 1498 TransportConnectionState cs = lookupConnectionState(connectionId); 1499 if (cs != null) { 1500 for (SessionId sessionId : cs.getSessionIds()) { 1501 SessionState sessionState = cs.getSessionState(sessionId); 1502 if (sessionState != null) { 1503 result += sessionState.getProducerIds().size(); 1504 } 1505 } 1506 } 1507 return result; 1508 } 1509 1510 private int getConsumerCount(ConnectionId connectionId) { 1511 int result = 0; 1512 TransportConnectionState cs = lookupConnectionState(connectionId); 1513 if (cs != null) { 1514 for (SessionId sessionId : cs.getSessionIds()) { 1515 SessionState sessionState = cs.getSessionState(sessionId); 1516 if (sessionState != null) { 1517 result += sessionState.getConsumerIds().size(); 1518 } 1519 } 1520 } 1521 return result; 1522 } 1523 }