001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.net.SocketException;
022    import java.net.URI;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import java.util.concurrent.atomic.AtomicInteger;
035    import java.util.concurrent.atomic.AtomicReference;
036    import java.util.concurrent.locks.ReentrantReadWriteLock;
037    
038    import javax.transaction.xa.XAResource;
039    import org.apache.activemq.advisory.AdvisorySupport;
040    import org.apache.activemq.broker.ft.MasterBroker;
041    import org.apache.activemq.broker.region.ConnectionStatistics;
042    import org.apache.activemq.broker.region.RegionBroker;
043    import org.apache.activemq.command.*;
044    import org.apache.activemq.network.DemandForwardingBridge;
045    import org.apache.activemq.network.MBeanNetworkListener;
046    import org.apache.activemq.network.NetworkBridgeConfiguration;
047    import org.apache.activemq.network.NetworkBridgeFactory;
048    import org.apache.activemq.security.MessageAuthorizationPolicy;
049    import org.apache.activemq.state.CommandVisitor;
050    import org.apache.activemq.state.ConnectionState;
051    import org.apache.activemq.state.ConsumerState;
052    import org.apache.activemq.state.ProducerState;
053    import org.apache.activemq.state.SessionState;
054    import org.apache.activemq.state.TransactionState;
055    import org.apache.activemq.thread.DefaultThreadPools;
056    import org.apache.activemq.thread.Task;
057    import org.apache.activemq.thread.TaskRunner;
058    import org.apache.activemq.thread.TaskRunnerFactory;
059    import org.apache.activemq.transaction.Transaction;
060    import org.apache.activemq.transport.DefaultTransportListener;
061    import org.apache.activemq.transport.ResponseCorrelator;
062    import org.apache.activemq.transport.Transport;
063    import org.apache.activemq.transport.TransportDisposedIOException;
064    import org.apache.activemq.transport.TransportFactory;
065    import org.apache.activemq.util.IntrospectionSupport;
066    import org.apache.activemq.util.MarshallingSupport;
067    import org.apache.activemq.util.ServiceSupport;
068    import org.apache.activemq.util.URISupport;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    import org.slf4j.MDC;
072    
073    public class TransportConnection implements Connection, Task, CommandVisitor {
074        private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
075        private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
076        private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
077        // Keeps track of the broker and connector that created this connection.
078        protected final Broker broker;
079        protected final TransportConnector connector;
080        // Keeps track of the state of the connections.
081        // protected final ConcurrentHashMap localConnectionStates=new
082        // ConcurrentHashMap();
083        protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
084        // The broker and wireformat info that was exchanged.
085        protected BrokerInfo brokerInfo;
086        protected final List<Command> dispatchQueue = new LinkedList<Command>();
087        protected TaskRunner taskRunner;
088        protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
089        protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
090        private MasterBroker masterBroker;
091        private final Transport transport;
092        private MessageAuthorizationPolicy messageAuthorizationPolicy;
093        private WireFormatInfo wireFormatInfo;
094        // Used to do async dispatch.. this should perhaps be pushed down into the
095        // transport layer..
096        private boolean inServiceException;
097        private final ConnectionStatistics statistics = new ConnectionStatistics();
098        private boolean manageable;
099        private boolean slow;
100        private boolean markedCandidate;
101        private boolean blockedCandidate;
102        private boolean blocked;
103        private boolean connected;
104        private boolean active;
105        private boolean starting;
106        private boolean pendingStop;
107        private long timeStamp;
108        private final AtomicBoolean stopping = new AtomicBoolean(false);
109        private final CountDownLatch stopped = new CountDownLatch(1);
110        private final AtomicBoolean asyncException = new AtomicBoolean(false);
111        private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
112        private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
113        private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
114        private ConnectionContext context;
115        private boolean networkConnection;
116        private boolean faultTolerantConnection;
117        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
118        private DemandForwardingBridge duplexBridge;
119        private final TaskRunnerFactory taskRunnerFactory;
120        private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
121        private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
122        private String duplexNetworkConnectorId;
123        private Throwable stopError = null;
124    
/**
 * Creates a connection bound to the given transport and installs a listener
 * on that transport which routes inbound commands and transport failures
 * into this connection.
 *
 * @param connector the connector this connection belongs to; when non-null
 *                  its statistics become the parent of this connection's
 *                  statistics and its message authorization policy is adopted
 * @param transport the transport used to exchange commands with the peer
 * @param broker the broker servicing the commands; it is asked to adapt to a
 *               {@link RegionBroker} to obtain the shared connection states
 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
 *                          else commands are sent async.
 */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
                           TaskRunnerFactory taskRunnerFactory) {
    this.connector = connector;
    this.broker = broker;
    RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
    brokerConnectionStates = rb.getConnectionStates();
    if (connector != null) {
        // Everything read from the connector sits behind the same null
        // guard; the policy used to be fetched before the check.
        this.statistics.setParent(connector.getStatistics());
        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
    }
    this.taskRunnerFactory = taskRunnerFactory;
    this.transport = transport;
    this.transport.setTransportListener(new DefaultTransportListener() {
        @Override
        public void onCommand(Object o) {
            // Commands are serviced under the read lock of serviceLock.
            serviceLock.readLock().lock();
            try {
                if (!(o instanceof Command)) {
                    // Concatenation instead of o.toString(): a null object must
                    // surface as a protocol violation, not as a NullPointerException.
                    throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                }
                Command command = (Command) o;
                Response response = service(command);
                if (response != null) {
                    dispatchSync(response);
                }
            } finally {
                serviceLock.readLock().unlock();
            }
        }

        @Override
        public void onException(IOException exception) {
            serviceLock.readLock().lock();
            try {
                serviceTransportException(exception);
            } finally {
                serviceLock.readLock().unlock();
            }
        }
    });
    connected = true;
}
171    
172        /**
173         * Returns the number of messages to be dispatched to this connection
174         *
175         * @return size of dispatch queue
176         */
177        public int getDispatchQueueSize() {
178            synchronized (dispatchQueue) {
179                return dispatchQueue.size();
180            }
181        }
182    
183        public void serviceTransportException(IOException e) {
184            BrokerService bService = connector.getBrokerService();
185            if (bService.isShutdownOnSlaveFailure()) {
186                if (brokerInfo != null) {
187                    if (brokerInfo.isSlaveBroker()) {
188                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
189                        try {
190                            doStop();
191                            bService.stop();
192                        } catch (Exception ex) {
193                            LOG.warn("Failed to stop the master", ex);
194                        }
195                    }
196                }
197            }
198            if (!stopping.get()) {
199                transportException.set(e);
200                if (TRANSPORTLOG.isDebugEnabled()) {
201                    TRANSPORTLOG.debug(this + " failed: " + e, e);
202                } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
203                    TRANSPORTLOG.warn(this + " failed: " + e);
204                }
205                stopAsync();
206            }
207        }
208    
209        private boolean expected(IOException e) {
210            return isStomp() &&
211                    ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
212        }
213    
214        private boolean isStomp() {
215            URI uri = connector.getUri();
216            return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
217        }
218    
219        /**
220         * Calls the serviceException method in an async thread. Since handling a
221         * service exception closes a socket, we should not tie up broker threads
222         * since client sockets may hang or cause deadlocks.
223         */
224        public void serviceExceptionAsync(final IOException e) {
225            if (asyncException.compareAndSet(false, true)) {
226                new Thread("Async Exception Handler") {
227                    @Override
228                    public void run() {
229                        serviceException(e);
230                    }
231                }.start();
232            }
233        }
234    
235        /**
236         * Closes a clients connection due to a detected error. Errors are ignored
237         * if: the client is closing or broker is closing. Otherwise, the connection
238         * error transmitted to the client before stopping it's transport.
239         */
240        public void serviceException(Throwable e) {
241            // are we a transport exception such as not being able to dispatch
242            // synchronously to a transport
243            if (e instanceof IOException) {
244                serviceTransportException((IOException) e);
245            } else if (e.getClass() == BrokerStoppedException.class) {
246                // Handle the case where the broker is stopped
247                // But the client is still connected.
248                if (!stopping.get()) {
249                    if (SERVICELOG.isDebugEnabled()) {
250                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
251                    }
252                    ConnectionError ce = new ConnectionError();
253                    ce.setException(e);
254                    dispatchSync(ce);
255                    // Record the error that caused the transport to stop
256                    this.stopError = e;
257                    // Wait a little bit to try to get the output buffer to flush
258                    // the exption notification to the client.
259                    try {
260                        Thread.sleep(500);
261                    } catch (InterruptedException ie) {
262                        Thread.currentThread().interrupt();
263                    }
264                    // Worst case is we just kill the connection before the
265                    // notification gets to him.
266                    stopAsync();
267                }
268            } else if (!stopping.get() && !inServiceException) {
269                inServiceException = true;
270                try {
271                    SERVICELOG.warn("Async error occurred: " + e, e);
272                    ConnectionError ce = new ConnectionError();
273                    ce.setException(e);
274                    if (pendingStop) {
275                        dispatchSync(ce);
276                    } else {
277                        dispatchAsync(ce);
278                    }
279                } finally {
280                    inServiceException = false;
281                }
282            }
283        }
284    
285        public Response service(Command command) {
286            MDC.put("activemq.connector", connector.getUri().toString());
287            Response response = null;
288            boolean responseRequired = command.isResponseRequired();
289            int commandId = command.getCommandId();
290            try {
291                if (!pendingStop) {
292                    response = command.visit(this);
293                } else {
294                    response = new ExceptionResponse(this.stopError);
295                }
296            } catch (Throwable e) {
297                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
298                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
299                            + " command: " + command + ", exception: " + e, e);
300                }
301    
302                if (e instanceof java.lang.SecurityException) {
303                    // still need to close this down - in case the peer of this transport doesn't play nice
304                    delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
305                }
306    
307                if (responseRequired) {
308                    response = new ExceptionResponse(e);
309                } else {
310                    serviceException(e);
311                }
312            }
313            if (responseRequired) {
314                if (response == null) {
315                    response = new Response();
316                }
317                response.setCorrelationId(commandId);
318            }
319            // The context may have been flagged so that the response is not
320            // sent.
321            if (context != null) {
322                if (context.isDontSendReponse()) {
323                    context.setDontSendReponse(false);
324                    response = null;
325                }
326                context = null;
327            }
328            MDC.remove("activemq.connector");
329            return response;
330        }
331    
332        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
333            return null;
334        }
335    
336        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
337            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
338            return null;
339        }
340    
341        public Response processWireFormat(WireFormatInfo info) throws Exception {
342            wireFormatInfo = info;
343            protocolVersion.set(info.getVersion());
344            return null;
345        }
346    
347        public Response processShutdown(ShutdownInfo info) throws Exception {
348            stopAsync();
349            return null;
350        }
351    
352        public Response processFlush(FlushCommand command) throws Exception {
353            return null;
354        }
355    
356        public Response processBeginTransaction(TransactionInfo info) throws Exception {
357            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
358            context = null;
359            if (cs != null) {
360                context = cs.getContext();
361            }
362            if (cs == null) {
363                throw new NullPointerException("Context is null");
364            }
365            // Avoid replaying dup commands
366            if (cs.getTransactionState(info.getTransactionId()) == null) {
367                cs.addTransactionState(info.getTransactionId());
368                broker.beginTransaction(context, info.getTransactionId());
369            }
370            return null;
371        }
372    
373        public Response processEndTransaction(TransactionInfo info) throws Exception {
374            // No need to do anything. This packet is just sent by the client
375            // make sure he is synced with the server as commit command could
376            // come from a different connection.
377            return null;
378        }
379    
380        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
381            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
382            context = null;
383            if (cs != null) {
384                context = cs.getContext();
385            }
386            if (cs == null) {
387                throw new NullPointerException("Context is null");
388            }
389            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
390            if (transactionState == null) {
391                throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
392                        + info.getTransactionId());
393            }
394            // Avoid dups.
395            if (!transactionState.isPrepared()) {
396                transactionState.setPrepared(true);
397                int result = broker.prepareTransaction(context, info.getTransactionId());
398                transactionState.setPreparedResult(result);
399                if (result == XAResource.XA_RDONLY) {
400                    // we are done, no further rollback or commit from TM
401                    cs.removeTransactionState(info.getTransactionId());
402                }
403                IntegerResponse response = new IntegerResponse(result);
404                return response;
405            } else {
406                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
407                return response;
408            }
409        }
410    
411        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
412            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
413            context = cs.getContext();
414            cs.removeTransactionState(info.getTransactionId());
415            broker.commitTransaction(context, info.getTransactionId(), true);
416            return null;
417        }
418    
419        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
420            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
421            context = cs.getContext();
422            cs.removeTransactionState(info.getTransactionId());
423            broker.commitTransaction(context, info.getTransactionId(), false);
424            return null;
425        }
426    
427        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
428            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
429            context = cs.getContext();
430            cs.removeTransactionState(info.getTransactionId());
431            broker.rollbackTransaction(context, info.getTransactionId());
432            return null;
433        }
434    
435        public Response processForgetTransaction(TransactionInfo info) throws Exception {
436            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
437            context = cs.getContext();
438            broker.forgetTransaction(context, info.getTransactionId());
439            return null;
440        }
441    
442        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
443            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
444            context = cs.getContext();
445            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
446            return new DataArrayResponse(preparedTransactions);
447        }
448    
449        public Response processMessage(Message messageSend) throws Exception {
450            ProducerId producerId = messageSend.getProducerId();
451            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
452            if (producerExchange.canDispatch(messageSend)) {
453                broker.send(producerExchange, messageSend);
454            }
455            return null;
456        }
457    
458        public Response processMessageAck(MessageAck ack) throws Exception {
459            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
460            if (consumerExchange != null) {
461                broker.acknowledge(consumerExchange, ack);
462            }
463            return null;
464        }
465    
466        public Response processMessagePull(MessagePull pull) throws Exception {
467            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
468        }
469    
470        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
471            broker.processDispatchNotification(notification);
472            return null;
473        }
474    
475        public Response processAddDestination(DestinationInfo info) throws Exception {
476            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
477            broker.addDestinationInfo(cs.getContext(), info);
478            if (info.getDestination().isTemporary()) {
479                cs.addTempDestination(info);
480            }
481            return null;
482        }
483    
484        public Response processRemoveDestination(DestinationInfo info) throws Exception {
485            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
486            broker.removeDestinationInfo(cs.getContext(), info);
487            if (info.getDestination().isTemporary()) {
488                cs.removeTempDestination(info.getDestination());
489            }
490            return null;
491        }
492    
493        public Response processAddProducer(ProducerInfo info) throws Exception {
494            SessionId sessionId = info.getProducerId().getParentId();
495            ConnectionId connectionId = sessionId.getParentId();
496            TransportConnectionState cs = lookupConnectionState(connectionId);
497            if (cs == null) {
498                throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
499                        + connectionId);
500            }
501            SessionState ss = cs.getSessionState(sessionId);
502            if (ss == null) {
503                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
504                        + sessionId);
505            }
506            // Avoid replaying dup commands
507            if (!ss.getProducerIds().contains(info.getProducerId())) {
508                ActiveMQDestination destination = info.getDestination();
509                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
510                    if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
511                        throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
512                    }
513                }
514                broker.addProducer(cs.getContext(), info);
515                try {
516                    ss.addProducer(info);
517                } catch (IllegalStateException e) {
518                    broker.removeProducer(cs.getContext(), info);
519                }
520    
521            }
522            return null;
523        }
524    
525        public Response processRemoveProducer(ProducerId id) throws Exception {
526            SessionId sessionId = id.getParentId();
527            ConnectionId connectionId = sessionId.getParentId();
528            TransportConnectionState cs = lookupConnectionState(connectionId);
529            SessionState ss = cs.getSessionState(sessionId);
530            if (ss == null) {
531                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
532                        + sessionId);
533            }
534            ProducerState ps = ss.removeProducer(id);
535            if (ps == null) {
536                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
537            }
538            removeProducerBrokerExchange(id);
539            broker.removeProducer(cs.getContext(), ps.getInfo());
540            return null;
541        }
542    
543        public Response processAddConsumer(ConsumerInfo info) throws Exception {
544            SessionId sessionId = info.getConsumerId().getParentId();
545            ConnectionId connectionId = sessionId.getParentId();
546            TransportConnectionState cs = lookupConnectionState(connectionId);
547            if (cs == null) {
548                throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
549                        + connectionId);
550            }
551            SessionState ss = cs.getSessionState(sessionId);
552            if (ss == null) {
553                throw new IllegalStateException(broker.getBrokerName()
554                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
555            }
556            // Avoid replaying dup commands
557            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
558                ActiveMQDestination destination = info.getDestination();
559                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
560                    if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
561                        throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
562                    }
563                }
564    
565                broker.addConsumer(cs.getContext(), info);
566                try {
567                    ss.addConsumer(info);
568                    addConsumerBrokerExchange(info.getConsumerId());
569                } catch (IllegalStateException e) {
570                    broker.removeConsumer(cs.getContext(), info);
571                }
572    
573            }
574            return null;
575        }
576    
577        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
578            SessionId sessionId = id.getParentId();
579            ConnectionId connectionId = sessionId.getParentId();
580            TransportConnectionState cs = lookupConnectionState(connectionId);
581            if (cs == null) {
582                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
583                        + connectionId);
584            }
585            SessionState ss = cs.getSessionState(sessionId);
586            if (ss == null) {
587                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
588                        + sessionId);
589            }
590            ConsumerState consumerState = ss.removeConsumer(id);
591            if (consumerState == null) {
592                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
593            }
594            ConsumerInfo info = consumerState.getInfo();
595            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
596            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
597            removeConsumerBrokerExchange(id);
598            return null;
599        }
600    
601        public Response processAddSession(SessionInfo info) throws Exception {
602            ConnectionId connectionId = info.getSessionId().getParentId();
603            TransportConnectionState cs = lookupConnectionState(connectionId);
604            // Avoid replaying dup commands
605            if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
606                broker.addSession(cs.getContext(), info);
607                try {
608                    cs.addSession(info);
609                } catch (IllegalStateException e) {
610                    e.printStackTrace();
611                    broker.removeSession(cs.getContext(), info);
612                }
613            }
614            return null;
615        }
616    
617        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
618            ConnectionId connectionId = id.getParentId();
619            TransportConnectionState cs = lookupConnectionState(connectionId);
620            if (cs == null) {
621                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
622            }
623            SessionState session = cs.getSessionState(id);
624            if (session == null) {
625                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
626            }
627            // Don't let new consumers or producers get added while we are closing
628            // this down.
629            session.shutdown();
630            // Cascade the connection stop to the consumers and producers.
631            for (ConsumerId consumerId : session.getConsumerIds()) {
632                try {
633                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
634                } catch (Throwable e) {
635                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
636                }
637            }
638            for (ProducerId producerId : session.getProducerIds()) {
639                try {
640                    processRemoveProducer(producerId);
641                } catch (Throwable e) {
642                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
643                }
644            }
645            cs.removeSession(id);
646            broker.removeSession(cs.getContext(), session.getInfo());
647            return null;
648        }
649    
650        public Response processAddConnection(ConnectionInfo info) throws Exception {
651            // if the broker service has slave attached, wait for the slave to be
652            // attached to allow client connection. slave connection is fine
653            if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
654                    && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
655                ServiceSupport.dispose(transport);
656                return new ExceptionResponse(new Exception("Master's slave not attached yet."));
657            }
658            // Older clients should have been defaulting this field to true.. but
659            // they were not.
660            if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
661                info.setClientMaster(true);
662            }
663            TransportConnectionState state;
664            // Make sure 2 concurrent connections by the same ID only generate 1
665            // TransportConnectionState object.
666            synchronized (brokerConnectionStates) {
667                state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
668                if (state == null) {
669                    state = new TransportConnectionState(info, this);
670                    brokerConnectionStates.put(info.getConnectionId(), state);
671                }
672                state.incrementReference();
673            }
674            // If there are 2 concurrent connections for the same connection id,
675            // then last one in wins, we need to sync here
676            // to figure out the winner.
677            synchronized (state.getConnectionMutex()) {
678                if (state.getConnection() != this) {
679                    LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
680                    state.getConnection().stop();
681                    LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
682                            + state.getConnection().getRemoteAddress());
683                    state.setConnection(this);
684                    state.reset(info);
685                }
686            }
687            registerConnectionState(info.getConnectionId(), state);
688            LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
689            this.faultTolerantConnection = info.isFaultTolerant();
690            // Setup the context.
691            String clientId = info.getClientId();
692            context = new ConnectionContext();
693            context.setBroker(broker);
694            context.setClientId(clientId);
695            context.setClientMaster(info.isClientMaster());
696            context.setConnection(this);
697            context.setConnectionId(info.getConnectionId());
698            context.setConnector(connector);
699            context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
700            context.setNetworkConnection(networkConnection);
701            context.setFaultTolerant(faultTolerantConnection);
702            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
703            context.setUserName(info.getUserName());
704            context.setWireFormatInfo(wireFormatInfo);
705            context.setReconnect(info.isFailoverReconnect());
706            this.manageable = info.isManageable();
707            context.setConnectionState(state);
708            state.setContext(context);
709            state.setConnection(this);
710            if (info.getClientIp() == null) {
711                info.setClientIp(getRemoteAddress());
712            }
713    
714            try {
715                broker.addConnection(context, info);
716            } catch (Exception e) {
717                synchronized (brokerConnectionStates) {
718                    brokerConnectionStates.remove(info.getConnectionId());
719                }
720                unregisterConnectionState(info.getConnectionId());
721                LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
722                if (LOG.isDebugEnabled()) {
723                    LOG.debug("Exception detail:", e);
724                }
725                throw e;
726            }
727            if (info.isManageable()) {
728                // send ConnectionCommand
729                ConnectionControl command = this.connector.getConnectionControl();
730                command.setFaultTolerant(broker.isFaultTolerantConfiguration());
731                if (info.isFailoverReconnect()) {
732                    command.setRebalanceConnection(false);
733                }
734                dispatchAsync(command);
735            }
736            return null;
737        }
738    
739        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
740                throws InterruptedException {
741            LOG.debug("remove connection id: " + id);
742            TransportConnectionState cs = lookupConnectionState(id);
743            if (cs != null) {
744                // Don't allow things to be added to the connection state while we
745                // are shutting down.
746                cs.shutdown();
747                // Cascade the connection stop to the sessions.
748                for (SessionId sessionId : cs.getSessionIds()) {
749                    try {
750                        processRemoveSession(sessionId, lastDeliveredSequenceId);
751                    } catch (Throwable e) {
752                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
753                    }
754                }
755                // Cascade the connection stop to temp destinations.
756                for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
757                    DestinationInfo di = iter.next();
758                    try {
759                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
760                    } catch (Throwable e) {
761                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
762                    }
763                    iter.remove();
764                }
765                try {
766                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
767                } catch (Throwable e) {
768                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
769                    if (LOG.isDebugEnabled()) {
770                        SERVICELOG.debug("Exception detail:", e);
771                    }
772                }
773                TransportConnectionState state = unregisterConnectionState(id);
774                if (state != null) {
775                    synchronized (brokerConnectionStates) {
776                        // If we are the last reference, we should remove the state
777                        // from the broker.
778                        if (state.decrementReference() == 0) {
779                            brokerConnectionStates.remove(id);
780                        }
781                    }
782                }
783            }
784            return null;
785        }
786    
787        public Response processProducerAck(ProducerAck ack) throws Exception {
788            // A broker should not get ProducerAck messages.
789            return null;
790        }
791    
792        public Connector getConnector() {
793            return connector;
794        }
795    
796        public void dispatchSync(Command message) {
797            try {
798                processDispatch(message);
799            } catch (IOException e) {
800                serviceExceptionAsync(e);
801            }
802        }
803    
804        public void dispatchAsync(Command message) {
805            if (!stopping.get()) {
806                if (taskRunner == null) {
807                    dispatchSync(message);
808                } else {
809                    synchronized (dispatchQueue) {
810                        dispatchQueue.add(message);
811                    }
812                    try {
813                        taskRunner.wakeup();
814                    } catch (InterruptedException e) {
815                        Thread.currentThread().interrupt();
816                    }
817                }
818            } else {
819                if (message.isMessageDispatch()) {
820                    MessageDispatch md = (MessageDispatch) message;
821                    Runnable sub = md.getTransmitCallback();
822                    broker.postProcessDispatch(md);
823                    if (sub != null) {
824                        sub.run();
825                    }
826                }
827            }
828        }
829    
830        protected void processDispatch(Command command) throws IOException {
831            final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
832            try {
833                if (!stopping.get()) {
834                    if (messageDispatch != null) {
835                        broker.preProcessDispatch(messageDispatch);
836                    }
837                    dispatch(command);
838                }
839            } finally {
840                if (messageDispatch != null) {
841                    Runnable sub = messageDispatch.getTransmitCallback();
842                    broker.postProcessDispatch(messageDispatch);
843                    if (sub != null) {
844                        sub.run();
845                    }
846                }
847            }
848        }
849    
850        public boolean iterate() {
851            try {
852                if (pendingStop || stopping.get()) {
853                    if (dispatchStopped.compareAndSet(false, true)) {
854                        if (transportException.get() == null) {
855                            try {
856                                dispatch(new ShutdownInfo());
857                            } catch (Throwable ignore) {
858                            }
859                        }
860                        dispatchStoppedLatch.countDown();
861                    }
862                    return false;
863                }
864                if (!dispatchStopped.get()) {
865                    Command command = null;
866                    synchronized (dispatchQueue) {
867                        if (dispatchQueue.isEmpty()) {
868                            return false;
869                        }
870                        command = dispatchQueue.remove(0);
871                    }
872                    processDispatch(command);
873                    return true;
874                }
875                return false;
876            } catch (IOException e) {
877                if (dispatchStopped.compareAndSet(false, true)) {
878                    dispatchStoppedLatch.countDown();
879                }
880                serviceExceptionAsync(e);
881                return false;
882            }
883        }
884    
885        /**
886         * Returns the statistics for this connection
887         */
888        public ConnectionStatistics getStatistics() {
889            return statistics;
890        }
891    
892        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
893            return messageAuthorizationPolicy;
894        }
895    
896        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
897            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
898        }
899    
900        public boolean isManageable() {
901            return manageable;
902        }
903    
904        public void start() throws Exception {
905            try {
906                synchronized (this) {
907                    starting = true;
908                    if (taskRunnerFactory != null) {
909                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
910                                + getRemoteAddress());
911                    } else {
912                        taskRunner = null;
913                    }
914                    transport.start();
915                    active = true;
916                    BrokerInfo info = connector.getBrokerInfo().copy();
917                    if (connector.isUpdateClusterClients()) {
918                        info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
919                    } else {
920                        info.setPeerBrokerInfos(null);
921                    }
922                    dispatchAsync(info);
923    
924                    connector.onStarted(this);
925                }
926            } catch (Exception e) {
927                // Force clean up on an error starting up.
928                pendingStop = true;
929                throw e;
930            } finally {
931                // stop() can be called from within the above block,
932                // but we want to be sure start() completes before
933                // stop() runs, so queue the stop until right now:
934                setStarting(false);
935                if (isPendingStop()) {
936                    LOG.debug("Calling the delayed stop() after start() " + this);
937                    stop();
938                }
939            }
940        }
941    
942        public void stop() throws Exception {
943            stopAsync();
944            while (!stopped.await(5, TimeUnit.SECONDS)) {
945                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
946            }
947        }
948    
949        public void delayedStop(final int waitTime, final String reason, Throwable cause) {
950            if (waitTime > 0) {
951                synchronized (this) {
952                    pendingStop = true;
953                    stopError = cause;
954                }
955                try {
956                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
957                        public void run() {
958                            try {
959                                Thread.sleep(waitTime);
960                                stopAsync();
961                                LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
962                            } catch (InterruptedException e) {
963                            }
964                        }
965                    }, "delayedStop:" + transport.getRemoteAddress());
966                } catch (Throwable t) {
967                    LOG.warn("cannot create stopAsync :", t);
968                }
969            }
970        }
971    
972        public void stopAsync() {
973            // If we're in the middle of starting then go no further... for now.
974            synchronized (this) {
975                pendingStop = true;
976                if (starting) {
977                    LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
978                    return;
979                }
980            }
981            if (stopping.compareAndSet(false, true)) {
982                // Let all the connection contexts know we are shutting down
983                // so that in progress operations can notice and unblock.
984                List<TransportConnectionState> connectionStates = listConnectionStates();
985                for (TransportConnectionState cs : connectionStates) {
986                    ConnectionContext connectionContext = cs.getContext();
987                    if (connectionContext != null) {
988                        connectionContext.getStopping().set(true);
989                    }
990                }
991                try {
992                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
993                        public void run() {
994                            serviceLock.writeLock().lock();
995                            try {
996                                doStop();
997                            } catch (Throwable e) {
998                                LOG.debug("Error occurred while shutting down a connection " + this, e);
999                            } finally {
1000                                stopped.countDown();
1001                                serviceLock.writeLock().unlock();
1002                            }
1003                        }
1004                    }, "StopAsync:" + transport.getRemoteAddress());
1005                } catch (Throwable t) {
1006                    LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
1007                    stopped.countDown();
1008                }
1009            }
1010        }
1011    
1012        @Override
1013        public String toString() {
1014            return "Transport Connection to: " + transport.getRemoteAddress();
1015        }
1016    
1017        protected void doStop() throws Exception, InterruptedException {
1018            LOG.debug("Stopping connection: " + transport.getRemoteAddress());
1019            connector.onStopped(this);
1020            try {
1021                synchronized (this) {
1022                    if (masterBroker != null) {
1023                        masterBroker.stop();
1024                    }
1025                    if (duplexBridge != null) {
1026                        duplexBridge.stop();
1027                    }
1028                }
1029            } catch (Exception ignore) {
1030                LOG.trace("Exception caught stopping", ignore);
1031            }
1032            try {
1033                transport.stop();
1034                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1035            } catch (Exception e) {
1036                LOG.debug("Could not stop transport: " + e, e);
1037            }
1038            if (taskRunner != null) {
1039                taskRunner.shutdown(1);
1040            }
1041            active = false;
1042            // Run the MessageDispatch callbacks so that message references get
1043            // cleaned up.
1044            synchronized (dispatchQueue) {
1045                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1046                    Command command = iter.next();
1047                    if (command.isMessageDispatch()) {
1048                        MessageDispatch md = (MessageDispatch) command;
1049                        Runnable sub = md.getTransmitCallback();
1050                        broker.postProcessDispatch(md);
1051                        if (sub != null) {
1052                            sub.run();
1053                        }
1054                    }
1055                }
1056                dispatchQueue.clear();
1057            }
1058            //
1059            // Remove all logical connection associated with this connection
1060            // from the broker.
1061            if (!broker.isStopped()) {
1062                List<TransportConnectionState> connectionStates = listConnectionStates();
1063                connectionStates = listConnectionStates();
1064                for (TransportConnectionState cs : connectionStates) {
1065                    cs.getContext().getStopping().set(true);
1066                    try {
1067                        LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1068                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1069                    } catch (Throwable ignore) {
1070                        ignore.printStackTrace();
1071                    }
1072                }
1073            }
1074            LOG.debug("Connection Stopped: " + getRemoteAddress());
1075        }
1076    
1077        /**
1078         * @return Returns the blockedCandidate.
1079         */
1080        public boolean isBlockedCandidate() {
1081            return blockedCandidate;
1082        }
1083    
1084        /**
1085         * @param blockedCandidate The blockedCandidate to set.
1086         */
1087        public void setBlockedCandidate(boolean blockedCandidate) {
1088            this.blockedCandidate = blockedCandidate;
1089        }
1090    
1091        /**
1092         * @return Returns the markedCandidate.
1093         */
1094        public boolean isMarkedCandidate() {
1095            return markedCandidate;
1096        }
1097    
1098        /**
1099         * @param markedCandidate The markedCandidate to set.
1100         */
1101        public void setMarkedCandidate(boolean markedCandidate) {
1102            this.markedCandidate = markedCandidate;
1103            if (!markedCandidate) {
1104                timeStamp = 0;
1105                blockedCandidate = false;
1106            }
1107        }
1108    
1109        /**
1110         * @param slow The slow to set.
1111         */
1112        public void setSlow(boolean slow) {
1113            this.slow = slow;
1114        }
1115    
1116        /**
1117         * @return true if the Connection is slow
1118         */
1119        public boolean isSlow() {
1120            return slow;
1121        }
1122    
1123        /**
1124         * @return true if the Connection is potentially blocked
1125         */
1126        public boolean isMarkedBlockedCandidate() {
1127            return markedCandidate;
1128        }
1129    
1130        /**
1131         * Mark the Connection, so we can deem if it's collectable on the next sweep
1132         */
1133        public void doMark() {
1134            if (timeStamp == 0) {
1135                timeStamp = System.currentTimeMillis();
1136            }
1137        }
1138    
1139        /**
1140         * @return if after being marked, the Connection is still writing
1141         */
1142        public boolean isBlocked() {
1143            return blocked;
1144        }
1145    
1146        /**
1147         * @return true if the Connection is connected
1148         */
1149        public boolean isConnected() {
1150            return connected;
1151        }
1152    
1153        /**
1154         * @param blocked The blocked to set.
1155         */
1156        public void setBlocked(boolean blocked) {
1157            this.blocked = blocked;
1158        }
1159    
1160        /**
1161         * @param connected The connected to set.
1162         */
1163        public void setConnected(boolean connected) {
1164            this.connected = connected;
1165        }
1166    
1167        /**
1168         * @return true if the Connection is active
1169         */
1170        public boolean isActive() {
1171            return active;
1172        }
1173    
1174        /**
1175         * @param active The active to set.
1176         */
1177        public void setActive(boolean active) {
1178            this.active = active;
1179        }
1180    
1181        /**
1182         * @return true if the Connection is starting
1183         */
1184        public synchronized boolean isStarting() {
1185            return starting;
1186        }
1187    
1188        public synchronized boolean isNetworkConnection() {
1189            return networkConnection;
1190        }
1191    
1192        public boolean isFaultTolerantConnection() {
1193            return this.faultTolerantConnection;
1194        }
1195    
1196        protected synchronized void setStarting(boolean starting) {
1197            this.starting = starting;
1198        }
1199    
1200        /**
1201         * @return true if the Connection needs to stop
1202         */
1203        public synchronized boolean isPendingStop() {
1204            return pendingStop;
1205        }
1206    
1207        protected synchronized void setPendingStop(boolean pendingStop) {
1208            this.pendingStop = pendingStop;
1209        }
1210    
1211        public Response processBrokerInfo(BrokerInfo info) {
1212            if (info.isSlaveBroker()) {
1213                BrokerService bService = connector.getBrokerService();
1214                // Do we only support passive slaves - or does the slave want to be
1215                // passive ?
1216                boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1217                if (passive == false) {
1218    
1219                    // stream messages from this broker (the master) to
1220                    // the slave
1221                    MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1222                    masterBroker = new MasterBroker(parent, transport);
1223                    masterBroker.startProcessing();
1224                }
1225                LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
1226                bService.slaveConnectionEstablished();
1227            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1228                // so this TransportConnection is the rear end of a network bridge
1229                // We have been requested to create a two way pipe ...
1230                try {
1231                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1232                    Map<String, String> props = createMap(properties);
1233                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1234                    IntrospectionSupport.setProperties(config, props, "");
1235                    config.setBrokerName(broker.getBrokerName());
1236    
1237                    // check for existing duplex connection hanging about
1238    
1239                    // We first look if existing network connection already exists for the same broker Id and network connector name
1240                    // It's possible in case of brief network fault to have this transport connector side of the connection always active
1241                    // and the duplex network connector side wanting to open a new one
1242                    // In this case, the old connection must be broken
1243                    String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1244                    CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1245                    synchronized (connections) {
1246                        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1247                            TransportConnection c = iter.next();
1248                            if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1249                                LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1250                                c.stopAsync();
1251                                // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1252                                c.getStopped().await(1, TimeUnit.SECONDS);
1253                            }
1254                        }
1255                        setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1256                    }
1257                    URI uri = broker.getVmConnectorURI();
1258                    HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1259                    map.put("network", "true");
1260                    map.put("async", "false");
1261                    uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1262                    Transport localTransport = TransportFactory.connect(uri);
1263                    Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1264                    String duplexName = localTransport.toString();
1265                    if (duplexName.contains("#")) {
1266                        duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1267                    }
1268                    MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1269                    listener.setCreatedByDuplex(true);
1270                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1271                    duplexBridge.setBrokerService(broker.getBrokerService());
1272                    // now turn duplex off this side
1273                    info.setDuplexConnection(false);
1274                    duplexBridge.setCreatedByDuplex(true);
1275                    duplexBridge.duplexStart(this, brokerInfo, info);
1276                    LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1277                    return null;
1278                } catch (TransportDisposedIOException e) {
1279                    LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1280                    return null;
1281                } catch (Exception e) {
1282                    LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
1283                    return null;
1284                }
1285            }
1286            // We only expect to get one broker info command per connection
1287            if (this.brokerInfo != null) {
1288                LOG.warn("Unexpected extra broker info command received: " + info);
1289            }
1290            this.brokerInfo = info;
1291            networkConnection = true;
1292            List<TransportConnectionState> connectionStates = listConnectionStates();
1293            for (TransportConnectionState cs : connectionStates) {
1294                cs.getContext().setNetworkConnection(true);
1295            }
1296            return null;
1297        }
1298    
1299        @SuppressWarnings({"unchecked", "rawtypes"})
1300        private HashMap<String, String> createMap(Properties properties) {
1301            return new HashMap(properties);
1302        }
1303    
1304        protected void dispatch(Command command) throws IOException {
1305            try {
1306                setMarkedCandidate(true);
1307                transport.oneway(command);
1308            } finally {
1309                setMarkedCandidate(false);
1310            }
1311        }
1312    
1313        public String getRemoteAddress() {
1314            return transport.getRemoteAddress();
1315        }
1316    
1317        public String getConnectionId() {
1318            List<TransportConnectionState> connectionStates = listConnectionStates();
1319            for (TransportConnectionState cs : connectionStates) {
1320                if (cs.getInfo().getClientId() != null) {
1321                    return cs.getInfo().getClientId();
1322                }
1323                return cs.getInfo().getConnectionId().toString();
1324            }
1325            return null;
1326        }
1327    
1328        public void updateClient(ConnectionControl control) {
1329            if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1330                    && this.wireFormatInfo.getVersion() >= 6) {
1331                dispatchAsync(control);
1332            }
1333        }
1334    
1335        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1336            ProducerBrokerExchange result = producerExchanges.get(id);
1337            if (result == null) {
1338                synchronized (producerExchanges) {
1339                    result = new ProducerBrokerExchange();
1340                    TransportConnectionState state = lookupConnectionState(id);
1341                    context = state.getContext();
1342                    result.setConnectionContext(context);
1343                    if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1344                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1345                    }
1346                    SessionState ss = state.getSessionState(id.getParentId());
1347                    if (ss != null) {
1348                        result.setProducerState(ss.getProducerState(id));
1349                        ProducerState producerState = ss.getProducerState(id);
1350                        if (producerState != null && producerState.getInfo() != null) {
1351                            ProducerInfo info = producerState.getInfo();
1352                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1353                        }
1354                    }
1355                    producerExchanges.put(id, result);
1356                }
1357            } else {
1358                context = result.getConnectionContext();
1359            }
1360            return result;
1361        }
1362    
1363        private void removeProducerBrokerExchange(ProducerId id) {
1364            synchronized (producerExchanges) {
1365                producerExchanges.remove(id);
1366            }
1367        }
1368    
1369        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1370            ConsumerBrokerExchange result = consumerExchanges.get(id);
1371            return result;
1372        }
1373    
1374        private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1375            ConsumerBrokerExchange result = consumerExchanges.get(id);
1376            if (result == null) {
1377                synchronized (consumerExchanges) {
1378                    result = new ConsumerBrokerExchange();
1379                    TransportConnectionState state = lookupConnectionState(id);
1380                    context = state.getContext();
1381                    result.setConnectionContext(context);
1382                    SessionState ss = state.getSessionState(id.getParentId());
1383                    if (ss != null) {
1384                        ConsumerState cs = ss.getConsumerState(id);
1385                        if (cs != null) {
1386                            ConsumerInfo info = cs.getInfo();
1387                            if (info != null) {
1388                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1389                                    result.setWildcard(true);
1390                                }
1391                            }
1392                        }
1393                    }
1394                    consumerExchanges.put(id, result);
1395                }
1396            }
1397            return result;
1398        }
1399    
1400        private void removeConsumerBrokerExchange(ConsumerId id) {
1401            synchronized (consumerExchanges) {
1402                consumerExchanges.remove(id);
1403            }
1404        }
1405    
1406        public int getProtocolVersion() {
1407            return protocolVersion.get();
1408        }
1409    
1410        public Response processControlCommand(ControlCommand command) throws Exception {
1411            String control = command.getCommand();
1412            if (control != null && control.equals("shutdown")) {
1413                System.exit(0);
1414            }
1415            return null;
1416        }
1417    
1418        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1419            return null;
1420        }
1421    
1422        public Response processConnectionControl(ConnectionControl control) throws Exception {
1423            if (control != null) {
1424                faultTolerantConnection = control.isFaultTolerant();
1425            }
1426            return null;
1427        }
1428    
1429        public Response processConnectionError(ConnectionError error) throws Exception {
1430            return null;
1431        }
1432    
1433        public Response processConsumerControl(ConsumerControl control) throws Exception {
1434            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1435            broker.processConsumerControl(consumerExchange, control);
1436            return null;
1437        }
1438    
1439        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1440                                                                                TransportConnectionState state) {
1441            TransportConnectionState cs = null;
1442            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1443                // swap implementations
1444                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1445                newRegister.intialize(connectionStateRegister);
1446                connectionStateRegister = newRegister;
1447            }
1448            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1449            return cs;
1450        }
1451    
1452        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1453            return connectionStateRegister.unregisterConnectionState(connectionId);
1454        }
1455    
1456        protected synchronized List<TransportConnectionState> listConnectionStates() {
1457            return connectionStateRegister.listConnectionStates();
1458        }
1459    
1460        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1461            return connectionStateRegister.lookupConnectionState(connectionId);
1462        }
1463    
1464        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1465            return connectionStateRegister.lookupConnectionState(id);
1466        }
1467    
1468        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1469            return connectionStateRegister.lookupConnectionState(id);
1470        }
1471    
1472        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1473            return connectionStateRegister.lookupConnectionState(id);
1474        }
1475    
1476        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1477            return connectionStateRegister.lookupConnectionState(connectionId);
1478        }
1479    
1480        protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1481            this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1482        }
1483    
1484        protected synchronized String getDuplexNetworkConnectorId() {
1485            return this.duplexNetworkConnectorId;
1486        }
1487    
1488        public boolean isStopping() {
1489            return stopping.get();
1490        }
1491    
1492        protected CountDownLatch getStopped() {
1493            return stopped;
1494        }
1495    
1496        private int getProducerCount(ConnectionId connectionId) {
1497            int result = 0;
1498            TransportConnectionState cs = lookupConnectionState(connectionId);
1499            if (cs != null) {
1500                for (SessionId sessionId : cs.getSessionIds()) {
1501                    SessionState sessionState = cs.getSessionState(sessionId);
1502                    if (sessionState != null) {
1503                        result += sessionState.getProducerIds().size();
1504                    }
1505                }
1506            }
1507            return result;
1508        }
1509    
1510        private int getConsumerCount(ConnectionId connectionId) {
1511            int result = 0;
1512            TransportConnectionState cs = lookupConnectionState(connectionId);
1513            if (cs != null) {
1514                for (SessionId sessionId : cs.getSessionIds()) {
1515                    SessionState sessionState = cs.getSessionState(sessionId);
1516                    if (sessionState != null) {
1517                        result += sessionState.getConsumerIds().size();
1518                    }
1519                }
1520            }
1521            return result;
1522        }
1523    }