001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.failover;
018    
019    import java.io.BufferedReader;
020    import java.io.FileReader;
021    import java.io.IOException;
022    import java.io.InputStreamReader;
023    import java.io.InterruptedIOException;
024    import java.net.InetAddress;
025    import java.net.MalformedURLException;
026    import java.net.URI;
027    import java.net.URL;
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.LinkedHashMap;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.StringTokenizer;
036    import java.util.concurrent.CopyOnWriteArrayList;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    import org.apache.activemq.broker.SslContext;
040    import org.apache.activemq.command.Command;
041    import org.apache.activemq.command.ConnectionControl;
042    import org.apache.activemq.command.ConnectionId;
043    import org.apache.activemq.command.RemoveInfo;
044    import org.apache.activemq.command.Response;
045    import org.apache.activemq.state.ConnectionStateTracker;
046    import org.apache.activemq.state.Tracked;
047    import org.apache.activemq.thread.DefaultThreadPools;
048    import org.apache.activemq.thread.Task;
049    import org.apache.activemq.thread.TaskRunner;
050    import org.apache.activemq.transport.CompositeTransport;
051    import org.apache.activemq.transport.DefaultTransportListener;
052    import org.apache.activemq.transport.FutureResponse;
053    import org.apache.activemq.transport.ResponseCallback;
054    import org.apache.activemq.transport.Transport;
055    import org.apache.activemq.transport.TransportFactory;
056    import org.apache.activemq.transport.TransportListener;
057    import org.apache.activemq.util.IOExceptionSupport;
058    import org.apache.activemq.util.ServiceSupport;
059    import org.slf4j.Logger;
060    import org.slf4j.LoggerFactory;
061    
062    /**
063     * A Transport that is made reliable by being able to fail over to another
064     * transport when a transport failure is detected.
065     */
066    public class FailoverTransport implements CompositeTransport {
067    
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Default used for both initialReconnectDelay and reconnectDelay below.
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel default for timeout and for the two reconnect-attempt limits.
    private static final int INFINITE = -1;
    // Listener supplied through setTransportListener(); may be null.
    private TransportListener transportListener;
    // Set to true by stop(); checked by the reconnect task and by oneway().
    private boolean disposed;
    private boolean connected;
    // Candidate URIs registered through the add(...) methods.
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // When non-empty, getConnectList() returns this list instead of a copy of uris.
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Monitor used by start(), stop(), reconnect(), oneway() and failure handling.
    private final Object reconnectMutex = new Object();
    // Held by the reconnect task while it decides whether to reconnect.
    private final Object backupMutex = new Object();
    // Notified by stop().
    private final Object sleepMutex = new Object();
    // Guards assignment of transportListener in setTransportListener().
    private final Object listenerMutex = new Object();
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Commands awaiting a response, keyed by command id; accessed while
    // synchronized on the map itself.
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    private URI connectedTransportURI;
    // URI of the transport that failed most recently; getConnectList() moves it
    // to the end of the list it returns.
    private URI failedConnectTransportURI;
    // Currently connected transport, or null while disconnected.
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    // Background task created in the constructor; woken to reconnect or build backups.
    private final TaskRunner reconnectTask;
    private boolean started;
    // Set to true once a command has been received, reset on transport failure.
    private boolean initialized;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    private double backOffMultiplier = 2d;
    // Milliseconds oneway() waits for a transport; values <= 0 disable the limit.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    // When true, getConnectList() shuffles the candidate URIs.
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // When non-null, oneway() stops waiting and reports this exception.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Listener installed on a transport by disposeTransport() before it is disposed;
    // it adds nothing to DefaultTransportListener.
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    // Internal listener built by createTransportListener().
    private final TransportListener myTransportListener = createTransportListener();
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // Set by reconnect(true); makes the reconnect task reconnect even when connected.
    private boolean doRebalance = false;
    // Refreshed by the reconnect task from isPriority(connectedTransportURI).
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // URIs parsed by setPriorityURIs().
    private ArrayList<URI> priorityList = new ArrayList<URI>();
    private boolean priorityBackupAvailable = false;
124    
125        public FailoverTransport() throws InterruptedIOException {
126            brokerSslContext = SslContext.getCurrentSslContext();
127            stateTracker.setTrackTransactions(true);
128            // Setup a task that is used to reconnect the a connection async.
129            reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
130                public boolean iterate() {
131                    boolean result = false;
132                    if (!started) {
133                        return result;
134                    }
135                    boolean buildBackup = true;
136                    synchronized (backupMutex) {
137                        if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
138                            result = doReconnect();
139                            buildBackup = false;
140                            connectedToPriority = isPriority(connectedTransportURI);
141                        }
142                    }
143                    if (buildBackup) {
144                        buildBackups();
145                        if (priorityBackup && !connectedToPriority) {
146                            try {
147                                doDelay();
148                                if (reconnectTask == null) {
149                                    return true;
150                                }
151                                reconnectTask.wakeup();
152                            } catch (InterruptedException e) {
153                                LOG.debug("Reconnect task has been interrupted.", e);
154                            }
155                        }
156                    } else {
157                        // build backups on the next iteration
158                        buildBackup = true;
159                        try {
160                            if (reconnectTask == null) {
161                                return true;
162                            }
163                            reconnectTask.wakeup();
164                        } catch (InterruptedException e) {
165                            LOG.debug("Reconnect task has been interrupted.", e);
166                        }
167                    }
168                    return result;
169                }
170    
171            }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
172        }
173    
174        TransportListener createTransportListener() {
175            return new TransportListener() {
176                public void onCommand(Object o) {
177                    Command command = (Command) o;
178                    if (command == null) {
179                        return;
180                    }
181                    if (command.isResponse()) {
182                        Object object = null;
183                        synchronized (requestMap) {
184                            object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
185                        }
186                        if (object != null && object.getClass() == Tracked.class) {
187                            ((Tracked) object).onResponses(command);
188                        }
189                    }
190                    if (!initialized) {
191                        initialized = true;
192                    }
193    
194                    if (command.isConnectionControl()) {
195                        handleConnectionControl((ConnectionControl) command);
196                    }
197                    if (transportListener != null) {
198                        transportListener.onCommand(command);
199                    }
200                }
201    
202                public void onException(IOException error) {
203                    try {
204                        handleTransportFailure(error);
205                    } catch (InterruptedException e) {
206                        Thread.currentThread().interrupt();
207                        transportListener.onException(new InterruptedIOException());
208                    }
209                }
210    
211                public void transportInterupted() {
212                    if (transportListener != null) {
213                        transportListener.transportInterupted();
214                    }
215                }
216    
217                public void transportResumed() {
218                    if (transportListener != null) {
219                        transportListener.transportResumed();
220                    }
221                }
222            };
223        }
224    
    /**
     * Disposes the given transport.
     * <p>
     * The transport's listener is replaced with {@code disposedListener}
     * first, so this transport's own listener is no longer attached when the
     * transport is disposed.
     *
     * @param transport the transport to dispose
     */
    public final void disposeTransport(Transport transport) {
        transport.setTransportListener(disposedListener);
        ServiceSupport.dispose(transport);
    }
