001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.simple;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.activemq.command.DiscoveryEvent;
024    import org.apache.activemq.thread.DefaultThreadPools;
025    import org.apache.activemq.transport.discovery.DiscoveryAgent;
026    import org.apache.activemq.transport.discovery.DiscoveryListener;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * A simple DiscoveryAgent that allows static configuration of the discovered
032     * services.
033     * 
034     * 
035     */
036    public class SimpleDiscoveryAgent implements DiscoveryAgent {
037    
038        private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039        private long initialReconnectDelay = 1000;
040        private long maxReconnectDelay = 1000 * 30;
041        private long backOffMultiplier = 2;
042        private boolean useExponentialBackOff=true;
043        private int maxReconnectAttempts;
044        private final Object sleepMutex = new Object();
045        private long minConnectTime = 5000;
046        private DiscoveryListener listener;
047        private String services[] = new String[] {};
048        private final AtomicBoolean running = new AtomicBoolean(false);
049    
050        class SimpleDiscoveryEvent extends DiscoveryEvent {
051    
052            private int connectFailures;
053            private long reconnectDelay = initialReconnectDelay;
054            private long connectTime = System.currentTimeMillis();
055            private AtomicBoolean failed = new AtomicBoolean(false);
056    
057            public SimpleDiscoveryEvent(String service) {
058                super(service);
059            }
060    
061            @Override
062            public String toString() {
063                return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
064            }
065        }
066    
067        public void setDiscoveryListener(DiscoveryListener listener) {
068            this.listener = listener;
069        }
070    
071        public void registerService(String name) throws IOException {
072        }
073    
074        public void start() throws Exception {
075            running.set(true);
076            for (int i = 0; i < services.length; i++) {
077                listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
078            }
079        }
080    
081        public void stop() throws Exception {
082            running.set(false);
083            synchronized (sleepMutex) {
084                sleepMutex.notifyAll();
085            }
086        }
087    
088        public String[] getServices() {
089            return services;
090        }
091    
092        public void setServices(String services) {
093            this.services = services.split(",");
094        }
095    
096        public void setServices(String services[]) {
097            this.services = services;
098        }
099    
100        public void setServices(URI services[]) {
101            this.services = new String[services.length];
102            for (int i = 0; i < services.length; i++) {
103                this.services[i] = services[i].toString();
104            }
105        }
106    
107        public void serviceFailed(DiscoveryEvent devent) throws IOException {
108    
109            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
110            if (event.failed.compareAndSet(false, true)) {
111    
112                listener.onServiceRemove(event);
113                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
114                    public void run() {
115    
116                        // We detect a failed connection attempt because the service
117                        // fails right
118                        // away.
119                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
120                            LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
121    
122                            event.connectFailures++;
123    
124                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
125                                LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled for: " + event);
126                                return;
127                            }
128    
129                            synchronized (sleepMutex) {
130                                try {
131                                    if (!running.get()) {
132                                        LOG.debug("Reconnecting disabled: stopped");
133                                        return;
134                                    }
135    
136                                    LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
137                                    sleepMutex.wait(event.reconnectDelay);
138                                } catch (InterruptedException ie) {
139                                    LOG.debug("Reconnecting disabled: " + ie);
140                                    Thread.currentThread().interrupt();
141                                    return;
142                                }
143                            }
144    
145                            if (!useExponentialBackOff) {
146                                event.reconnectDelay = initialReconnectDelay;
147                            } else {
148                                // Exponential increment of reconnect delay.
149                                event.reconnectDelay *= backOffMultiplier;
150                                if (event.reconnectDelay > maxReconnectDelay) {
151                                    event.reconnectDelay = maxReconnectDelay;
152                                }
153                            }
154    
155                        } else {
156                            event.connectFailures = 0;
157                            event.reconnectDelay = initialReconnectDelay;
158                        }
159    
160                        if (!running.get()) {
161                            LOG.debug("Reconnecting disabled: stopped");
162                            return;
163                        }
164    
165                        event.connectTime = System.currentTimeMillis();
166                        event.failed.set(false);
167                        listener.onServiceAdd(event);
168                    }
169                }, "Simple Discovery Agent");
170            }
171        }
172    
173        public long getBackOffMultiplier() {
174            return backOffMultiplier;
175        }
176    
177        public void setBackOffMultiplier(long backOffMultiplier) {
178            this.backOffMultiplier = backOffMultiplier;
179        }
180    
181        public long getInitialReconnectDelay() {
182            return initialReconnectDelay;
183        }
184    
185        public void setInitialReconnectDelay(long initialReconnectDelay) {
186            this.initialReconnectDelay = initialReconnectDelay;
187        }
188    
189        public int getMaxReconnectAttempts() {
190            return maxReconnectAttempts;
191        }
192    
193        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
194            this.maxReconnectAttempts = maxReconnectAttempts;
195        }
196    
197        public long getMaxReconnectDelay() {
198            return maxReconnectDelay;
199        }
200    
201        public void setMaxReconnectDelay(long maxReconnectDelay) {
202            this.maxReconnectDelay = maxReconnectDelay;
203        }
204    
205        public long getMinConnectTime() {
206            return minConnectTime;
207        }
208    
209        public void setMinConnectTime(long minConnectTime) {
210            this.minConnectTime = minConnectTime;
211        }
212    
213        public boolean isUseExponentialBackOff() {
214            return useExponentialBackOff;
215        }
216    
217        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
218            this.useExponentialBackOff = useExponentialBackOff;
219        }
220    }