001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.discovery.multicast;
019    
020    import java.io.IOException;
021    import java.net.DatagramPacket;
022    import java.net.InetAddress;
023    import java.net.InetSocketAddress;
024    import java.net.MulticastSocket;
025    import java.net.NetworkInterface;
026    import java.net.SocketAddress;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.util.Iterator;
030    import java.util.Map;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.LinkedBlockingQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import org.apache.activemq.command.DiscoveryEvent;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryListener;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
047     * encoded using any wireformat, but openwire by default.
048     * 
049     * 
050     */
051    public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
052    
053        public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
054        public static final String DEFAULT_HOST_STR = "default"; 
055        public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
056        public static final int    DEFAULT_PORT  = 6155; 
057            
058        private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
059        private static final String TYPE_SUFFIX = "ActiveMQ-4.";
060        private static final String ALIVE = "alive.";
061        private static final String DEAD = "dead.";
062        private static final String DELIMITER = "%";
063        private static final int BUFF_SIZE = 8192;
064        private static final int DEFAULT_IDLE_TIME = 500;
065        private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
066    
067        private long initialReconnectDelay = 1000 * 5;
068        private long maxReconnectDelay = 1000 * 30;
069        private long backOffMultiplier = 2;
070        private boolean useExponentialBackOff;
071        private int maxReconnectAttempts;
072    
073        private int timeToLive = 1;
074        private boolean loopBackMode;
075        private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
076        private String group = "default";
077        private URI discoveryURI;
078        private InetAddress inetAddress;
079        private SocketAddress sockAddress;
080        private DiscoveryListener discoveryListener;
081        private String selfService;
082        private MulticastSocket mcast;
083        private Thread runner;
084        private long keepAliveInterval = DEFAULT_IDLE_TIME;
085        private String mcInterface;
086        private String mcNetworkInterface;
087        private String mcJoinNetworkInterface;
088        private long lastAdvertizeTime;
089        private AtomicBoolean started = new AtomicBoolean(false);
090        private boolean reportAdvertizeFailed = true;
091        private ExecutorService executor = null;
092    
093        class RemoteBrokerData {
094            final String brokerName;
095            final String service;
096            long lastHeartBeat;
097            long recoveryTime;
098            int failureCount;
099            boolean failed;
100    
101            public RemoteBrokerData(String brokerName, String service) {
102                this.brokerName = brokerName;
103                this.service = service;
104                this.lastHeartBeat = System.currentTimeMillis();
105            }
106    
107            public synchronized void updateHeartBeat() {
108                lastHeartBeat = System.currentTimeMillis();
109    
110                // Consider that the broker recovery has succeeded if it has not
111                // failed in 60 seconds.
112                if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
113                    if (LOG.isDebugEnabled()) {
114                        LOG.debug("I now think that the " + service + " service has recovered.");
115                    }
116                    failureCount = 0;
117                    recoveryTime = 0;
118                }
119            }
120    
121            public synchronized long getLastHeartBeat() {
122                return lastHeartBeat;
123            }
124    
125            public synchronized boolean markFailed() {
126                if (!failed) {
127                    failed = true;
128                    failureCount++;
129    
130                    long reconnectDelay;
131                    if (!useExponentialBackOff) {
132                        reconnectDelay = initialReconnectDelay;
133                    } else {
134                        reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
135                        if (reconnectDelay > maxReconnectDelay) {
136                            reconnectDelay = maxReconnectDelay;
137                        }
138                    }
139    
140                    if (LOG.isDebugEnabled()) {
141                        LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
142                                  + " ms, the current failure count is: " + failureCount);
143                    }
144    
145                    recoveryTime = System.currentTimeMillis() + reconnectDelay;
146                    return true;
147                }
148                return false;
149            }
150    
151            /**
152             * @return true if this broker is marked failed and it is now the right
153             *         time to start recovery.
154             */
155            public synchronized boolean doRecovery() {
156                if (!failed) {
157                    return false;
158                }
159    
160                // Are we done trying to recover this guy?
161                if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
162                    if (LOG.isDebugEnabled()) {
163                        LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
164                    }
165                    return false;
166                }
167    
168                // Is it not yet time?
169                if (System.currentTimeMillis() < recoveryTime) {
170                    return false;
171                }
172    
173                if (LOG.isDebugEnabled()) {
174                    LOG.debug("Resuming event advertisement of the " + service + " service.");
175                }
176                failed = false;
177                return true;
178            }
179    
180            public boolean isFailed() {
181                return failed;
182            }
183        }
184    
185        /**
186         * Set the discovery listener
187         * 
188         * @param listener
189         */
190        public void setDiscoveryListener(DiscoveryListener listener) {
191            this.discoveryListener = listener;
192        }
193    
194        /**
195         * register a service
196         */
197        public void registerService(String name) throws IOException {
198            this.selfService = name;
199            if (started.get()) {
200                doAdvertizeSelf();
201            }
202        }
203    
204        /**
205         * @return Returns the loopBackMode.
206         */
207        public boolean isLoopBackMode() {
208            return loopBackMode;
209        }
210    
211        /**
212         * @param loopBackMode The loopBackMode to set.
213         */
214        public void setLoopBackMode(boolean loopBackMode) {
215            this.loopBackMode = loopBackMode;
216        }
217    
218        /**
219         * @return Returns the timeToLive.
220         */
221        public int getTimeToLive() {
222            return timeToLive;
223        }
224    
225        /**
226         * @param timeToLive The timeToLive to set.
227         */
228        public void setTimeToLive(int timeToLive) {
229            this.timeToLive = timeToLive;
230        }
231    
232        /**
233         * @return the discoveryURI
234         */
235        public URI getDiscoveryURI() {
236            return discoveryURI;
237        }
238    
239        /**
240         * Set the discoveryURI
241         * 
242         * @param discoveryURI
243         */
244        public void setDiscoveryURI(URI discoveryURI) {
245            this.discoveryURI = discoveryURI;
246        }
247    
248        public long getKeepAliveInterval() {
249            return keepAliveInterval;
250        }
251    
252        public void setKeepAliveInterval(long keepAliveInterval) {
253            this.keepAliveInterval = keepAliveInterval;
254        }
255        
256        public void setInterface(String mcInterface) {
257            this.mcInterface = mcInterface;
258        }
259        
260        public void setNetworkInterface(String mcNetworkInterface) {
261            this.mcNetworkInterface = mcNetworkInterface;    
262        }
263        
264        public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
265            this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
266        }
267        
268        /**
269         * start the discovery agent
270         * 
271         * @throws Exception
272         */
273        public void start() throws Exception {
274            
275            if (started.compareAndSet(false, true)) {               
276                                    
277                if (group == null || group.length() == 0) {
278                    throw new IOException("You must specify a group to discover");
279                }
280                String type = getType();
281                if (!type.endsWith(".")) {
282                    LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
283                    type += ".";
284                }
285                
286                if (discoveryURI == null) {
287                    discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
288                }
289                
290                if (LOG.isTraceEnabled()) 
291                            LOG.trace("start - discoveryURI = " + discoveryURI);                                      
292                      
293                      String myHost = discoveryURI.getHost();
294                      int    myPort = discoveryURI.getPort(); 
295                         
296                      if( DEFAULT_HOST_STR.equals(myHost) ) 
297                            myHost = DEFAULT_HOST_IP;                         
298                      
299                      if(myPort < 0 )
300                        myPort = DEFAULT_PORT;                  
301                      
302                      if (LOG.isTraceEnabled()) {
303                            LOG.trace("start - myHost = " + myHost); 
304                            LOG.trace("start - myPort = " + myPort);        
305                            LOG.trace("start - group  = " + group );                                
306                            LOG.trace("start - interface  = " + mcInterface );
307                            LOG.trace("start - network interface  = " + mcNetworkInterface );
308                            LOG.trace("start - join network interface  = " + mcJoinNetworkInterface );
309                      }     
310                      
311                this.inetAddress = InetAddress.getByName(myHost);
312                this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
313                mcast = new MulticastSocket(myPort);
314                mcast.setLoopbackMode(loopBackMode);
315                mcast.setTimeToLive(getTimeToLive());
316                if (mcJoinNetworkInterface != null) {
317                    mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
318                }
319                else {
320                    mcast.joinGroup(inetAddress);
321                }
322                mcast.setSoTimeout((int)keepAliveInterval);
323                if (mcInterface != null) {
324                    mcast.setInterface(InetAddress.getByName(mcInterface));
325                }
326                if (mcNetworkInterface != null) {
327                    mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
328                }
329                runner = new Thread(this);
330                runner.setName(this.toString() + ":" + runner.getName());
331                runner.setDaemon(true);
332                runner.start();
333                doAdvertizeSelf();
334            }
335        }
336    
337        /**
338         * stop the channel
339         * 
340         * @throws Exception
341         */
342        public void stop() throws Exception {
343            if (started.compareAndSet(true, false)) {
344                doAdvertizeSelf();
345                if (mcast != null) {
346                    mcast.close();
347                }
348                if (runner != null) {
349                    runner.interrupt();
350                }
351                getExecutor().shutdownNow();
352            }
353        }
354    
355        public String getType() {
356            return group + "." + TYPE_SUFFIX;
357        }
358    
359        public void run() {
360            byte[] buf = new byte[BUFF_SIZE];
361            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
362            while (started.get()) {
363                doTimeKeepingServices();
364                try {
365                    mcast.receive(packet);
366                    if (packet.getLength() > 0) {
367                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
368                        processData(str);
369                    }
370                } catch (SocketTimeoutException se) {
371                    // ignore
372                } catch (IOException e) {
373                    if (started.get()) {
374                        LOG.error("failed to process packet: " + e);
375                    }
376                }
377            }
378        }
379    
380        private void processData(String str) {
381            if (discoveryListener != null) {
382                if (str.startsWith(getType())) {
383                    String payload = str.substring(getType().length());
384                    if (payload.startsWith(ALIVE)) {
385                        String brokerName = getBrokerName(payload.substring(ALIVE.length()));
386                        String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
387                        processAlive(brokerName, service);
388                    } else {
389                        String brokerName = getBrokerName(payload.substring(DEAD.length()));
390                        String service = payload.substring(DEAD.length() + brokerName.length() + 2);
391                        processDead(service);
392                    }
393                }
394            }
395        }
396    
397        private void doTimeKeepingServices() {
398            if (started.get()) {
399                long currentTime = System.currentTimeMillis();
400                if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
401                    doAdvertizeSelf();
402                    lastAdvertizeTime = currentTime;
403                }
404                doExpireOldServices();
405            }
406        }
407    
408        private void doAdvertizeSelf() {
409            if (selfService != null) {
410                String payload = getType();
411                payload += started.get() ? ALIVE : DEAD;
412                payload += DELIMITER + "localhost" + DELIMITER;
413                payload += selfService;
414                try {
415                    byte[] data = payload.getBytes();
416                    DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
417                    mcast.send(packet);
418                } catch (IOException e) {
419                    // If a send fails, chances are all subsequent sends will fail
420                    // too.. No need to keep reporting the
421                    // same error over and over.
422                    if (reportAdvertizeFailed) {
423                        reportAdvertizeFailed = false;
424                        LOG.error("Failed to advertise our service: " + payload, e);
425                        if ("Operation not permitted".equals(e.getMessage())) {
426                            LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
427                                      + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
428                        }
429                    }
430                }
431            }
432        }
433    
434        private void processAlive(String brokerName, String service) {
435            if (selfService == null || !service.equals(selfService)) {
436                RemoteBrokerData data = brokersByService.get(service);
437                if (data == null) {
438                    data = new RemoteBrokerData(brokerName, service);
439                    brokersByService.put(service, data);      
440                    fireServiceAddEvent(data);
441                    doAdvertizeSelf();
442                } else {
443                    data.updateHeartBeat();
444                    if (data.doRecovery()) {
445                        fireServiceAddEvent(data);
446                    }
447                }
448            }
449        }
450    
451        private void processDead(String service) {
452            if (!service.equals(selfService)) {
453                RemoteBrokerData data = brokersByService.remove(service);
454                if (data != null && !data.isFailed()) {
455                    fireServiceRemovedEvent(data);
456                }
457            }
458        }
459    
460        private void doExpireOldServices() {
461            long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
462            for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
463                RemoteBrokerData data = i.next();
464                if (data.getLastHeartBeat() < expireTime) {
465                    processDead(data.service);
466                }
467            }
468        }
469    
470        private String getBrokerName(String str) {
471            String result = null;
472            int start = str.indexOf(DELIMITER);
473            if (start >= 0) {
474                int end = str.indexOf(DELIMITER, start + 1);
475                result = str.substring(start + 1, end);
476            }
477            return result;
478        }
479    
480        public void serviceFailed(DiscoveryEvent event) throws IOException {
481            RemoteBrokerData data = brokersByService.get(event.getServiceName());
482            if (data != null && data.markFailed()) {
483                fireServiceRemovedEvent(data);
484            }
485        }
486    
487        private void fireServiceRemovedEvent(RemoteBrokerData data) {
488            if (discoveryListener != null && started.get()) {
489                final DiscoveryEvent event = new DiscoveryEvent(data.service);
490                event.setBrokerName(data.brokerName);
491    
492                // Have the listener process the event async so that
493                // he does not block this thread since we are doing time sensitive
494                // processing of events.
495                getExecutor().execute(new Runnable() {
496                    public void run() {
497                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
498                        if (discoveryListener != null) {
499                            discoveryListener.onServiceRemove(event);
500                        }
501                    }
502                });
503            }
504        }
505    
506        private void fireServiceAddEvent(RemoteBrokerData data) {
507            if (discoveryListener != null && started.get()) {
508                final DiscoveryEvent event = new DiscoveryEvent(data.service);
509                event.setBrokerName(data.brokerName);
510                
511                // Have the listener process the event async so that
512                // he does not block this thread since we are doing time sensitive
513                // processing of events.
514                getExecutor().execute(new Runnable() {
515                    public void run() {
516                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
517                        if (discoveryListener != null) {
518                            discoveryListener.onServiceAdd(event);
519                        }
520                    }
521                });
522            }
523        }
524    
525        private ExecutorService getExecutor() {
526            if (executor == null) {
527                final String threadName = "Notifier-" + this.toString();
528                executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
529                    public Thread newThread(Runnable runable) {
530                        Thread t = new Thread(runable,  threadName);
531                        t.setDaemon(true);
532                        return t;
533                    }
534                });
535            }
536            return executor;
537        }
538    
539        public long getBackOffMultiplier() {
540            return backOffMultiplier;
541        }
542    
543        public void setBackOffMultiplier(long backOffMultiplier) {
544            this.backOffMultiplier = backOffMultiplier;
545        }
546    
547        public long getInitialReconnectDelay() {
548            return initialReconnectDelay;
549        }
550    
551        public void setInitialReconnectDelay(long initialReconnectDelay) {
552            this.initialReconnectDelay = initialReconnectDelay;
553        }
554    
555        public int getMaxReconnectAttempts() {
556            return maxReconnectAttempts;
557        }
558    
559        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
560            this.maxReconnectAttempts = maxReconnectAttempts;
561        }
562    
563        public long getMaxReconnectDelay() {
564            return maxReconnectDelay;
565        }
566    
567        public void setMaxReconnectDelay(long maxReconnectDelay) {
568            this.maxReconnectDelay = maxReconnectDelay;
569        }
570    
571        public boolean isUseExponentialBackOff() {
572            return useExponentialBackOff;
573        }
574    
575        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
576            this.useExponentialBackOff = useExponentialBackOff;
577        }
578    
579        public void setGroup(String group) {
580            this.group = group;
581        }
582        
583        @Override
584        public String toString() {
585            return  "MulticastDiscoveryAgent-"
586                + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
587        }
588    }