229    
230        public final void handleTransportFailure(IOException e) throws InterruptedException {
231            if (LOG.isTraceEnabled()) {
232                LOG.trace(this + " handleTransportFailure: " + e);
233            }
234            Transport transport = connectedTransport.getAndSet(null);
235            if (transport == null) {
236                // sync with possible in progress reconnect
237                synchronized (reconnectMutex) {
238                    transport = connectedTransport.getAndSet(null);
239                }
240            }
241            if (transport != null) {
242    
243                disposeTransport(transport);
244    
245                boolean reconnectOk = false;
246                synchronized (reconnectMutex) {
247                    if (canReconnect()) {
248                        reconnectOk = true;
249                    }
250                    LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason:  " + e
251                            + (reconnectOk ? "," : ", not")  +" attempting to automatically reconnect");
252    
253                    initialized = false;
254                    failedConnectTransportURI = connectedTransportURI;
255                    connectedTransportURI = null;
256                    connected = false;
257    
258                    // notify before any reconnect attempt so ack state can be whacked
259                    if (transportListener != null) {
260                        transportListener.transportInterupted();
261                    }
262    
263                    if (reconnectOk) {
264                        updated.remove(failedConnectTransportURI);
265                        reconnectTask.wakeup();
266                    } else {
267                        propagateFailureToExceptionListener(e);
268                    }
269                }
270            }
271        }
272    
273        private boolean canReconnect() {
274            return started && 0 != calculateReconnectAttemptLimit();
275        }
276    
277        public final void handleConnectionControl(ConnectionControl control) {
278            String reconnectStr = control.getReconnectTo();
279            if (reconnectStr != null) {
280                reconnectStr = reconnectStr.trim();
281                if (reconnectStr.length() > 0) {
282                    try {
283                        URI uri = new URI(reconnectStr);
284                        if (isReconnectSupported()) {
285                            reconnect(uri);
286                            LOG.info("Reconnected to: " + uri);
287                        }
288                    } catch (Exception e) {
289                        LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
290                    }
291                }
292            }
293            processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
294        }
295    
296        private final void processNewTransports(boolean rebalance, String newTransports) {
297            if (newTransports != null) {
298                newTransports = newTransports.trim();
299                if (newTransports.length() > 0 && isUpdateURIsSupported()) {
300                    List<URI> list = new ArrayList<URI>();
301                    StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
302                    while (tokenizer.hasMoreTokens()) {
303                        String str = tokenizer.nextToken();
304                        try {
305                            URI uri = new URI(str);
306                            list.add(uri);
307                        } catch (Exception e) {
308                            LOG.error("Failed to parse broker address: " + str, e);
309                        }
310                    }
311                    if (list.isEmpty() == false) {
312                        try {
313                            updateURIs(rebalance, list.toArray(new URI[list.size()]));
314                        } catch (IOException e) {
315                            LOG.error("Failed to update transport URI's from: " + newTransports, e);
316                        }
317                    }
318                }
319            }
320        }
321    
322        public void start() throws Exception {
323            synchronized (reconnectMutex) {
324                if (LOG.isDebugEnabled()) {
325                    LOG.debug("Started " + this);
326                }
327                if (started) {
328                    return;
329                }
330                started = true;
331                stateTracker.setMaxCacheSize(getMaxCacheSize());
332                stateTracker.setTrackMessages(isTrackMessages());
333                stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
334                if (connectedTransport.get() != null) {
335                    stateTracker.restore(connectedTransport.get());
336                } else {
337                    reconnect(false);
338                }
339            }
340        }
341    
342        public void stop() throws Exception {
343            Transport transportToStop = null;
344            synchronized (reconnectMutex) {
345                if (LOG.isDebugEnabled()) {
346                    LOG.debug("Stopped " + this);
347                }
348                if (!started) {
349                    return;
350                }
351                started = false;
352                disposed = true;
353                connected = false;
354                for (BackupTransport t : backups) {
355                    t.setDisposed(true);
356                }
357                backups.clear();
358    
359                if (connectedTransport.get() != null) {
360                    transportToStop = connectedTransport.getAndSet(null);
361                }
362                reconnectMutex.notifyAll();
363            }
364            synchronized (sleepMutex) {
365                sleepMutex.notifyAll();
366            }
367            reconnectTask.shutdown();
368            if (transportToStop != null) {
369                transportToStop.stop();
370            }
371        }
372    
373        public long getInitialReconnectDelay() {
374            return initialReconnectDelay;
375        }
376    
    /**
     * @param initialReconnectDelay the initial reconnect delay to store
     */
    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }
