001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.discovery.simple; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.concurrent.atomic.AtomicBoolean; 022 023 import org.apache.activemq.command.DiscoveryEvent; 024 import org.apache.activemq.thread.DefaultThreadPools; 025 import org.apache.activemq.transport.discovery.DiscoveryAgent; 026 import org.apache.activemq.transport.discovery.DiscoveryListener; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 /** 031 * A simple DiscoveryAgent that allows static configuration of the discovered 032 * services. 033 * 034 * 035 */ 036 public class SimpleDiscoveryAgent implements DiscoveryAgent { 037 038 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class); 039 private long initialReconnectDelay = 1000; 040 private long maxReconnectDelay = 1000 * 30; 041 private long backOffMultiplier = 2; 042 private boolean useExponentialBackOff=true; 043 private int maxReconnectAttempts; 044 private final Object sleepMutex = new Object(); 045 private long minConnectTime = 5000; 046 private DiscoveryListener listener; 047 private String services[] = new String[] {}; 048 private final AtomicBoolean running = new AtomicBoolean(false); 049 050 class SimpleDiscoveryEvent extends DiscoveryEvent { 051 052 private int connectFailures; 053 private long reconnectDelay = initialReconnectDelay; 054 private long connectTime = System.currentTimeMillis(); 055 private AtomicBoolean failed = new AtomicBoolean(false); 056 057 public SimpleDiscoveryEvent(String service) { 058 super(service); 059 } 060 061 @Override 062 public String toString() { 063 return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]"; 064 } 065 } 066 067 public void setDiscoveryListener(DiscoveryListener listener) { 068 this.listener = listener; 069 } 070 071 public void registerService(String name) throws IOException { 072 } 073 074 public void start() throws Exception { 075 running.set(true); 076 for (int i = 0; i < services.length; i++) { 077 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); 078 } 079 } 080 081 public void stop() throws Exception { 082 running.set(false); 083 synchronized (sleepMutex) { 084 sleepMutex.notifyAll(); 085 } 086 } 087 088 public String[] getServices() { 089 return services; 090 } 091 092 public void setServices(String services) { 093 this.services = services.split(","); 094 } 095 096 public void setServices(String services[]) { 097 this.services = services; 098 } 099 100 public void setServices(URI services[]) { 101 this.services = new String[services.length]; 102 for (int i = 0; i < services.length; i++) { 103 this.services[i] = services[i].toString(); 104 } 105 } 106 107 public void serviceFailed(DiscoveryEvent devent) throws IOException { 108 109 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent; 110 if (event.failed.compareAndSet(false, true)) { 111 112 listener.onServiceRemove(event); 113 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 114 public void run() { 115 116 // We detect a failed connection attempt because the service 117 // fails right 118 // away. 119 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 120 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event); 121 122 event.connectFailures++; 123 124 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 125 LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event); 126 return; 127 } 128 129 synchronized (sleepMutex) { 130 try { 131 if (!running.get()) { 132 LOG.debug("Reconnecting disabled: stopped"); 133 return; 134 } 135 136 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect."); 137 sleepMutex.wait(event.reconnectDelay); 138 } catch (InterruptedException ie) { 139 LOG.debug("Reconnecting disabled: " + ie); 140 Thread.currentThread().interrupt(); 141 return; 142 } 143 } 144 145 if (!useExponentialBackOff) { 146 event.reconnectDelay = initialReconnectDelay; 147 } else { 148 // Exponential increment of reconnect delay. 149 event.reconnectDelay *= backOffMultiplier; 150 if (event.reconnectDelay > maxReconnectDelay) { 151 event.reconnectDelay = maxReconnectDelay; 152 } 153 } 154 155 } else { 156 event.connectFailures = 0; 157 event.reconnectDelay = initialReconnectDelay; 158 } 159 160 if (!running.get()) { 161 LOG.debug("Reconnecting disabled: stopped"); 162 return; 163 } 164 165 event.connectTime = System.currentTimeMillis(); 166 event.failed.set(false); 167 listener.onServiceAdd(event); 168 } 169 }, "Simple Discovery Agent"); 170 } 171 } 172 173 public long getBackOffMultiplier() { 174 return backOffMultiplier; 175 } 176 177 public void setBackOffMultiplier(long backOffMultiplier) { 178 this.backOffMultiplier = backOffMultiplier; 179 } 180 181 public long getInitialReconnectDelay() { 182 return initialReconnectDelay; 183 } 184 185 public void setInitialReconnectDelay(long initialReconnectDelay) { 186 this.initialReconnectDelay = initialReconnectDelay; 187 } 188 189 public int getMaxReconnectAttempts() { 190 return maxReconnectAttempts; 191 } 192 193 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 194 this.maxReconnectAttempts = maxReconnectAttempts; 195 } 196 197 public long getMaxReconnectDelay() { 198 return maxReconnectDelay; 199 } 200 201 public void setMaxReconnectDelay(long maxReconnectDelay) { 202 this.maxReconnectDelay = maxReconnectDelay; 203 } 204 205 public long getMinConnectTime() { 206 return minConnectTime; 207 } 208 209 public void setMinConnectTime(long minConnectTime) { 210 this.minConnectTime = minConnectTime; 211 } 212 213 public boolean isUseExponentialBackOff() { 214 return useExponentialBackOff; 215 } 216 217 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 218 this.useExponentialBackOff = useExponentialBackOff; 219 } 220 }