001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.failover;
018
019import java.io.BufferedReader;
020import java.io.FileReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.InterruptedIOException;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URL;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedHashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.StringTokenizer;
036import java.util.concurrent.CopyOnWriteArrayList;
037import java.util.concurrent.atomic.AtomicReference;
038
039import org.apache.activemq.broker.SslContext;
040import org.apache.activemq.command.Command;
041import org.apache.activemq.command.ConnectionControl;
042import org.apache.activemq.command.ConnectionId;
043import org.apache.activemq.command.RemoveInfo;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.state.ConnectionStateTracker;
046import org.apache.activemq.state.Tracked;
047import org.apache.activemq.thread.DefaultThreadPools;
048import org.apache.activemq.thread.Task;
049import org.apache.activemq.thread.TaskRunner;
050import org.apache.activemq.transport.CompositeTransport;
051import org.apache.activemq.transport.DefaultTransportListener;
052import org.apache.activemq.transport.FutureResponse;
053import org.apache.activemq.transport.ResponseCallback;
054import org.apache.activemq.transport.Transport;
055import org.apache.activemq.transport.TransportFactory;
056import org.apache.activemq.transport.TransportListener;
057import org.apache.activemq.util.IOExceptionSupport;
058import org.apache.activemq.util.ServiceSupport;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * A Transport that is made reliable by being able to fail over to another
064 * transport when a transport failure is detected.
065 */
066public class FailoverTransport implements CompositeTransport {
067
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Default for both initialReconnectDelay and reconnectDelay (units presumably ms - TODO confirm).
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel meaning "no limit" for timeout and the reconnect-attempt counters.
    private static final int INFINITE = -1;
    // Application listener that receives commands and interruption/resume notifications.
    private TransportListener transportListener;
    // Set by stop(); once true, oneway() stops retrying and swallows its error.
    private boolean disposed;
    private boolean connected;
    // Statically configured broker URIs.
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // Alternative URI list; when non-empty getConnectList() returns it instead of uris
    // (presumably filled from broker updates by updateURIs - not visible here).
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Guards connection state; oneway() waits on it for a reconnect and stop() notifies it.
    private final Object reconnectMutex = new Object();
    // Held by the reconnect task while deciding whether to reconnect or build backups.
    private final Object backupMutex = new Object();
    // Notified by stop(); presumably waited on during reconnect back-off (not visible here).
    private final Object sleepMutex = new Object();
    // Notified whenever setTransportListener() installs a new listener.
    private final Object listenerMutex = new Object();
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Outstanding requests keyed by command id; replayed by restoreTransport() and
    // cleared when the matching response arrives. Always access under synchronized (requestMap).
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    // URI of the live transport; null while disconnected.
    private URI connectedTransportURI;
    // URI of the most recently failed transport; getConnectList() moves it to the end of the list.
    private URI failedConnectTransportURI;
    // Live transport; atomic because handleTransportFailure() swaps it out without holding reconnectMutex.
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    private final TaskRunner reconnectTask;
    private boolean started;
    private boolean initialized;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    private double backOffMultiplier = 2d;
    // Send timeout used by oneway() in ms; values <= 0 mean wait indefinitely.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    // When true, getConnectList() shuffles the static URI list.
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // When non-null, oneway() stops waiting and reports this as its error.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Listener with no overrides; installed on transports being discarded by disposeTransport().
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    // Internal listener attached to the live transport; oneway() also feeds it synthetic responses.
    private final TransportListener myTransportListener = createTransportListener();
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // Set by reconnect(true); makes the reconnect task reconnect even while connected.
    private boolean doRebalance = false;
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // Parsed from setPriorityURIs().
    private ArrayList<URI> priorityList = new ArrayList<URI>();
    private boolean priorityBackupAvailable = false;
124
125    public FailoverTransport() throws InterruptedIOException {
126        brokerSslContext = SslContext.getCurrentSslContext();
127        stateTracker.setTrackTransactions(true);
128        // Setup a task that is used to reconnect the a connection async.
129        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
130            public boolean iterate() {
131                boolean result = false;
132                if (!started) {
133                    return result;
134                }
135                boolean buildBackup = true;
136                synchronized (backupMutex) {
137                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
138                        result = doReconnect();
139                        buildBackup = false;
140                        connectedToPriority = isPriority(connectedTransportURI);
141                    }
142                }
143                if (buildBackup) {
144                    buildBackups();
145                    if (priorityBackup && !connectedToPriority) {
146                        try {
147                            doDelay();
148                            if (reconnectTask == null) {
149                                return true;
150                            }
151                            reconnectTask.wakeup();
152                        } catch (InterruptedException e) {
153                            LOG.debug("Reconnect task has been interrupted.", e);
154                        }
155                    }
156                } else {
157                    // build backups on the next iteration
158                    buildBackup = true;
159                    try {
160                        if (reconnectTask == null) {
161                            return true;
162                        }
163                        reconnectTask.wakeup();
164                    } catch (InterruptedException e) {
165                        LOG.debug("Reconnect task has been interrupted.", e);
166                    }
167                }
168                return result;
169            }
170
171        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
172    }
173
174    TransportListener createTransportListener() {
175        return new TransportListener() {
176            public void onCommand(Object o) {
177                Command command = (Command) o;
178                if (command == null) {
179                    return;
180                }
181                if (command.isResponse()) {
182                    Object object = null;
183                    synchronized (requestMap) {
184                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
185                    }
186                    if (object != null && object.getClass() == Tracked.class) {
187                        ((Tracked) object).onResponses(command);
188                    }
189                }
190                if (!initialized) {
191                    initialized = true;
192                }
193
194                if (command.isConnectionControl()) {
195                    handleConnectionControl((ConnectionControl) command);
196                }
197                if (transportListener != null) {
198                    transportListener.onCommand(command);
199                }
200            }
201
202            public void onException(IOException error) {
203                try {
204                    handleTransportFailure(error);
205                } catch (InterruptedException e) {
206                    Thread.currentThread().interrupt();
207                    transportListener.onException(new InterruptedIOException());
208                }
209            }
210
211            public void transportInterupted() {
212                if (transportListener != null) {
213                    transportListener.transportInterupted();
214                }
215            }
216
217            public void transportResumed() {
218                if (transportListener != null) {
219                    transportListener.transportResumed();
220                }
221            }
222        };
223    }
224
225    public final void disposeTransport(Transport transport) {
226        transport.setTransportListener(disposedListener);
227        ServiceSupport.dispose(transport);
228    }
229
230    public final void handleTransportFailure(IOException e) throws InterruptedException {
231        if (LOG.isTraceEnabled()) {
232            LOG.trace(this + " handleTransportFailure: " + e);
233        }
234        Transport transport = connectedTransport.getAndSet(null);
235        if (transport == null) {
236            // sync with possible in progress reconnect
237            synchronized (reconnectMutex) {
238                transport = connectedTransport.getAndSet(null);
239            }
240        }
241        if (transport != null) {
242
243            disposeTransport(transport);
244
245            boolean reconnectOk = false;
246            synchronized (reconnectMutex) {
247                if (canReconnect()) {
248                    reconnectOk = true;
249                }
250                LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason:  " + e
251                        + (reconnectOk ? "," : ", not")  +" attempting to automatically reconnect");
252
253                initialized = false;
254                failedConnectTransportURI = connectedTransportURI;
255                connectedTransportURI = null;
256                connected = false;
257
258                // notify before any reconnect attempt so ack state can be whacked
259                if (transportListener != null) {
260                    transportListener.transportInterupted();
261                }
262
263                if (reconnectOk) {
264                    updated.remove(failedConnectTransportURI);
265                    reconnectTask.wakeup();
266                } else {
267                    propagateFailureToExceptionListener(e);
268                }
269            }
270        }
271    }
272
273    private boolean canReconnect() {
274        return started && 0 != calculateReconnectAttemptLimit();
275    }
276
277    public final void handleConnectionControl(ConnectionControl control) {
278        String reconnectStr = control.getReconnectTo();
279        if (reconnectStr != null) {
280            reconnectStr = reconnectStr.trim();
281            if (reconnectStr.length() > 0) {
282                try {
283                    URI uri = new URI(reconnectStr);
284                    if (isReconnectSupported()) {
285                        reconnect(uri);
286                        LOG.info("Reconnected to: " + uri);
287                    }
288                } catch (Exception e) {
289                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
290                }
291            }
292        }
293        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
294    }
295
296    private final void processNewTransports(boolean rebalance, String newTransports) {
297        if (newTransports != null) {
298            newTransports = newTransports.trim();
299            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
300                List<URI> list = new ArrayList<URI>();
301                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
302                while (tokenizer.hasMoreTokens()) {
303                    String str = tokenizer.nextToken();
304                    try {
305                        URI uri = new URI(str);
306                        list.add(uri);
307                    } catch (Exception e) {
308                        LOG.error("Failed to parse broker address: " + str, e);
309                    }
310                }
311                if (list.isEmpty() == false) {
312                    try {
313                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
314                    } catch (IOException e) {
315                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
316                    }
317                }
318            }
319        }
320    }
321
322    public void start() throws Exception {
323        synchronized (reconnectMutex) {
324            if (LOG.isDebugEnabled()) {
325                LOG.debug("Started " + this);
326            }
327            if (started) {
328                return;
329            }
330            started = true;
331            stateTracker.setMaxCacheSize(getMaxCacheSize());
332            stateTracker.setTrackMessages(isTrackMessages());
333            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
334            if (connectedTransport.get() != null) {
335                stateTracker.restore(connectedTransport.get());
336            } else {
337                reconnect(false);
338            }
339        }
340    }
341
342    public void stop() throws Exception {
343        Transport transportToStop = null;
344        synchronized (reconnectMutex) {
345            if (LOG.isDebugEnabled()) {
346                LOG.debug("Stopped " + this);
347            }
348            if (!started) {
349                return;
350            }
351            started = false;
352            disposed = true;
353            connected = false;
354            for (BackupTransport t : backups) {
355                t.setDisposed(true);
356            }
357            backups.clear();
358
359            if (connectedTransport.get() != null) {
360                transportToStop = connectedTransport.getAndSet(null);
361            }
362            reconnectMutex.notifyAll();
363        }
364        synchronized (sleepMutex) {
365            sleepMutex.notifyAll();
366        }
367        reconnectTask.shutdown();
368        if (transportToStop != null) {
369            transportToStop.stop();
370        }
371    }
372
373    public long getInitialReconnectDelay() {
374        return initialReconnectDelay;
375    }
376
377    public void setInitialReconnectDelay(long initialReconnectDelay) {
378        this.initialReconnectDelay = initialReconnectDelay;
379    }
380
381    public long getMaxReconnectDelay() {
382        return maxReconnectDelay;
383    }
384
385    public void setMaxReconnectDelay(long maxReconnectDelay) {
386        this.maxReconnectDelay = maxReconnectDelay;
387    }
388
389    public long getReconnectDelay() {
390        return reconnectDelay;
391    }
392
393    public void setReconnectDelay(long reconnectDelay) {
394        this.reconnectDelay = reconnectDelay;
395    }
396
397    public double getReconnectDelayExponent() {
398        return backOffMultiplier;
399    }
400
401    public void setReconnectDelayExponent(double reconnectDelayExponent) {
402        this.backOffMultiplier = reconnectDelayExponent;
403    }
404
405    public Transport getConnectedTransport() {
406        return connectedTransport.get();
407    }
408
409    public URI getConnectedTransportURI() {
410        return connectedTransportURI;
411    }
412
413    public int getMaxReconnectAttempts() {
414        return maxReconnectAttempts;
415    }
416
417    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
418        this.maxReconnectAttempts = maxReconnectAttempts;
419    }
420
421    public int getStartupMaxReconnectAttempts() {
422        return this.startupMaxReconnectAttempts;
423    }
424
425    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
426        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
427    }
428
429    public long getTimeout() {
430        return timeout;
431    }
432
433    public void setTimeout(long timeout) {
434        this.timeout = timeout;
435    }
436
437    /**
438     * @return Returns the randomize.
439     */
440    public boolean isRandomize() {
441        return randomize;
442    }
443
444    /**
445     * @param randomize The randomize to set.
446     */
447    public void setRandomize(boolean randomize) {
448        this.randomize = randomize;
449    }
450
451    public boolean isBackup() {
452        return backup;
453    }
454
455    public void setBackup(boolean backup) {
456        this.backup = backup;
457    }
458
459    public int getBackupPoolSize() {
460        return backupPoolSize;
461    }
462
463    public void setBackupPoolSize(int backupPoolSize) {
464        this.backupPoolSize = backupPoolSize;
465    }
466
467    public int getCurrentBackups() {
468        return this.backups.size();
469    }
470
471    public boolean isTrackMessages() {
472        return trackMessages;
473    }
474
475    public void setTrackMessages(boolean trackMessages) {
476        this.trackMessages = trackMessages;
477    }
478
479    public boolean isTrackTransactionProducers() {
480        return this.trackTransactionProducers;
481    }
482
483    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
484        this.trackTransactionProducers = trackTransactionProducers;
485    }
486
487    public int getMaxCacheSize() {
488        return maxCacheSize;
489    }
490
491    public void setMaxCacheSize(int maxCacheSize) {
492        this.maxCacheSize = maxCacheSize;
493    }
494
495    public boolean isPriorityBackup() {
496        return priorityBackup;
497    }
498
499    public void setPriorityBackup(boolean priorityBackup) {
500        this.priorityBackup = priorityBackup;
501    }
502
503    public void setPriorityURIs(String priorityURIs) {
504        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
505        while (tokenizer.hasMoreTokens()) {
506            String str = tokenizer.nextToken();
507            try {
508                URI uri = new URI(str);
509                priorityList.add(uri);
510            } catch (Exception e) {
511                LOG.error("Failed to parse broker address: " + str, e);
512            }
513        }
514    }
515
516    public void oneway(Object o) throws IOException {
517
518        Command command = (Command) o;
519        Exception error = null;
520        try {
521
522            synchronized (reconnectMutex) {
523
524                if (command != null && connectedTransport.get() == null) {
525                    if (command.isShutdownInfo()) {
526                        // Skipping send of ShutdownInfo command when not connected.
527                        return;
528                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
529                        // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
530                        stateTracker.track(command);
531                        if (command.isResponseRequired()) {
532                            Response response = new Response();
533                            response.setCorrelationId(command.getCommandId());
534                            myTransportListener.onCommand(response);
535                        }
536                        return;
537                    }
538                }
539
540                // Keep trying until the message is sent.
541                for (int i = 0; !disposed; i++) {
542                    try {
543
544                        // Wait for transport to be connected.
545                        Transport transport = connectedTransport.get();
546                        long start = System.currentTimeMillis();
547                        boolean timedout = false;
548                        while (transport == null && !disposed && connectionFailure == null
549                                && !Thread.currentThread().isInterrupted()) {
550                            if (LOG.isTraceEnabled()) {
551                                LOG.trace("Waiting for transport to reconnect..: " + command);
552                            }
553                            long end = System.currentTimeMillis();
554                            if (timeout > 0 && (end - start > timeout)) {
555                                timedout = true;
556                                if (LOG.isInfoEnabled()) {
557                                    LOG.info("Failover timed out after " + (end - start) + "ms");
558                                }
559                                break;
560                            }
561                            try {
562                                reconnectMutex.wait(100);
563                            } catch (InterruptedException e) {
564                                Thread.currentThread().interrupt();
565                                if (LOG.isDebugEnabled()) {
566                                    LOG.debug("Interupted: " + e, e);
567                                }
568                            }
569                            transport = connectedTransport.get();
570                        }
571
572                        if (transport == null) {
573                            // Previous loop may have exited due to use being
574                            // disposed.
575                            if (disposed) {
576                                error = new IOException("Transport disposed.");
577                            } else if (connectionFailure != null) {
578                                error = connectionFailure;
579                            } else if (timedout == true) {
580                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
581                            } else {
582                                error = new IOException("Unexpected failure.");
583                            }
584                            break;
585                        }
586
587                        // If it was a request and it was not being tracked by
588                        // the state tracker,
589                        // then hold it in the requestMap so that we can replay
590                        // it later.
591                        Tracked tracked = stateTracker.track(command);
592                        synchronized (requestMap) {
593                            if (tracked != null && tracked.isWaitingForResponse()) {
594                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
595                            } else if (tracked == null && command.isResponseRequired()) {
596                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
597                            }
598                        }
599
600                        // Send the message.
601                        try {
602                            transport.oneway(command);
603                            stateTracker.trackBack(command);
604                        } catch (IOException e) {
605
606                            // If the command was not tracked.. we will retry in
607                            // this method
608                            if (tracked == null) {
609
610                                // since we will retry in this method.. take it
611                                // out of the request
612                                // map so that it is not sent 2 times on
613                                // recovery
614                                if (command.isResponseRequired()) {
615                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
616                                }
617
618                                // Rethrow the exception so it will handled by
619                                // the outer catch
620                                throw e;
621                            } else {
622                                // Handle the error but allow the method to return since the
623                                // tracked commands are replayed on reconnect.
624                                if (LOG.isDebugEnabled()) {
625                                    LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
626                                }
627                                handleTransportFailure(e);
628                            }
629                        }
630
631                        return;
632
633                    } catch (IOException e) {
634                        if (LOG.isDebugEnabled()) {
635                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
636                        }
637                        handleTransportFailure(e);
638                    }
639                }
640            }
641        } catch (InterruptedException e) {
642            // Some one may be trying to stop our thread.
643            Thread.currentThread().interrupt();
644            throw new InterruptedIOException();
645        }
646
647        if (!disposed) {
648            if (error != null) {
649                if (error instanceof IOException) {
650                    throw (IOException) error;
651                }
652                throw IOExceptionSupport.create(error);
653            }
654        }
655    }
656
657    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
658        throw new AssertionError("Unsupported Method");
659    }
660
661    public Object request(Object command) throws IOException {
662        throw new AssertionError("Unsupported Method");
663    }
664
665    public Object request(Object command, int timeout) throws IOException {
666        throw new AssertionError("Unsupported Method");
667    }
668
669    public void add(boolean rebalance, URI u[]) {
670        boolean newURI = false;
671        for (URI uri : u) {
672            if (!contains(uri)) {
673                uris.add(uri);
674                newURI = true;
675            }
676        }
677        if (newURI) {
678            reconnect(rebalance);
679        }
680    }
681
682    public void remove(boolean rebalance, URI u[]) {
683        for (URI uri : u) {
684            uris.remove(uri);
685        }
686        // rebalance is automatic if any connected to removed/stopped broker
687    }
688
689    public void add(boolean rebalance, String u) {
690        try {
691            URI newURI = new URI(u);
692            if (contains(newURI) == false) {
693                uris.add(newURI);
694                reconnect(rebalance);
695            }
696
697        } catch (Exception e) {
698            LOG.error("Failed to parse URI: " + u);
699        }
700    }
701
702    public void reconnect(boolean rebalance) {
703        synchronized (reconnectMutex) {
704            if (started) {
705                if (rebalance) {
706                    doRebalance = true;
707                }
708                LOG.debug("Waking up reconnect task");
709                try {
710                    reconnectTask.wakeup();
711                } catch (InterruptedException e) {
712                    Thread.currentThread().interrupt();
713                }
714            } else {
715                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
716            }
717        }
718    }
719
720    private List<URI> getConnectList() {
721        if (!updated.isEmpty()) {
722            if (failedConnectTransportURI != null) {
723                boolean removed = updated.remove(failedConnectTransportURI);
724                if (removed) {
725                    updated.add(failedConnectTransportURI);
726                }
727            }
728            return updated;
729        }
730        ArrayList<URI> l = new ArrayList<URI>(uris);
731        boolean removed = false;
732        if (failedConnectTransportURI != null) {
733            removed = l.remove(failedConnectTransportURI);
734        }
735        if (randomize) {
736            // Randomly, reorder the list by random swapping
737            for (int i = 0; i < l.size(); i++) {
738                int p = (int) (Math.random() * 100 % l.size());
739                URI t = l.get(p);
740                l.set(p, l.get(i));
741                l.set(i, t);
742            }
743        }
744        if (removed) {
745            l.add(failedConnectTransportURI);
746        }
747        if (LOG.isDebugEnabled()) {
748            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
749        }
750        return l;
751    }
752
753    public TransportListener getTransportListener() {
754        return transportListener;
755    }
756
757    public void setTransportListener(TransportListener commandListener) {
758        synchronized (listenerMutex) {
759            this.transportListener = commandListener;
760            listenerMutex.notifyAll();
761        }
762    }
763
764    public <T> T narrow(Class<T> target) {
765
766        if (target.isAssignableFrom(getClass())) {
767            return target.cast(this);
768        }
769        Transport transport = connectedTransport.get();
770        if (transport != null) {
771            return transport.narrow(target);
772        }
773        return null;
774
775    }
776
777    protected void restoreTransport(Transport t) throws Exception, IOException {
778        t.start();
779        // send information to the broker - informing it we are an ft client
780        ConnectionControl cc = new ConnectionControl();
781        cc.setFaultTolerant(true);
782        t.oneway(cc);
783        stateTracker.restore(t);
784        Map<Integer, Command> tmpMap = null;
785        synchronized (requestMap) {
786            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
787        }
788        for (Command command : tmpMap.values()) {
789            if (LOG.isTraceEnabled()) {
790                LOG.trace("restore requestMap, replay: " + command);
791            }
792            t.oneway(command);
793        }
794    }
795
796    public boolean isUseExponentialBackOff() {
797        return useExponentialBackOff;
798    }
799
800    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
801        this.useExponentialBackOff = useExponentialBackOff;
802    }
803
804    @Override
805    public String toString() {
806        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
807    }
808
809    public String getRemoteAddress() {
810        Transport transport = connectedTransport.get();
811        if (transport != null) {
812            return transport.getRemoteAddress();
813        }
814        return null;
815    }
816
817    public boolean isFaultTolerant() {
818        return true;
819    }
820
821    private void doUpdateURIsFromDisk() {
822        // If updateURIsURL is specified, read the file and add any new
823        // transport URI's to this FailOverTransport.
824        // Note: Could track file timestamp to avoid unnecessary reading.
825        String fileURL = getUpdateURIsURL();
826        if (fileURL != null) {
827            BufferedReader in = null;
828            String newUris = null;
829            StringBuffer buffer = new StringBuffer();
830
831            try {
832                in = new BufferedReader(getURLStream(fileURL));
833                while (true) {
834                    String line = in.readLine();
835                    if (line == null) {
836                        break;
837                    }
838                    buffer.append(line);
839                }
840                newUris = buffer.toString();
841            } catch (IOException ioe) {
842                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
843            } finally {
844                if (in != null) {
845                    try {
846                        in.close();
847                    } catch (IOException ioe) {
848                        // ignore
849                    }
850                }
851            }
852
853            processNewTransports(isRebalanceUpdateURIs(), newUris);
854        }
855    }
856
857    final boolean doReconnect() {
858        Exception failure = null;
859        synchronized (reconnectMutex) {
860
861            // First ensure we are up to date.
862            doUpdateURIsFromDisk();
863
864            if (disposed || connectionFailure != null) {
865                reconnectMutex.notifyAll();
866            }
867            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
868                return false;
869            } else {
870                List<URI> connectList = getConnectList();
871                if (connectList.isEmpty()) {
872                    failure = new IOException("No uris available to connect to.");
873                } else {
874                    if (doRebalance) {
875                        if (connectList.get(0).equals(connectedTransportURI)) {
876                            // already connected to first in the list, no need to rebalance
877                            doRebalance = false;
878                            return false;
879                        } else {
880                            if (LOG.isDebugEnabled()) {
881                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
882                            }
883                            try {
884                                Transport transport = this.connectedTransport.getAndSet(null);
885                                if (transport != null) {
886                                    disposeTransport(transport);
887                                }
888                            } catch (Exception e) {
889                                if (LOG.isDebugEnabled()) {
890                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
891                                }
892                            }
893                        }
894                        doRebalance = false;
895                    }
896
897                    resetReconnectDelay();
898
899                    Transport transport = null;
900                    URI uri = null;
901
902                    // If we have a backup already waiting lets try it.
903                    synchronized (backupMutex) {
904                        if ((priorityBackup || backup) && !backups.isEmpty()) {
905                            ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
906                            if (randomize) {
907                                Collections.shuffle(l);
908                            }
909                            BackupTransport bt = l.remove(0);
910                            backups.remove(bt);
911                            transport = bt.getTransport();
912                            uri = bt.getUri();
913                            if (priorityBackup && priorityBackupAvailable) {
914                                Transport old = this.connectedTransport.getAndSet(null);
915                                if (transport != null) {
916                                    disposeTransport(old);
917                                }
918                                priorityBackupAvailable = false;
919                            }
920                        }
921                    }
922
923                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
924                    // for the first time, or we were disposed for some reason.
925                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
926                        synchronized (sleepMutex) {
927                            if (LOG.isDebugEnabled()) {
928                                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
929                            }
930                            try {
931                                sleepMutex.wait(reconnectDelay);
932                            } catch (InterruptedException e) {
933                                Thread.currentThread().interrupt();
934                            }
935                        }
936                    }
937
938                    Iterator<URI> iter = connectList.iterator();
939                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
940
941                        try {
942                            SslContext.setCurrentSslContext(brokerSslContext);
943
944                            // We could be starting with a backup and if so we wait to grab a
945                            // URI from the pool until next time around.
946                            if (transport == null) {
947                                uri = iter.next();
948                                transport = TransportFactory.compositeConnect(uri);
949                            }
950
951                            if (LOG.isDebugEnabled()) {
952                                LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
953                            }
954                            transport.setTransportListener(myTransportListener);
955                            transport.start();
956
957                            if (started &&  !firstConnection) {
958                                restoreTransport(transport);
959                            }
960
961                             if (LOG.isDebugEnabled()) {
962                                LOG.debug("Connection established");
963                             }
964                            reconnectDelay = initialReconnectDelay;
965                            connectedTransportURI = uri;
966                            connectedTransport.set(transport);
967                            reconnectMutex.notifyAll();
968                            connectFailures = 0;
969
970                            // Make sure on initial startup, that the transportListener
971                            // has been initialized for this instance.
972                            synchronized (listenerMutex) {
973                                if (transportListener == null) {
974                                    try {
975                                        // if it isn't set after 2secs - it probably never will be
976                                        listenerMutex.wait(2000);
977                                    } catch (InterruptedException ex) {
978                                    }
979                                }
980                            }
981
982                            if (transportListener != null) {
983                                transportListener.transportResumed();
984                            } else {
985                                if (LOG.isDebugEnabled()) {
986                                    LOG.debug("transport resumed by transport listener not set");
987                                }
988                            }
989
990                            if (firstConnection) {
991                                firstConnection = false;
992                                LOG.info("Successfully connected to " + uri);
993                            } else {
994                                LOG.info("Successfully reconnected to " + uri);
995                            }
996
997                            connected = true;
998                            return false;
999                        } catch (Exception e) {
1000                            failure = e;
1001                            if (LOG.isDebugEnabled()) {
1002                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1003                            }
1004                            if (transport != null) {
1005                                try {
1006                                    transport.stop();
1007                                    transport = null;
1008                                } catch (Exception ee) {
1009                                    if (LOG.isDebugEnabled()) {
1010                                        LOG.debug("Stop of failed transport: " + transport +
1011                                                  " failed with reason: " + ee);
1012                                    }
1013                                }
1014                            }
1015                        } finally {
1016                            SslContext.setCurrentSslContext(null);
1017                        }
1018                    }
1019                }
1020            }
1021
1022            int reconnectLimit = calculateReconnectAttemptLimit();
1023
1024            connectFailures++;
1025            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1026                LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1027                connectionFailure = failure;
1028
1029                // Make sure on initial startup, that the transportListener has been
1030                // initialized for this instance.
1031                synchronized (listenerMutex) {
1032                    if (transportListener == null) {
1033                        try {
1034                            listenerMutex.wait(2000);
1035                        } catch (InterruptedException ex) {
1036                        }
1037                    }
1038                }
1039
1040                propagateFailureToExceptionListener(connectionFailure);
1041                return false;
1042            }
1043        }
1044
1045        if (!disposed) {
1046            doDelay();
1047        }
1048
1049        return !disposed;
1050    }
1051
1052    private void doDelay() {
1053        if (reconnectDelay > 0) {
1054            synchronized (sleepMutex) {
1055                if (LOG.isDebugEnabled()) {
1056                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1057                }
1058                try {
1059                    sleepMutex.wait(reconnectDelay);
1060                } catch (InterruptedException e) {
1061                    Thread.currentThread().interrupt();
1062                }
1063            }
1064        }
1065
1066        if (useExponentialBackOff) {
1067            // Exponential increment of reconnect delay.
1068            reconnectDelay *= backOffMultiplier;
1069            if (reconnectDelay > maxReconnectDelay) {
1070                reconnectDelay = maxReconnectDelay;
1071            }
1072        }
1073    }
1074
1075    private void resetReconnectDelay() {
1076        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1077            reconnectDelay = initialReconnectDelay;
1078        }
1079    }
1080
1081    /*
1082      * called with reconnectMutex held
1083     */
1084    private void propagateFailureToExceptionListener(Exception exception) {
1085        if (transportListener != null) {
1086            if (exception instanceof IOException) {
1087                transportListener.onException((IOException)exception);
1088            } else {
1089                transportListener.onException(IOExceptionSupport.create(exception));
1090            }
1091        }
1092        reconnectMutex.notifyAll();
1093    }
1094
1095    private int calculateReconnectAttemptLimit() {
1096        int maxReconnectValue = this.maxReconnectAttempts;
1097        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1098            maxReconnectValue = this.startupMaxReconnectAttempts;
1099        }
1100        return maxReconnectValue;
1101    }
1102
1103    final boolean buildBackups() {
1104        synchronized (backupMutex) {
1105            if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
1106                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1107                List<URI> connectList = getConnectList();
1108                for (URI uri: connectList) {
1109                    if (!backupList.contains(uri)) {
1110                        backupList.add(uri);
1111                    }
1112                }
1113                // removed disposed backups
1114                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1115                for (BackupTransport bt : backups) {
1116                    if (bt.isDisposed()) {
1117                        disposedList.add(bt);
1118                    }
1119                }
1120                backups.removeAll(disposedList);
1121                disposedList.clear();
1122                for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
1123                    URI uri = iter.next();
1124                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1125                        try {
1126                            SslContext.setCurrentSslContext(brokerSslContext);
1127                            BackupTransport bt = new BackupTransport(this);
1128                            bt.setUri(uri);
1129                            if (!backups.contains(bt)) {
1130                                Transport t = TransportFactory.compositeConnect(uri);
1131                                t.setTransportListener(bt);
1132                                t.start();
1133                                bt.setTransport(t);
1134                                backups.add(bt);
1135                                if (priorityBackup && isPriority(uri)) {
1136                                   priorityBackupAvailable = true;
1137                                }
1138                            }
1139                        } catch (Exception e) {
1140                            LOG.debug("Failed to build backup ", e);
1141                        } finally {
1142                            SslContext.setCurrentSslContext(null);
1143                        }
1144                    }
1145                }
1146            }
1147        }
1148        return false;
1149    }
1150
1151    protected boolean isPriority(URI uri) {
1152        if (!priorityList.isEmpty()) {
1153            return priorityList.contains(uri);
1154        }
1155        return uris.indexOf(uri) == 0;
1156    }
1157
1158    public boolean isDisposed() {
1159        return disposed;
1160    }
1161
1162    public boolean isConnected() {
1163        return connected;
1164    }
1165
1166    public void reconnect(URI uri) throws IOException {
1167        add(true, new URI[]{uri});
1168    }
1169
1170    public boolean isReconnectSupported() {
1171        return this.reconnectSupported;
1172    }
1173
1174    public void setReconnectSupported(boolean value) {
1175        this.reconnectSupported = value;
1176    }
1177
1178    public boolean isUpdateURIsSupported() {
1179        return this.updateURIsSupported;
1180    }
1181
1182    public void setUpdateURIsSupported(boolean value) {
1183        this.updateURIsSupported = value;
1184    }
1185
1186    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1187        if (isUpdateURIsSupported()) {
1188            HashSet<URI> copy = new HashSet<URI>(this.updated);
1189            updated.clear();
1190            if (updatedURIs != null && updatedURIs.length > 0) {
1191                for (URI uri : updatedURIs) {
1192                    if (uri != null && !updated.contains(uri)) {
1193                        updated.add(uri);
1194                    }
1195                }
1196                if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1197                    buildBackups();
1198                    synchronized (reconnectMutex) {
1199                        reconnect(rebalance);
1200                    }
1201                }
1202            }
1203        }
1204    }
1205
1206    /**
1207     * @return the updateURIsURL
1208     */
1209    public String getUpdateURIsURL() {
1210        return this.updateURIsURL;
1211    }
1212
1213    /**
1214     * @param updateURIsURL the updateURIsURL to set
1215     */
1216    public void setUpdateURIsURL(String updateURIsURL) {
1217        this.updateURIsURL = updateURIsURL;
1218    }
1219
1220    /**
1221     * @return the rebalanceUpdateURIs
1222     */
1223    public boolean isRebalanceUpdateURIs() {
1224        return this.rebalanceUpdateURIs;
1225    }
1226
1227    /**
1228     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1229     */
1230    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1231        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1232    }
1233
1234    public int getReceiveCounter() {
1235        Transport transport = connectedTransport.get();
1236        if (transport == null) {
1237            return 0;
1238        }
1239        return transport.getReceiveCounter();
1240    }
1241
1242    public int getConnectFailures() {
1243        return connectFailures;
1244    }
1245
1246    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1247        synchronized (reconnectMutex) {
1248            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1249        }
1250    }
1251
1252    public ConnectionStateTracker getStateTracker() {
1253        return stateTracker;
1254    }
1255
1256    private boolean contains(URI newURI) {
1257        boolean result = false;
1258        for (URI uri : uris) {
1259            if (newURI.getPort() == uri.getPort()) {
1260                InetAddress newAddr = null;
1261                InetAddress addr = null;
1262                try {
1263                    newAddr = InetAddress.getByName(newURI.getHost());
1264                    addr = InetAddress.getByName(uri.getHost());
1265                } catch(IOException e) {
1266
1267                    if (newAddr == null) {
1268                        LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
1269                    } else {
1270                        LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
1271                    }
1272
1273                    if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
1274                        result = true;
1275                        break;
1276                    } else {
1277                        continue;
1278                    }
1279                }
1280
1281                if (addr.equals(newAddr)) {
1282                    result = true;
1283                    break;
1284                }
1285            }
1286        }
1287
1288        return result;
1289    }
1290
1291    private InputStreamReader getURLStream(String path) throws IOException {
1292        InputStreamReader result = null;
1293        URL url = null;
1294        try {
1295            url = new URL(path);
1296            result = new InputStreamReader(url.openStream());
1297        } catch (MalformedURLException e) {
1298            // ignore - it could be a path to a a local file
1299        }
1300        if (result == null) {
1301            result = new FileReader(path);
1302        }
1303        return result;
1304    }
1305}