380    
381        public long getMaxReconnectDelay() {
382            return maxReconnectDelay;
383        }
384    
    /**
     * @param maxReconnectDelay the maximum reconnect delay to store
     */
    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }
388    
389        public long getReconnectDelay() {
390            return reconnectDelay;
391        }
392    
    /**
     * @param reconnectDelay the reconnect delay to store
     */
    public void setReconnectDelay(long reconnectDelay) {
        this.reconnectDelay = reconnectDelay;
    }
396    
397        public double getReconnectDelayExponent() {
398            return backOffMultiplier;
399        }
400    
401        public void setReconnectDelayExponent(double reconnectDelayExponent) {
402            this.backOffMultiplier = reconnectDelayExponent;
403        }
404    
405        public Transport getConnectedTransport() {
406            return connectedTransport.get();
407        }
408    
409        public URI getConnectedTransportURI() {
410            return connectedTransportURI;
411        }
412    
413        public int getMaxReconnectAttempts() {
414            return maxReconnectAttempts;
415        }
416    
    /**
     * @param maxReconnectAttempts the reconnect-attempt limit to store
     */
    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }
420    
421        public int getStartupMaxReconnectAttempts() {
422            return this.startupMaxReconnectAttempts;
423        }
424    
    /**
     * @param startupMaxReconnectAttempts the startup reconnect-attempt limit to store
     */
    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
    }
428    
429        public long getTimeout() {
430            return timeout;
431        }
432    
    /**
     * @param timeout the send timeout in milliseconds; {@code oneway} only
     *        enforces it when it is greater than zero
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
436    
437        /**
438         * @return Returns the randomize.
439         */
440        public boolean isRandomize() {
441            return randomize;
442        }
443    
    /**
     * @param randomize whether the candidate URI list should be shuffled
     *        before connecting
     */
    public void setRandomize(boolean randomize) {
        this.randomize = randomize;
    }
450    
451        public boolean isBackup() {
452            return backup;
453        }
454    
    /**
     * @param backup the new value of the {@code backup} flag
     */
    public void setBackup(boolean backup) {
        this.backup = backup;
    }
458    
459        public int getBackupPoolSize() {
460            return backupPoolSize;
461        }
462    
    /**
     * @param backupPoolSize the backup pool size to store
     */
    public void setBackupPoolSize(int backupPoolSize) {
        this.backupPoolSize = backupPoolSize;
    }
466    
467        public int getCurrentBackups() {
468            return this.backups.size();
469        }
470    
471        public boolean isTrackMessages() {
472            return trackMessages;
473        }
474    
    /**
     * @param trackMessages the message-tracking flag; applied to the state
     *        tracker by {@code start()}
     */
    public void setTrackMessages(boolean trackMessages) {
        this.trackMessages = trackMessages;
    }
478    
479        public boolean isTrackTransactionProducers() {
480            return this.trackTransactionProducers;
481        }
482    
    /**
     * @param trackTransactionProducers the transaction-producer tracking flag;
     *        applied to the state tracker by {@code start()}
     */
    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
        this.trackTransactionProducers = trackTransactionProducers;
    }
