001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.util.Hashtable;
021    import java.util.Map;
022    import java.util.Random;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.naming.CommunicationException;
026    import javax.naming.Context;
027    import javax.naming.NamingEnumeration;
028    import javax.naming.directory.Attributes;
029    import javax.naming.directory.DirContext;
030    import javax.naming.directory.InitialDirContext;
031    import javax.naming.directory.SearchControls;
032    import javax.naming.directory.SearchResult;
033    import javax.naming.event.EventDirContext;
034    import javax.naming.event.NamespaceChangeListener;
035    import javax.naming.event.NamingEvent;
036    import javax.naming.event.NamingExceptionEvent;
037    import javax.naming.event.ObjectChangeListener;
038    
039    import org.apache.activemq.util.URISupport;
040    import org.apache.activemq.util.URISupport.CompositeData;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    /**
045     * Class that creates dynamic network connectors listed in a directory
046     * server using the LDAP v3 protocol as defined in RFC 2251. The
047     * entries listed in the directory server must implement the ipHost
048     * and ipService objectClasses as defined in RFC 2307.
049     * 
050     * @author Trevor Pounds
051     * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
052     * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
053     *
054     * @org.apache.xbean.XBean element="ldapNetworkConnector"
055     */
056    public class      LdapNetworkConnector
057           extends    NetworkConnector
058           implements NamespaceChangeListener,
059                      ObjectChangeListener
060    {
061       private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
062    
063       // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
064       private static final String REQUIRED_OBJECT_CLASS_FILTER  = "(&(objectClass=ipHost)(objectClass=ipService))";
065    
066       // connection
067       private URI[]   availableURIs      = null;
068       private int     availableURIsIndex = 0;
069       private String  base               = null;
070       private boolean failover           = false;
071       private long    curReconnectDelay  = 1000;  /* 1 sec */
072       private long    maxReconnectDelay  = 30000; /* 30 sec */
073    
074       // authentication
075       private String  user                    = null;
076       private String  password                = null;
077       private boolean anonymousAuthentication = false;
078    
079       // search
080       private SearchControls searchControls      = new SearchControls(/* ONELEVEL_SCOPE */);
081       private String         searchFilter        = REQUIRED_OBJECT_CLASS_FILTER;
082       private boolean        searchEventListener = false;
083    
084       // connector management
085       private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap();
086       private Map<URI, Integer>          referenceMap = new ConcurrentHashMap();
087       private Map<String, URI>           uuidMap      = new ConcurrentHashMap();
088    
089       // local context
090       private DirContext context = null;
091       //currently in use URI
092       private URI ldapURI = null;
093    
094       /**
095        * returns the next URI from the configured list
096        *
097        * @return random URI from the configured list
098        */
099       public URI getUri()
100          { return availableURIs[++availableURIsIndex % availableURIs.length]; }
101    
102       /**
103        * sets the LDAP server URI
104        *
105        * @param _uri LDAP server URI
106        */
107       public void setUri(URI _uri)
108          throws Exception
109       {
110          CompositeData data = URISupport.parseComposite(_uri);
111          if(data.getScheme().equals("failover"))
112          {
113             availableURIs = data.getComponents();
114             failover = true;
115          }
116          else
117             { availableURIs = new URI[]{ _uri }; }
118       }
119    
120       /**
121        * sets the base LDAP dn used for lookup operations
122        *
123        * @param _base LDAP base dn
124        */
125       public void setBase(String _base)
126          { base = _base; }
127    
128       /**
129        * sets the LDAP user for access credentials
130        *
131        * @param _user LDAP dn of user
132        */
133       public void setUser(String _user)
134          { user = _user; }
135    
136       /**
137        * sets the LDAP password for access credentials
138        *
139        * @param _password user password
140        */
141       public void setPassword(String _password)
142          { password = _password; }
143    
144       /**
145        * sets LDAP anonymous authentication access credentials
146        *
147        * @param _anonymousAuthentication set to true to use anonymous authentication
148        */
149       public void setAnonymousAuthentication(boolean _anonymousAuthentication)
150          { anonymousAuthentication = _anonymousAuthentication; }
151    
152       /**
153        * sets the LDAP search scope
154        *
155        * @param _searchScope LDAP JNDI search scope
156        */
157       public void setSearchScope(String _searchScope)
158          throws Exception
159       {
160          int scope;
161          if(_searchScope.equals("OBJECT_SCOPE"))
162             { scope = SearchControls.OBJECT_SCOPE; }
163          else if(_searchScope.equals("ONELEVEL_SCOPE"))
164             { scope = SearchControls.ONELEVEL_SCOPE; }
165          else if(_searchScope.equals("SUBTREE_SCOPE"))
166             { scope = SearchControls.SUBTREE_SCOPE; }
167          else
168             { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); }
169          searchControls.setSearchScope(scope);
170       }
171    
172       /**
173        * sets the LDAP search filter as defined in RFC 2254
174        *
175        * @param _searchFilter LDAP search filter
176        * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
177        */
178       public void setSearchFilter(String _searchFilter)
179          { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }
180    
181       /**
182        * enables/disable a persistent search to the LDAP server as defined
183        * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
184        *
185        * @param _searchEventListener enable = true, disable = false (default)
186        * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
187        */
188       public void setSearchEventListener(boolean _searchEventListener)
189          { searchEventListener = _searchEventListener; }
190    
191       /**
192        * start the connector
193        */
194       public void start()
195          throws Exception
196       {
197          LOG.info("connecting...");
198          Hashtable<String, String> env = new Hashtable();
199          env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
200          this.ldapURI = getUri();
201          LOG.debug("    URI [" + this.ldapURI + "]");
202          env.put(Context.PROVIDER_URL, this.ldapURI.toString());
203          if(anonymousAuthentication)
204          {
205             LOG.debug("    login credentials [anonymous]");
206             env.put(Context.SECURITY_AUTHENTICATION, "none");
207          }
208          else
209          {
210             LOG.debug("    login credentials [" + user + ":******]");
211             env.put(Context.SECURITY_PRINCIPAL,   user);
212             env.put(Context.SECURITY_CREDENTIALS, password);
213          }
214          boolean isConnected = false;
215          while(!isConnected)
216          {
217             try
218             {
219                context = new InitialDirContext(env);
220                isConnected = true;
221             }
222             catch(CommunicationException err)
223             {
224                if(failover)
225                {
226                    this.ldapURI = getUri();
227                   LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
228                   env.put(Context.PROVIDER_URL, this.ldapURI.toString());
229                   Thread.sleep(curReconnectDelay);
230                   curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
231                }
232                else
233                   { throw err; }
234             }
235          }
236    
237          // add connectors from search results
238          LOG.info("searching for network connectors...");
239          LOG.debug("    base   [" + base + "]");
240          LOG.debug("    filter [" + searchFilter + "]");
241          LOG.debug("    scope  [" + searchControls.getSearchScope() + "]");
242          NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
243          while(results.hasMore())
244             { addConnector(results.next()); }
245    
246          // register persistent search event listener
247          if(searchEventListener)
248          {
249             LOG.info("registering persistent search listener...");
250             EventDirContext eventContext = (EventDirContext)context.lookup("");
251             eventContext.addNamingListener(base, searchFilter, searchControls, this);
252          }
253          else // otherwise close context (i.e. connection as it is no longer needed)
254             { context.close(); }
255       }
256    
257       /**
258        * stop the connector
259        */
260       public void stop()
261          throws Exception
262       {
263          LOG.info("stopping context...");
264          for(NetworkConnector connector : connectorMap.values())
265             { connector.stop(); }
266          connectorMap.clear();
267          referenceMap.clear();
268          uuidMap.clear();
269          context.close();
270       }
271    
272       public String toString() {
273           return this.getClass().getName() + getName()  + "[" + ldapURI.toString() + "]";
274       }
275    
276       /**
277         * add connector of the given URI
278         * 
279         * @param result
280         *            search result of connector to add
281         */
282       protected synchronized void addConnector(SearchResult result)
283          throws Exception
284       {
285          String uuid = toUUID(result);
286          if(uuidMap.containsKey(uuid))
287          {
288             LOG.warn("connector already regsitered for UUID [" + uuid + "]");
289             return;
290          }
291    
292          URI connectorURI = toURI(result);
293          if(connectorMap.containsKey(connectorURI))
294          {
295             int referenceCount = referenceMap.get(connectorURI) + 1;
296             LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
297             referenceMap.put(connectorURI, referenceCount);
298             uuidMap.put(uuid, connectorURI);
299             return;
300          }
301    
302          // FIXME: disable JMX listing of LDAP managed connectors, we will
303          //       want to map/manage these differently in the future
304    //      boolean useJMX = getBrokerService().isUseJmx();
305    //      getBrokerService().setUseJmx(false);
306          NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
307    //      getBrokerService().setUseJmx(useJMX);
308    
309          // propogate std connector properties that may have been set via XML
310          connector.setDynamicOnly(isDynamicOnly());
311          connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
312          connector.setNetworkTTL(getNetworkTTL());
313          connector.setConduitSubscriptions(isConduitSubscriptions());
314          connector.setExcludedDestinations(getExcludedDestinations());
315          connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
316          connector.setDuplex(isDuplex());
317    
318          // XXX: set in the BrokerService.startAllConnectors method and is 
319          //      required to prevent remote broker exceptions upon connection
320          connector.setLocalUri(getBrokerService().getVmConnectorURI());
321          connector.setBrokerName(getBrokerService().getBrokerName());
322          connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
323    
324          // start network connector
325          connectorMap.put(connectorURI, connector);
326          referenceMap.put(connectorURI, 1);
327          uuidMap.put(uuid, connectorURI);
328          connector.start();
329          LOG.info("connector added with URI [" + connectorURI + "]");
330       }
331    
332       /**
333        * remove connector of the given URI
334        *
335        * @param result search result of connector to remove
336        */
337       protected synchronized void removeConnector(SearchResult result)
338          throws Exception
339       {
340          String uuid = toUUID(result);
341          if(!uuidMap.containsKey(uuid))
342          {
343             LOG.warn("connector not regsitered for UUID [" + uuid + "]");
344             return;
345          }
346    
347          URI connectorURI = uuidMap.get(uuid);
348          if(!connectorMap.containsKey(connectorURI))
349          {
350             LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
351             return;
352          }
353    
354          int referenceCount = referenceMap.get(connectorURI) - 1;
355          referenceMap.put(connectorURI, referenceCount);
356          uuidMap.remove(uuid);
357          LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
358    
359          if(referenceCount > 0)
360             { return; }
361    
362          NetworkConnector connector = connectorMap.remove(connectorURI);
363          connector.stop();
364          LOG.info("connector removed with URI [" + connectorURI + "]");
365       }
366    
367       /**
368        * convert search result into URI
369        *
370        * @param result search result to convert to URI
371        */
372       protected URI toURI(SearchResult result)
373          throws Exception
374       {
375          Attributes attributes = result.getAttributes();
376          String address  = (String)attributes.get("iphostnumber").get();
377          String port     = (String)attributes.get("ipserviceport").get();
378          String protocol = (String)attributes.get("ipserviceprotocol").get();
379          URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
380          LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
381          return connectorURI;
382       }
383    
384       /**
385        * convert search result into URI
386        *
387        * @param result search result to convert to URI
388        */
389       protected String toUUID(SearchResult result)
390       {
391          String uuid = result.getNameInNamespace();
392          LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
393          return uuid;
394       }
395    
396       /**
397        * invoked when an entry has been added during a persistent search
398        */
399       public void objectAdded(NamingEvent event)
400       {
401          LOG.debug("entry added");
402          try
403             { addConnector((SearchResult)event.getNewBinding()); }
404          catch(Exception err)
405             { LOG.error("ERR: caught unexpected exception", err); }
406       }
407    
408       /**
409        * invoked when an entry has been removed during a persistent search
410        */
411       public void objectRemoved(NamingEvent event)
412       {
413          LOG.debug("entry removed");
414          try
415             { removeConnector((SearchResult)event.getOldBinding()); }
416          catch(Exception err)
417             { LOG.error("ERR: caught unexpected exception", err); }
418       }
419    
420       /**
421        * invoked when an entry has been renamed during a persistent search
422        */
423       public void objectRenamed(NamingEvent event)
424       {
425          LOG.debug("entry renamed");
426          // XXX: getNameInNamespace method does not seem to work properly,
427          //      but getName seems to provide the result we want
428          String uuidOld = event.getOldBinding().getName();
429          String uuidNew = event.getNewBinding().getName();
430          URI connectorURI = uuidMap.remove(uuidOld);
431          uuidMap.put(uuidNew, connectorURI);
432          LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
433       }
434    
435       /**
436        * invoked when an entry has been changed during a persistent search
437        */
438       public void objectChanged(NamingEvent event)
439       {
440          LOG.debug("entry changed");
441          try
442          {
443             SearchResult result = (SearchResult)event.getNewBinding();
444             removeConnector(result);
445             addConnector(result);
446          }
447          catch(Exception err)
448             { LOG.error("ERR: caught unexpected exception", err); }
449       }
450    
451       /**
452        * invoked when an exception has occurred during a persistent search
453        */
454       public void namingExceptionThrown(NamingExceptionEvent event)
455          { LOG.error("ERR: caught unexpected exception", event.getException()); }
456    }