001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.state;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.Vector;
024    import java.util.Map.Entry;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAResource;
029    
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionId;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerId;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.DestinationInfo;
037    import org.apache.activemq.command.ExceptionResponse;
038    import org.apache.activemq.command.IntegerResponse;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.ProducerId;
042    import org.apache.activemq.command.ProducerInfo;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.command.SessionId;
045    import org.apache.activemq.command.SessionInfo;
046    import org.apache.activemq.command.TransactionInfo;
047    import org.apache.activemq.transport.Transport;
048    import org.apache.activemq.util.IOExceptionSupport;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    /**
053     * Tracks the state of a connection so a newly established transport can be
054     * re-initialized to the state that was tracked.
055     * 
056     * 
057     */
058    public class ConnectionStateTracker extends CommandVisitorAdapter {
059        private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060    
061        private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062        private static final int MESSAGE_PULL_SIZE = 400;
063        protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
064    
065        private boolean trackTransactions;
066        private boolean restoreSessions = true;
067        private boolean restoreConsumers = true;
068        private boolean restoreProducers = true;
069        private boolean restoreTransaction = true;
070        private boolean trackMessages = true;
071        private boolean trackTransactionProducers = true;
072        private int maxCacheSize = 128 * 1024;
073        private int currentCacheSize;
074        private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075            protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076                boolean result = currentCacheSize > maxCacheSize;
077                if (result) {
078                    if (eldest.getValue() instanceof Message) {
079                        currentCacheSize -= ((Message)eldest.getValue()).getSize();
080                    } else if (eldest.getValue() instanceof MessagePull) {
081                        currentCacheSize -= MESSAGE_PULL_SIZE;
082                    }
083                    if (LOG.isTraceEnabled()) {
084                        LOG.trace("removing tracked message: " + eldest.getKey());
085                    }
086                }
087                return result;
088            }
089        };
090        
091        private class RemoveTransactionAction implements ResponseHandler {
092            private final TransactionInfo info;
093    
094            public RemoveTransactionAction(TransactionInfo info) {
095                this.info = info;
096            }
097    
098            public void onResponse(Command response) {
099                ConnectionId connectionId = info.getConnectionId();
100                ConnectionState cs = connectionStates.get(connectionId);
101                cs.removeTransactionState(info.getTransactionId());
102            }
103        }
104        
105        private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
106    
107            public PrepareReadonlyTransactionAction(TransactionInfo info) {
108                super(info);
109            }
110    
111            public void onResponse(Command command) {
112                IntegerResponse response = (IntegerResponse) command;
113                if (XAResource.XA_RDONLY == response.getResult()) {
114                    // all done, no commit or rollback from TM
115                    super.onResponse(command);
116                }
117            }
118        }
119    
120        /**
121         * 
122         * 
123         * @param command
124         * @return null if the command is not state tracked.
125         * @throws IOException
126         */
127        public Tracked track(Command command) throws IOException {
128            try {
129                return (Tracked)command.visit(this);
130            } catch (IOException e) {
131                throw e;
132            } catch (Throwable e) {
133                throw IOExceptionSupport.create(e);
134            }
135        }
136        
137        public void trackBack(Command command) {
138            if (command != null) {
139                if (trackMessages && command.isMessage()) {
140                    Message message = (Message) command;
141                    if (message.getTransactionId()==null) {
142                        currentCacheSize = currentCacheSize +  message.getSize();
143                    }
144                } else if (command instanceof MessagePull) {
145                    // just needs to be a rough estimate of size, ~4 identifiers
146                    currentCacheSize += MESSAGE_PULL_SIZE;
147                }
148            }
149        }
150    
151        public void restore(Transport transport) throws IOException {
152            // Restore the connections.
153            for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
154                ConnectionState connectionState = iter.next();
155                connectionState.getInfo().setFailoverReconnect(true);
156                if (LOG.isDebugEnabled()) {
157                    LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
158                }
159                transport.oneway(connectionState.getInfo());
160                restoreTempDestinations(transport, connectionState);
161    
162                if (restoreSessions) {
163                    restoreSessions(transport, connectionState);
164                }
165    
166                if (restoreTransaction) {
167                    restoreTransactions(transport, connectionState);
168                }
169            }
170            //now flush messages
171            for (Command msg:messageCache.values()) {
172                if (LOG.isDebugEnabled()) {
173                    LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
174                }
175                transport.oneway(msg);
176            }
177        }
178    
179        private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
180            Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
181            for (TransactionState transactionState : connectionState.getTransactionStates()) {
182                if (LOG.isDebugEnabled()) {
183                    LOG.debug("tx: " + transactionState.getId());
184                }
185                
186                // rollback any completed transactions - no way to know if commit got there
187                // or if reply went missing
188                //
189                if (!transactionState.getCommands().isEmpty()) {
190                    Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
191                    if (lastCommand instanceof TransactionInfo) {
192                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
193                        if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
194                            if (LOG.isDebugEnabled()) {
195                                LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
196                            }
197                            toRollback.add(transactionInfo);
198                            continue;
199                        }
200                    }
201                }
202                
203                // replay short lived producers that may have been involved in the transaction
204                for (ProducerState producerState : transactionState.getProducerStates().values()) {
205                    if (LOG.isDebugEnabled()) {
206                        LOG.debug("tx replay producer :" + producerState.getInfo());
207                    }
208                    transport.oneway(producerState.getInfo());
209                }
210                
211                for (Command command : transactionState.getCommands()) {
212                    if (LOG.isDebugEnabled()) {
213                        LOG.debug("tx replay: " + command);
214                    }
215                    transport.oneway(command);
216                }
217                
218                for (ProducerState producerState : transactionState.getProducerStates().values()) {
219                    if (LOG.isDebugEnabled()) {
220                        LOG.debug("tx remove replayed producer :" + producerState.getInfo());
221                    }
222                    transport.oneway(producerState.getInfo().createRemoveCommand());
223                }
224            }
225            
226            for (TransactionInfo command: toRollback) {
227                // respond to the outstanding commit
228                ExceptionResponse response = new ExceptionResponse();
229                response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
230                response.setCorrelationId(command.getCommandId());
231                transport.getTransportListener().onCommand(response);
232            }
233        }
234    
235        /**
236         * @param transport
237         * @param connectionState
238         * @throws IOException
239         */
240        protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
241            // Restore the connection's sessions
242            for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
243                SessionState sessionState = (SessionState)iter2.next();
244                if (LOG.isDebugEnabled()) {
245                    LOG.debug("session: " + sessionState.getInfo().getSessionId());
246                }
247                transport.oneway(sessionState.getInfo());
248    
249                if (restoreProducers) {
250                    restoreProducers(transport, sessionState);
251                }
252    
253                if (restoreConsumers) {
254                    restoreConsumers(transport, sessionState);
255                }
256            }
257        }
258    
259        /**
260         * @param transport
261         * @param sessionState
262         * @throws IOException
263         */
264        protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
265            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
266            final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
267            final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
268            for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
269                ConsumerInfo infoToSend = consumerState.getInfo();
270                if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
271                    infoToSend = consumerState.getInfo().copy();
272                    connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
273                    infoToSend.setPrefetchSize(0);
274                    if (LOG.isDebugEnabled()) {
275                        LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
276                    }
277                }
278                if (LOG.isDebugEnabled()) {
279                    LOG.debug("restore consumer: " + infoToSend.getConsumerId());
280                }
281                transport.oneway(infoToSend);
282            }
283        }
284    
285        /**
286         * @param transport
287         * @param sessionState
288         * @throws IOException
289         */
290        protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
291            // Restore the session's producers
292            for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
293                ProducerState producerState = (ProducerState)iter3.next();
294                if (LOG.isDebugEnabled()) {
295                    LOG.debug("producer: " + producerState.getInfo().getProducerId());
296                }
297                transport.oneway(producerState.getInfo());
298            }
299        }
300    
301        /**
302         * @param transport
303         * @param connectionState
304         * @throws IOException
305         */
306        protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
307            throws IOException {
308            // Restore the connection's temp destinations.
309            for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
310                transport.oneway((DestinationInfo)iter2.next());
311            }
312        }
313    
314        public Response processAddDestination(DestinationInfo info) {
315            if (info != null) {
316                ConnectionState cs = connectionStates.get(info.getConnectionId());
317                if (cs != null && info.getDestination().isTemporary()) {
318                    cs.addTempDestination(info);
319                }
320            }
321            return TRACKED_RESPONSE_MARKER;
322        }
323    
324        public Response processRemoveDestination(DestinationInfo info) {
325            if (info != null) {
326                ConnectionState cs = connectionStates.get(info.getConnectionId());
327                if (cs != null && info.getDestination().isTemporary()) {
328                    cs.removeTempDestination(info.getDestination());
329                }
330            }
331            return TRACKED_RESPONSE_MARKER;
332        }
333    
334        public Response processAddProducer(ProducerInfo info) {
335            if (info != null && info.getProducerId() != null) {
336                SessionId sessionId = info.getProducerId().getParentId();
337                if (sessionId != null) {
338                    ConnectionId connectionId = sessionId.getParentId();
339                    if (connectionId != null) {
340                        ConnectionState cs = connectionStates.get(connectionId);
341                        if (cs != null) {
342                            SessionState ss = cs.getSessionState(sessionId);
343                            if (ss != null) {
344                                ss.addProducer(info);
345                            }
346                        }
347                    }
348                }
349            }
350            return TRACKED_RESPONSE_MARKER;
351        }
352    
353        public Response processRemoveProducer(ProducerId id) {
354            if (id != null) {
355                SessionId sessionId = id.getParentId();
356                if (sessionId != null) {
357                    ConnectionId connectionId = sessionId.getParentId();
358                    if (connectionId != null) {
359                        ConnectionState cs = connectionStates.get(connectionId);
360                        if (cs != null) {
361                            SessionState ss = cs.getSessionState(sessionId);
362                            if (ss != null) {
363                                ss.removeProducer(id);
364                            }
365                        }
366                    }
367                }
368            }
369            return TRACKED_RESPONSE_MARKER;
370        }
371    
372        public Response processAddConsumer(ConsumerInfo info) {
373            if (info != null) {
374                SessionId sessionId = info.getConsumerId().getParentId();
375                if (sessionId != null) {
376                    ConnectionId connectionId = sessionId.getParentId();
377                    if (connectionId != null) {
378                        ConnectionState cs = connectionStates.get(connectionId);
379                        if (cs != null) {
380                            SessionState ss = cs.getSessionState(sessionId);
381                            if (ss != null) {
382                                ss.addConsumer(info);
383                            }
384                        }
385                    }
386                }
387            }
388            return TRACKED_RESPONSE_MARKER;
389        }
390    
391        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
392            if (id != null) {
393                SessionId sessionId = id.getParentId();
394                if (sessionId != null) {
395                    ConnectionId connectionId = sessionId.getParentId();
396                    if (connectionId != null) {
397                        ConnectionState cs = connectionStates.get(connectionId);
398                        if (cs != null) {
399                            SessionState ss = cs.getSessionState(sessionId);
400                            if (ss != null) {
401                                ss.removeConsumer(id);
402                            }
403                        }
404                    }
405                }
406            }
407            return TRACKED_RESPONSE_MARKER;
408        }
409    
410        public Response processAddSession(SessionInfo info) {
411            if (info != null) {
412                ConnectionId connectionId = info.getSessionId().getParentId();
413                if (connectionId != null) {
414                    ConnectionState cs = connectionStates.get(connectionId);
415                    if (cs != null) {
416                        cs.addSession(info);
417                    }
418                }
419            }
420            return TRACKED_RESPONSE_MARKER;
421        }
422    
423        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
424            if (id != null) {
425                ConnectionId connectionId = id.getParentId();
426                if (connectionId != null) {
427                    ConnectionState cs = connectionStates.get(connectionId);
428                    if (cs != null) {
429                        cs.removeSession(id);
430                    }
431                }
432            }
433            return TRACKED_RESPONSE_MARKER;
434        }
435    
436        public Response processAddConnection(ConnectionInfo info) {
437            if (info != null) {
438                connectionStates.put(info.getConnectionId(), new ConnectionState(info));
439            }
440            return TRACKED_RESPONSE_MARKER;
441        }
442    
443        public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
444            if (id != null) {
445                connectionStates.remove(id);
446            }
447            return TRACKED_RESPONSE_MARKER;
448        }
449    
450        public Response processMessage(Message send) throws Exception {
451            if (send != null) {
452                if (trackTransactions && send.getTransactionId() != null) {
453                    ProducerId producerId = send.getProducerId();
454                    ConnectionId connectionId = producerId.getParentId().getParentId();
455                    if (connectionId != null) {
456                        ConnectionState cs = connectionStates.get(connectionId);
457                        if (cs != null) {
458                            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
459                            if (transactionState != null) {
460                                transactionState.addCommand(send);
461                                
462                                if (trackTransactionProducers) {
463                                    // for jmstemplate, track the producer in case it is closed before commit
464                                    // and needs to be replayed
465                                    SessionState ss = cs.getSessionState(producerId.getParentId());
466                                    ProducerState producerState = ss.getProducerState(producerId);
467                                    producerState.setTransactionState(transactionState);            
468                                }
469                            }
470                        }
471                    }
472                    return TRACKED_RESPONSE_MARKER;
473                }else if (trackMessages) {
474                    messageCache.put(send.getMessageId(), send);
475                }
476            }
477            return null;
478        }
479    
480        public Response processBeginTransaction(TransactionInfo info) {
481            if (trackTransactions && info != null && info.getTransactionId() != null) {
482                ConnectionId connectionId = info.getConnectionId();
483                if (connectionId != null) {
484                    ConnectionState cs = connectionStates.get(connectionId);
485                    if (cs != null) {
486                        cs.addTransactionState(info.getTransactionId());
487                        TransactionState state = cs.getTransactionState(info.getTransactionId());
488                        state.addCommand(info);
489                    }
490                }
491                return TRACKED_RESPONSE_MARKER;
492            }
493            return null;
494        }
495    
496        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
497            if (trackTransactions && info != null) {
498                ConnectionId connectionId = info.getConnectionId();
499                if (connectionId != null) {
500                    ConnectionState cs = connectionStates.get(connectionId);
501                    if (cs != null) {
502                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
503                        if (transactionState != null) {
504                            transactionState.addCommand(info);
505                            return new Tracked(new PrepareReadonlyTransactionAction(info));
506                        }
507                    }
508                }
509            }
510            return null;
511        }
512    
513        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
514            if (trackTransactions && info != null) {
515                ConnectionId connectionId = info.getConnectionId();
516                if (connectionId != null) {
517                    ConnectionState cs = connectionStates.get(connectionId);
518                    if (cs != null) {
519                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
520                        if (transactionState != null) {
521                            transactionState.addCommand(info);
522                            return new Tracked(new RemoveTransactionAction(info));
523                        }
524                    }
525                }
526            }
527            return null;
528        }
529    
530        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
531            if (trackTransactions && info != null) {
532                ConnectionId connectionId = info.getConnectionId();
533                if (connectionId != null) {
534                    ConnectionState cs = connectionStates.get(connectionId);
535                    if (cs != null) {
536                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
537                        if (transactionState != null) {
538                            transactionState.addCommand(info);
539                            return new Tracked(new RemoveTransactionAction(info));
540                        }
541                    }
542                }
543            }
544            return null;
545        }
546    
547        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
548            if (trackTransactions && info != null) {
549                ConnectionId connectionId = info.getConnectionId();
550                if (connectionId != null) {
551                    ConnectionState cs = connectionStates.get(connectionId);
552                    if (cs != null) {
553                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
554                        if (transactionState != null) {
555                            transactionState.addCommand(info);
556                            return new Tracked(new RemoveTransactionAction(info));
557                        }
558                    }
559                }
560            }
561            return null;
562        }
563    
564        public Response processEndTransaction(TransactionInfo info) throws Exception {
565            if (trackTransactions && info != null) {
566                ConnectionId connectionId = info.getConnectionId();
567                if (connectionId != null) {
568                    ConnectionState cs = connectionStates.get(connectionId);
569                    if (cs != null) {
570                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
571                        if (transactionState != null) {
572                            transactionState.addCommand(info);
573                        }
574                    }
575                }
576                return TRACKED_RESPONSE_MARKER;
577            }
578            return null;
579        }
580    
581        @Override
582        public Response processMessagePull(MessagePull pull) throws Exception {
583            if (pull != null) {
584                // leave a single instance in the cache
585                final String id = pull.getDestination() + "::" + pull.getConsumerId();
586                messageCache.put(id.intern(), pull);
587            }
588            return null;
589        }
590    
591        public boolean isRestoreConsumers() {
592            return restoreConsumers;
593        }
594    
595        public void setRestoreConsumers(boolean restoreConsumers) {
596            this.restoreConsumers = restoreConsumers;
597        }
598    
599        public boolean isRestoreProducers() {
600            return restoreProducers;
601        }
602    
603        public void setRestoreProducers(boolean restoreProducers) {
604            this.restoreProducers = restoreProducers;
605        }
606    
607        public boolean isRestoreSessions() {
608            return restoreSessions;
609        }
610    
611        public void setRestoreSessions(boolean restoreSessions) {
612            this.restoreSessions = restoreSessions;
613        }
614    
615        public boolean isTrackTransactions() {
616            return trackTransactions;
617        }
618    
619        public void setTrackTransactions(boolean trackTransactions) {
620            this.trackTransactions = trackTransactions;
621        }
622        
623        public boolean isTrackTransactionProducers() {
624            return this.trackTransactionProducers;
625        }
626    
627        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
628            this.trackTransactionProducers = trackTransactionProducers;
629        }
630        
631        public boolean isRestoreTransaction() {
632            return restoreTransaction;
633        }
634    
635        public void setRestoreTransaction(boolean restoreTransaction) {
636            this.restoreTransaction = restoreTransaction;
637        }
638    
639        public boolean isTrackMessages() {
640            return trackMessages;
641        }
642    
643        public void setTrackMessages(boolean trackMessages) {
644            this.trackMessages = trackMessages;
645        }
646    
647        public int getMaxCacheSize() {
648            return maxCacheSize;
649        }
650    
651        public void setMaxCacheSize(int maxCacheSize) {
652            this.maxCacheSize = maxCacheSize;
653        }
654    
655        public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
656            ConnectionState connectionState = connectionStates.get(connectionId);
657            if (connectionState != null) {
658                connectionState.setConnectionInterruptProcessingComplete(true);
659                Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
660                for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
661                    ConsumerControl control = new ConsumerControl();
662                    control.setConsumerId(entry.getKey());
663                    control.setPrefetch(entry.getValue().getPrefetchSize());
664                    control.setDestination(entry.getValue().getDestination());
665                    try {
666                        if (LOG.isDebugEnabled()) {
667                            LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
668                        }
669                        transport.oneway(control);  
670                    } catch (Exception ex) {
671                        if (LOG.isDebugEnabled()) {
672                            LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
673                                    + " with: " + control.getPrefetch(), ex);
674                        }
675                    }
676                }
677                stalledConsumers.clear();
678            }
679        }
680    
681        public void transportInterrupted(ConnectionId connectionId) {
682            ConnectionState connectionState = connectionStates.get(connectionId);
683            if (connectionState != null) {
684                connectionState.setConnectionInterruptProcessingComplete(false);
685            }
686        }
687    }