486    
487        public int getMaxCacheSize() {
488            return maxCacheSize;
489        }
490    
    /**
     * @param maxCacheSize the cache size to store; applied to the state tracker
     *        by {@code start()}
     */
    public void setMaxCacheSize(int maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }
494    
495        public boolean isPriorityBackup() {
496            return priorityBackup;
497        }
498    
    /**
     * @param priorityBackup the new value of the {@code priorityBackup} flag
     */
    public void setPriorityBackup(boolean priorityBackup) {
        this.priorityBackup = priorityBackup;
    }
502    
503        public void setPriorityURIs(String priorityURIs) {
504            StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
505            while (tokenizer.hasMoreTokens()) {
506                String str = tokenizer.nextToken();
507                try {
508                    URI uri = new URI(str);
509                    priorityList.add(uri);
510                } catch (Exception e) {
511                    LOG.error("Failed to parse broker address: " + str, e);
512                }
513            }
514        }
515    
516        public void oneway(Object o) throws IOException {
517    
518            Command command = (Command) o;
519            Exception error = null;
520            try {
521    
522                synchronized (reconnectMutex) {
523    
524                    if (command != null && connectedTransport.get() == null) {
525                        if (command.isShutdownInfo()) {
526                            // Skipping send of ShutdownInfo command when not connected.
527                            return;
528                        } else if (command instanceof RemoveInfo || command.isMessageAck()) {
529                            // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
530                            stateTracker.track(command);
531                            if (command.isResponseRequired()) {
532                                Response response = new Response();
533                                response.setCorrelationId(command.getCommandId());
534                                myTransportListener.onCommand(response);
535                            }
536                            return;
537                        }
538                    }
539    
540                    // Keep trying until the message is sent.
541                    for (int i = 0; !disposed; i++) {
542                        try {
543    
544                            // Wait for transport to be connected.
545                            Transport transport = connectedTransport.get();
546                            long start = System.currentTimeMillis();
547                            boolean timedout = false;
548                            while (transport == null && !disposed && connectionFailure == null
549                                    && !Thread.currentThread().isInterrupted()) {
550                                if (LOG.isTraceEnabled()) {
551                                    LOG.trace("Waiting for transport to reconnect..: " + command);
552                                }
553                                long end = System.currentTimeMillis();
554                                if (timeout > 0 && (end - start > timeout)) {
555                                    timedout = true;
556                                    if (LOG.isInfoEnabled()) {
557                                        LOG.info("Failover timed out after " + (end - start) + "ms");
558                                    }
559                                    break;
560                                }
561                                try {
562                                    reconnectMutex.wait(100);
563                                } catch (InterruptedException e) {
564                                    Thread.currentThread().interrupt();
565                                    if (LOG.isDebugEnabled()) {
566                                        LOG.debug("Interupted: " + e, e);
567                                    }
568                                }
569                                transport = connectedTransport.get();
570                            }
571    
572                            if (transport == null) {
573                                // Previous loop may have exited due to use being
574                                // disposed.
575                                if (disposed) {
576                                    error = new IOException("Transport disposed.");
577                                } else if (connectionFailure != null) {
578                                    error = connectionFailure;
579                                } else if (timedout == true) {
580                                    error = new IOException("Failover timeout of " + timeout + " ms reached.");
581                                } else {
582                                    error = new IOException("Unexpected failure.");
583                                }
584                                break;
585                            }
586    
587                            // If it was a request and it was not being tracked by
588                            // the state tracker,
589                            // then hold it in the requestMap so that we can replay
590                            // it later.
591                            Tracked tracked = stateTracker.track(command);
592                            synchronized (requestMap) {
593                                if (tracked != null && tracked.isWaitingForResponse()) {
594                                    requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
595                                } else if (tracked == null && command.isResponseRequired()) {
596                                    requestMap.put(Integer.valueOf(command.getCommandId()), command);
597                                }
598                            }
599    
600                            // Send the message.
601                            try {
602                                transport.oneway(command);
603                                stateTracker.trackBack(command);
604                            } catch (IOException e) {
605    
606                                // If the command was not tracked.. we will retry in
607                                // this method
608                                if (tracked == null) {
609    
610                                    // since we will retry in this method.. take it
611                                    // out of the request
612                                    // map so that it is not sent 2 times on
613                                    // recovery
614                                    if (command.isResponseRequired()) {
615                                        requestMap.remove(Integer.valueOf(command.getCommandId()));
616                                    }
617    
618                                    // Rethrow the exception so it will handled by
619                                    // the outer catch
620                                    throw e;
621                                } else {
622                                    // Handle the error but allow the method to return since the
623                                    // tracked commands are replayed on reconnect.
624                                    if (LOG.isDebugEnabled()) {
625                                        LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
626                                    }
627                                    handleTransportFailure(e);
628                                }
629                            }
630    
631                            return;
632    
633                        } catch (IOException e) {
634                            if (LOG.isDebugEnabled()) {
635                                LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
636                            }
637                            handleTransportFailure(e);
638                        }
639                    }
640                }
641            } catch (InterruptedException e) {
642                // Some one may be trying to stop our thread.
643                Thread.currentThread().interrupt();
644                throw new InterruptedIOException();
645            }
646    
647            if (!disposed) {
648                if (error != null) {
649                    if (error instanceof IOException) {
650                        throw (IOException) error;
651                    }
652                    throw IOExceptionSupport.create(error);
653                }
654            }
655        }
656    
    /**
     * Not supported by this transport; always throws.
     *
     * @param command ignored
     * @param responseCallback ignored
     * @return never returns normally
     * @throws AssertionError always
     */
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }
660    
    /**
     * Not supported by this transport; always throws.
     *
     * @param command ignored
     * @return never returns normally
     * @throws AssertionError always
     */
    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }
664    
    /**
     * Not supported by this transport; always throws.
     *
     * @param command ignored
     * @param timeout ignored
     * @return never returns normally
     * @throws AssertionError always
     */
    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }
668    
669        public void add(boolean rebalance, URI u[]) {
670            boolean newURI = false;
671            for (URI uri : u) {
672                if (!contains(uri)) {
673                    uris.add(uri);
674                    newURI = true;
675                }
676            }
677            if (newURI) {
678                reconnect(rebalance);
679            }
680        }
681    
682        public void remove(boolean rebalance, URI u[]) {
683            for (URI uri : u) {
684                uris.remove(uri);
685            }
686            // rebalance is automatic if any connected to removed/stopped broker
687        }
688    
689        public void add(boolean rebalance, String u) {
690            try {
691                URI newURI = new URI(u);
692                if (contains(newURI) == false) {
693                    uris.add(newURI);
694                    reconnect(rebalance);
695                }
696    
697            } catch (Exception e) {
698                LOG.error("Failed to parse URI: " + u);
699            }
700        }
701    
702        public void reconnect(boolean rebalance) {
703            synchronized (reconnectMutex) {
704                if (started) {
705                    if (rebalance) {
706                        doRebalance = true;
707                    }
708                    LOG.debug("Waking up reconnect task");
709                    try {
710                        reconnectTask.wakeup();
711                    } catch (InterruptedException e) {
712                        Thread.currentThread().interrupt();
713                    }
714                } else {
715                    LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
716                }
717            }
718        }
719    
720        private List<URI> getConnectList() {
721            if (!updated.isEmpty()) {
722                if (failedConnectTransportURI != null) {
723                    boolean removed = updated.remove(failedConnectTransportURI);
724                    if (removed) {
725                        updated.add(failedConnectTransportURI);
726                    }
727                }
728                return updated;
729            }
730            ArrayList<URI> l = new ArrayList<URI>(uris);
731            boolean removed = false;
732            if (failedConnectTransportURI != null) {
733                removed = l.remove(failedConnectTransportURI);
734            }
735            if (randomize) {
736                // Randomly, reorder the list by random swapping
737                for (int i = 0; i < l.size(); i++) {
738                    int p = (int) (Math.random() * 100 % l.size());
739                    URI t = l.get(p);
740                    l.set(p, l.get(i));
741                    l.set(i, t);
742                }
743            }
744            if (removed) {
745                l.add(failedConnectTransportURI);
746            }
747            if (LOG.isDebugEnabled()) {
748                LOG.debug("urlList connectionList:" + l + ", from: " + uris);
749            }
750            return l;
751        }
752    
753        public TransportListener getTransportListener() {
754            return transportListener;
755        }
756    
757        public void setTransportListener(TransportListener commandListener) {
758            synchronized (listenerMutex) {
759                this.transportListener = commandListener;
760                listenerMutex.notifyAll();
761            }
762        }
763    
764        public <T> T narrow(Class<T> target) {
765    
766            if (target.isAssignableFrom(getClass())) {
767                return target.cast(this);
768            }
769            Transport transport = connectedTransport.get();
770            if (transport != null) {
771                return transport.narrow(target);
772            }
773            return null;
774    
775        }
776    
777        protected void restoreTransport(Transport t) throws Exception, IOException {
778            t.start();
779            // send information to the broker - informing it we are an ft client
780            ConnectionControl cc = new ConnectionControl();
781            cc.setFaultTolerant(true);
782            t.oneway(cc);
783            stateTracker.restore(t);
784            Map<Integer, Command> tmpMap = null;
785            synchronized (requestMap) {
786                tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
787            }
788            for (Command command : tmpMap.values()) {
789                if (LOG.isTraceEnabled()) {
790                    LOG.trace("restore requestMap, replay: " + command);
791                }
792                t.oneway(command);
793            }
794        }
795    
796        public boolean isUseExponentialBackOff() {
797            return useExponentialBackOff;
798        }
799    
800        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
801            this.useExponentialBackOff = useExponentialBackOff;
802        }
803    
804        @Override
805        public String toString() {
806            return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
807        }
808    
809        public String getRemoteAddress() {
810            Transport transport = connectedTransport.get();
811            if (transport != null) {
812                return transport.getRemoteAddress();
813            }
814            return null;
815        }
816    
817        public boolean isFaultTolerant() {
818            return true;
819        }
820    
821        private void doUpdateURIsFromDisk() {
822            // If updateURIsURL is specified, read the file and add any new
823            // transport URI's to this FailOverTransport.
824            // Note: Could track file timestamp to avoid unnecessary reading.
825            String fileURL = getUpdateURIsURL();
826            if (fileURL != null) {
827                BufferedReader in = null;
828                String newUris = null;
829                StringBuffer buffer = new StringBuffer();
830    
831                try {
832                    in = new BufferedReader(getURLStream(fileURL));
833                    while (true) {
834                        String line = in.readLine();
835                        if (line == null) {
836                            break;
837                        }
838                        buffer.append(line);
839                    }
840                    newUris = buffer.toString();
841                } catch (IOException ioe) {
842                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
843                } finally {
844                    if (in != null) {
845                        try {
846                            in.close();
847                        } catch (IOException ioe) {
848                            // ignore
849                        }
850                    }
851                }
852    
853                processNewTransports(isRebalanceUpdateURIs(), newUris);
854            }
855        }
856    
857        final boolean doReconnect() {
858            Exception failure = null;
859            synchronized (reconnectMutex) {
860    
861                // First ensure we are up to date.
862                doUpdateURIsFromDisk();
863    
864                if (disposed || connectionFailure != null) {
865                    reconnectMutex.notifyAll();
866                }
867                if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
868                    return false;
869                } else {
870                    List<URI> connectList = getConnectList();
871                    if (connectList.isEmpty()) {
872                        failure = new IOException("No uris available to connect to.");
873                    } else {
874                        if (doRebalance) {
875                            if (connectList.get(0).equals(connectedTransportURI)) {
876                                // already connected to first in the list, no need to rebalance
877                                doRebalance = false;
878                                return false;
879                            } else {
880                                if (LOG.isDebugEnabled()) {
881                                    LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
882                                }
883                                try {
884                                    Transport transport = this.connectedTransport.getAndSet(null);
885                                    if (transport != null) {
886                                        disposeTransport(transport);
887                                    }
888                                } catch (Exception e) {
889                                    if (LOG.isDebugEnabled()) {
890                                        LOG.debug("Caught an exception stopping existing transport for rebalance", e);
891                                    }
892                                }
893                            }
894                            doRebalance = false;
895                        }
896    
897                        resetReconnectDelay();
898    
899                        Transport transport = null;
900                        URI uri = null;
901    
902                        // If we have a backup already waiting lets try it.
903                        synchronized (backupMutex) {
904                            if ((priorityBackup || backup) && !backups.isEmpty()) {
905                                ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
906                                if (randomize) {
907                                    Collections.shuffle(l);
908                                }
909                                BackupTransport bt = l.remove(0);
910                                backups.remove(bt);
911                                transport = bt.getTransport();
912                                uri = bt.getUri();
913                                if (priorityBackup && priorityBackupAvailable) {
914                                    Transport old = this.connectedTransport.getAndSet(null);
915                                    if (transport != null) {
916                                        disposeTransport(old);
917                                    }
918                                    priorityBackupAvailable = false;
919                                }
920                            }
921                        }
922    
923                        // Sleep for the reconnectDelay if there's no backup and we aren't trying
924                        // for the first time, or we were disposed for some reason.
925                        if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
926                            synchronized (sleepMutex) {
927                                if (LOG.isDebugEnabled()) {
928                                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
929                                }
930                                try {
931                                    sleepMutex.wait(reconnectDelay);
932                                } catch (InterruptedException e) {
933                                    Thread.currentThread().interrupt();
934                                }
935                            }
936                        }
937    
938                        Iterator<URI> iter = connectList.iterator();
939                        while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
940    
941                            try {
942                                SslContext.setCurrentSslContext(brokerSslContext);
943    
944                                // We could be starting with a backup and if so we wait to grab a
945                                // URI from the pool until next time around.
946                                if (transport == null) {
947                                    uri = iter.next();
948                                    transport = TransportFactory.compositeConnect(uri);
949                                }
950    
951                                if (LOG.isDebugEnabled()) {
952                                    LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
953                                }
954                                transport.setTransportListener(myTransportListener);
955                                transport.start();
956    
957                                if (started &&  !firstConnection) {
958                                    restoreTransport(transport);
959                                }
960    
961                                 if (LOG.isDebugEnabled()) {
962                                    LOG.debug("Connection established");
963                                 }
964                                reconnectDelay = initialReconnectDelay;
965                                connectedTransportURI = uri;
966                                connectedTransport.set(transport);
967                                reconnectMutex.notifyAll();
968                                connectFailures = 0;
969    
970                                // Make sure on initial startup, that the transportListener
971                                // has been initialized for this instance.
972                                synchronized (listenerMutex) {
973                                    if (transportListener == null) {
974                                        try {
975                                            // if it isn't set after 2secs - it probably never will be
976                                            listenerMutex.wait(2000);
977                                        } catch (InterruptedException ex) {
978                                        }
979                                    }
980                                }
981    
982                                if (transportListener != null) {
983                                    transportListener.transportResumed();
984                                } else {
985                                    if (LOG.isDebugEnabled()) {
986                                        LOG.debug("transport resumed by transport listener not set");
987                                    }
988                                }
989    
990                                if (firstConnection) {
991                                    firstConnection = false;
992                                    LOG.info("Successfully connected to " + uri);
993                                } else {
994                                    LOG.info("Successfully reconnected to " + uri);
995                                }
996    
997                                connected = true;
998                                return false;
999                            } catch (Exception e) {
1000                                failure = e;
1001                                if (LOG.isDebugEnabled()) {
1002                                    LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1003                                }
1004                                if (transport != null) {
1005                                    try {
1006                                        transport.stop();
1007                                        transport = null;
1008                                    } catch (Exception ee) {
1009                                        if (LOG.isDebugEnabled()) {
1010                                            LOG.debug("Stop of failed transport: " + transport +
1011                                                      " failed with reason: " + ee);
1012                                        }
1013                                    }
1014                                }
1015                            } finally {
1016                                SslContext.setCurrentSslContext(null);
1017                            }
1018                        }
1019                    }
1020                }
1021    
1022                int reconnectLimit = calculateReconnectAttemptLimit();
1023    
1024                connectFailures++;
1025                if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1026                    LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1027                    connectionFailure = failure;
1028    
1029                    // Make sure on initial startup, that the transportListener has been
1030                    // initialized for this instance.
1031                    synchronized (listenerMutex) {
1032                        if (transportListener == null) {
1033                            try {
1034                                listenerMutex.wait(2000);
1035                            } catch (InterruptedException ex) {
1036                            }
1037                        }
1038                    }
1039    
1040                    propagateFailureToExceptionListener(connectionFailure);
1041                    return false;
1042                }
1043            }
1044    
1045            if (!disposed) {
1046                doDelay();
1047            }
1048    
1049            return !disposed;
1050        }
1051    
1052        private void doDelay() {
1053            if (reconnectDelay > 0) {
1054                synchronized (sleepMutex) {
1055                    if (LOG.isDebugEnabled()) {
1056                        LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1057                    }
1058                    try {
1059                        sleepMutex.wait(reconnectDelay);
1060                    } catch (InterruptedException e) {
1061                        Thread.currentThread().interrupt();
1062                    }
1063                }
1064            }
1065    
1066            if (useExponentialBackOff) {
1067                // Exponential increment of reconnect delay.
1068                reconnectDelay *= backOffMultiplier;
1069                if (reconnectDelay > maxReconnectDelay) {
1070                    reconnectDelay = maxReconnectDelay;
1071                }
1072            }
1073        }
1074    
1075        private void resetReconnectDelay() {
1076            if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1077                reconnectDelay = initialReconnectDelay;
1078            }
1079        }
1080    
1081        /*
1082          * called with reconnectMutex held
1083         */
1084        private void propagateFailureToExceptionListener(Exception exception) {
1085            if (transportListener != null) {
1086                if (exception instanceof IOException) {
1087                    transportListener.onException((IOException)exception);
1088                } else {
1089                    transportListener.onException(IOExceptionSupport.create(exception));
1090                }
1091            }
1092            reconnectMutex.notifyAll();
1093        }
1094    
1095        private int calculateReconnectAttemptLimit() {
1096            int maxReconnectValue = this.maxReconnectAttempts;
1097            if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1098                maxReconnectValue = this.startupMaxReconnectAttempts;
1099            }
1100            return maxReconnectValue;
1101        }
1102    
1103        final boolean buildBackups() {
1104            synchronized (backupMutex) {
1105                if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
1106                    ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1107                    List<URI> connectList = getConnectList();
1108                    for (URI uri: connectList) {
1109                        if (!backupList.contains(uri)) {
1110                            backupList.add(uri);
1111                        }
1112                    }
1113                    // removed disposed backups
1114                    List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1115                    for (BackupTransport bt : backups) {
1116                        if (bt.isDisposed()) {
1117                            disposedList.add(bt);
1118                        }
1119                    }
1120                    backups.removeAll(disposedList);
1121                    disposedList.clear();
1122                    for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
1123                        URI uri = iter.next();
1124                        if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1125                            try {
1126                                SslContext.setCurrentSslContext(brokerSslContext);
1127                                BackupTransport bt = new BackupTransport(this);
1128                                bt.setUri(uri);
1129                                if (!backups.contains(bt)) {
1130                                    Transport t = TransportFactory.compositeConnect(uri);
1131                                    t.setTransportListener(bt);
1132                                    t.start();
1133                                    bt.setTransport(t);
1134                                    backups.add(bt);
1135                                    if (priorityBackup && isPriority(uri)) {
1136                                       priorityBackupAvailable = true;
1137                                    }
1138                                }
1139                            } catch (Exception e) {
1140                                LOG.debug("Failed to build backup ", e);
1141                            } finally {
1142                                SslContext.setCurrentSslContext(null);
1143                            }
1144                        }
1145                    }
1146                }
1147            }
1148            return false;
1149        }
1150    
1151        protected boolean isPriority(URI uri) {
1152            if (!priorityList.isEmpty()) {
1153                return priorityList.contains(uri);
1154            }
1155            return uris.indexOf(uri) == 0;
1156        }
1157    
1158        public boolean isDisposed() {
1159            return disposed;
1160        }
1161    
1162        public boolean isConnected() {
1163            return connected;
1164        }
1165    
1166        public void reconnect(URI uri) throws IOException {
1167            add(true, new URI[]{uri});
1168        }
1169    
1170        public boolean isReconnectSupported() {
1171            return this.reconnectSupported;
1172        }
1173    
1174        public void setReconnectSupported(boolean value) {
1175            this.reconnectSupported = value;
1176        }
1177    
1178        public boolean isUpdateURIsSupported() {
1179            return this.updateURIsSupported;
1180        }
1181    
1182        public void setUpdateURIsSupported(boolean value) {
1183            this.updateURIsSupported = value;
1184        }
1185    
1186        public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1187            if (isUpdateURIsSupported()) {
1188                HashSet<URI> copy = new HashSet<URI>(this.updated);
1189                updated.clear();
1190                if (updatedURIs != null && updatedURIs.length > 0) {
1191                    for (URI uri : updatedURIs) {
1192                        if (uri != null && !updated.contains(uri)) {
1193                            updated.add(uri);
1194                        }
1195                    }
1196                    if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1197                        buildBackups();
1198                        synchronized (reconnectMutex) {
1199                            reconnect(rebalance);
1200                        }
1201                    }
1202                }
1203            }
1204        }
1205    
1206        /**
1207         * @return the updateURIsURL
1208         */
1209        public String getUpdateURIsURL() {
1210            return this.updateURIsURL;
1211        }
1212    
1213        /**
1214         * @param updateURIsURL the updateURIsURL to set
1215         */
1216        public void setUpdateURIsURL(String updateURIsURL) {
1217            this.updateURIsURL = updateURIsURL;
1218        }
1219    
1220        /**
1221         * @return the rebalanceUpdateURIs
1222         */
1223        public boolean isRebalanceUpdateURIs() {
1224            return this.rebalanceUpdateURIs;
1225        }
1226    
1227        /**
1228         * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1229         */
1230        public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1231            this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1232        }
1233    
1234        public int getReceiveCounter() {
1235            Transport transport = connectedTransport.get();
1236            if (transport == null) {
1237                return 0;
1238            }
1239            return transport.getReceiveCounter();
1240        }
1241    
1242        public int getConnectFailures() {
1243            return connectFailures;
1244        }
1245    
1246        public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1247            synchronized (reconnectMutex) {
1248                stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1249            }
1250        }
1251    
1252        public ConnectionStateTracker getStateTracker() {
1253            return stateTracker;
1254        }
1255    
1256        private boolean contains(URI newURI) {
1257            boolean result = false;
1258            for (URI uri : uris) {
1259                if (newURI.getPort() == uri.getPort()) {
1260                    InetAddress newAddr = null;
1261                    InetAddress addr = null;
1262                    try {
1263                        newAddr = InetAddress.getByName(newURI.getHost());
1264                        addr = InetAddress.getByName(uri.getHost());
1265                    } catch(IOException e) {
1266    
1267                        if (newAddr == null) {
1268                            LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
1269                        } else {
1270                            LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
1271                        }
1272    
1273                        if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
1274                            result = true;
1275                            break;
1276                        } else {
1277                            continue;
1278                        }
1279                    }
1280    
1281                    if (addr.equals(newAddr)) {
1282                        result = true;
1283                        break;
1284                    }
1285                }
1286            }
1287    
1288            return result;
1289        }
1290    
1291        private InputStreamReader getURLStream(String path) throws IOException {
1292            InputStreamReader result = null;
1293            URL url = null;
1294            try {
1295                url = new URL(path);
1296                result = new InputStreamReader(url.openStream());
1297            } catch (MalformedURLException e) {
1298                // ignore - it could be a path to a a local file
1299            }
1300            if (result == null) {
1301                result = new FileReader(path);
1302            }
1303            return result;
1304        }
1305    }