001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.net.URI; 020import java.util.Hashtable; 021import java.util.Map; 022import java.util.Random; 023import java.util.concurrent.ConcurrentHashMap; 024 025import javax.naming.CommunicationException; 026import javax.naming.Context; 027import javax.naming.NamingEnumeration; 028import javax.naming.directory.Attributes; 029import javax.naming.directory.DirContext; 030import javax.naming.directory.InitialDirContext; 031import javax.naming.directory.SearchControls; 032import javax.naming.directory.SearchResult; 033import javax.naming.event.EventDirContext; 034import javax.naming.event.NamespaceChangeListener; 035import javax.naming.event.NamingEvent; 036import javax.naming.event.NamingExceptionEvent; 037import javax.naming.event.ObjectChangeListener; 038 039import org.apache.activemq.util.URISupport; 040import org.apache.activemq.util.URISupport.CompositeData; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * class to create dynamic network connectors listed in an directory 046 * server using the LDAP v3 protocol as defined in RFC 2251, the 047 * entries listed in the directory server must implement the ipHost 048 * and ipService objectClasses as defined in RFC 2307. 049 * 050 * @author Trevor Pounds 051 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a> 052 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a> 053 * 054 * @org.apache.xbean.XBean element="ldapNetworkConnector" 055 */ 056public class LdapNetworkConnector 057 extends NetworkConnector 058 implements NamespaceChangeListener, 059 ObjectChangeListener 060{ 061 private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class); 062 063 // force returned entries to implement the ipHost and ipService object classes (RFC 2307) 064 private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))"; 065 066 // connection 067 private URI[] availableURIs = null; 068 private int availableURIsIndex = 0; 069 private String base = null; 070 private boolean failover = false; 071 private long curReconnectDelay = 1000; /* 1 sec */ 072 private long maxReconnectDelay = 30000; /* 30 sec */ 073 074 // authentication 075 private String user = null; 076 private String password = null; 077 private boolean anonymousAuthentication = false; 078 079 // search 080 private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */); 081 private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER; 082 private boolean searchEventListener = false; 083 084 // connector management 085 private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap(); 086 private Map<URI, Integer> referenceMap = new ConcurrentHashMap(); 087 private Map<String, URI> uuidMap = new ConcurrentHashMap(); 088 089 // local context 090 private DirContext context = null; 091 //currently in use URI 092 private URI ldapURI = null; 093 094 /** 095 * returns the next URI from the configured list 096 * 097 * @return random URI from the configured list 098 */ 099 public URI getUri() 100 { return availableURIs[++availableURIsIndex % availableURIs.length]; } 101 102 /** 103 * sets the LDAP server URI 104 * 105 * @param _uri LDAP server URI 106 */ 107 public void setUri(URI _uri) 108 throws Exception 109 { 110 CompositeData data = URISupport.parseComposite(_uri); 111 if(data.getScheme().equals("failover")) 112 { 113 availableURIs = data.getComponents(); 114 failover = true; 115 } 116 else 117 { availableURIs = new URI[]{ _uri }; } 118 } 119 120 /** 121 * sets the base LDAP dn used for lookup operations 122 * 123 * @param _base LDAP base dn 124 */ 125 public void setBase(String _base) 126 { base = _base; } 127 128 /** 129 * sets the LDAP user for access credentials 130 * 131 * @param _user LDAP dn of user 132 */ 133 public void setUser(String _user) 134 { user = _user; } 135 136 /** 137 * sets the LDAP password for access credentials 138 * 139 * @param _password user password 140 */ 141 public void setPassword(String _password) 142 { password = _password; } 143 144 /** 145 * sets LDAP anonymous authentication access credentials 146 * 147 * @param _anonymousAuthentication set to true to use anonymous authentication 148 */ 149 public void setAnonymousAuthentication(boolean _anonymousAuthentication) 150 { anonymousAuthentication = _anonymousAuthentication; } 151 152 /** 153 * sets the LDAP search scope 154 * 155 * @param _searchScope LDAP JNDI search scope 156 */ 157 public void setSearchScope(String _searchScope) 158 throws Exception 159 { 160 int scope; 161 if(_searchScope.equals("OBJECT_SCOPE")) 162 { scope = SearchControls.OBJECT_SCOPE; } 163 else if(_searchScope.equals("ONELEVEL_SCOPE")) 164 { scope = SearchControls.ONELEVEL_SCOPE; } 165 else if(_searchScope.equals("SUBTREE_SCOPE")) 166 { scope = SearchControls.SUBTREE_SCOPE; } 167 else 168 { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); } 169 searchControls.setSearchScope(scope); 170 } 171 172 /** 173 * sets the LDAP search filter as defined in RFC 2254 174 * 175 * @param _searchFilter LDAP search filter 176 * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a> 177 */ 178 public void setSearchFilter(String _searchFilter) 179 { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; } 180 181 /** 182 * enables/disable a persistent search to the LDAP server as defined 183 * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3) 184 * 185 * @param _searchEventListener enable = true, disable = false (default) 186 * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a> 187 */ 188 public void setSearchEventListener(boolean _searchEventListener) 189 { searchEventListener = _searchEventListener; } 190 191 /** 192 * start the connector 193 */ 194 public void start() 195 throws Exception 196 { 197 LOG.info("connecting..."); 198 Hashtable<String, String> env = new Hashtable(); 199 env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); 200 this.ldapURI = getUri(); 201 LOG.debug(" URI [" + this.ldapURI + "]"); 202 env.put(Context.PROVIDER_URL, this.ldapURI.toString()); 203 if(anonymousAuthentication) 204 { 205 LOG.debug(" login credentials [anonymous]"); 206 env.put(Context.SECURITY_AUTHENTICATION, "none"); 207 } 208 else 209 { 210 LOG.debug(" login credentials [" + user + ":******]"); 211 env.put(Context.SECURITY_PRINCIPAL, user); 212 env.put(Context.SECURITY_CREDENTIALS, password); 213 } 214 boolean isConnected = false; 215 while(!isConnected) 216 { 217 try 218 { 219 context = new InitialDirContext(env); 220 isConnected = true; 221 } 222 catch(CommunicationException err) 223 { 224 if(failover) 225 { 226 this.ldapURI = getUri(); 227 LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]"); 228 env.put(Context.PROVIDER_URL, this.ldapURI.toString()); 229 Thread.sleep(curReconnectDelay); 230 curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay); 231 } 232 else 233 { throw err; } 234 } 235 } 236 237 // add connectors from search results 238 LOG.info("searching for network connectors..."); 239 LOG.debug(" base [" + base + "]"); 240 LOG.debug(" filter [" + searchFilter + "]"); 241 LOG.debug(" scope [" + searchControls.getSearchScope() + "]"); 242 NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls); 243 while(results.hasMore()) 244 { addConnector(results.next()); } 245 246 // register persistent search event listener 247 if(searchEventListener) 248 { 249 LOG.info("registering persistent search listener..."); 250 EventDirContext eventContext = (EventDirContext)context.lookup(""); 251 eventContext.addNamingListener(base, searchFilter, searchControls, this); 252 } 253 else // otherwise close context (i.e. connection as it is no longer needed) 254 { context.close(); } 255 } 256 257 /** 258 * stop the connector 259 */ 260 public void stop() 261 throws Exception 262 { 263 LOG.info("stopping context..."); 264 for(NetworkConnector connector : connectorMap.values()) 265 { connector.stop(); } 266 connectorMap.clear(); 267 referenceMap.clear(); 268 uuidMap.clear(); 269 context.close(); 270 } 271 272 public String toString() { 273 return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; 274 } 275 276 /** 277 * add connector of the given URI 278 * 279 * @param result 280 * search result of connector to add 281 */ 282 protected synchronized void addConnector(SearchResult result) 283 throws Exception 284 { 285 String uuid = toUUID(result); 286 if(uuidMap.containsKey(uuid)) 287 { 288 LOG.warn("connector already regsitered for UUID [" + uuid + "]"); 289 return; 290 } 291 292 URI connectorURI = toURI(result); 293 if(connectorMap.containsKey(connectorURI)) 294 { 295 int referenceCount = referenceMap.get(connectorURI) + 1; 296 LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]"); 297 referenceMap.put(connectorURI, referenceCount); 298 uuidMap.put(uuid, connectorURI); 299 return; 300 } 301 302 // FIXME: disable JMX listing of LDAP managed connectors, we will 303 // want to map/manage these differently in the future 304// boolean useJMX = getBrokerService().isUseJmx(); 305// getBrokerService().setUseJmx(false); 306 NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI); 307// getBrokerService().setUseJmx(useJMX); 308 309 // propogate std connector properties that may have been set via XML 310 connector.setDynamicOnly(isDynamicOnly()); 311 connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); 312 connector.setNetworkTTL(getNetworkTTL()); 313 connector.setConduitSubscriptions(isConduitSubscriptions()); 314 connector.setExcludedDestinations(getExcludedDestinations()); 315 connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations()); 316 connector.setDuplex(isDuplex()); 317 318 // XXX: set in the BrokerService.startAllConnectors method and is 319 // required to prevent remote broker exceptions upon connection 320 connector.setLocalUri(getBrokerService().getVmConnectorURI()); 321 connector.setBrokerName(getBrokerService().getBrokerName()); 322 connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations()); 323 324 // start network connector 325 connectorMap.put(connectorURI, connector); 326 referenceMap.put(connectorURI, 1); 327 uuidMap.put(uuid, connectorURI); 328 connector.start(); 329 LOG.info("connector added with URI [" + connectorURI + "]"); 330 } 331 332 /** 333 * remove connector of the given URI 334 * 335 * @param result search result of connector to remove 336 */ 337 protected synchronized void removeConnector(SearchResult result) 338 throws Exception 339 { 340 String uuid = toUUID(result); 341 if(!uuidMap.containsKey(uuid)) 342 { 343 LOG.warn("connector not regsitered for UUID [" + uuid + "]"); 344 return; 345 } 346 347 URI connectorURI = uuidMap.get(uuid); 348 if(!connectorMap.containsKey(connectorURI)) 349 { 350 LOG.warn("connector not regisitered for URI [" + connectorURI + "]"); 351 return; 352 } 353 354 int referenceCount = referenceMap.get(connectorURI) - 1; 355 referenceMap.put(connectorURI, referenceCount); 356 uuidMap.remove(uuid); 357 LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]"); 358 359 if(referenceCount > 0) 360 { return; } 361 362 NetworkConnector connector = connectorMap.remove(connectorURI); 363 connector.stop(); 364 LOG.info("connector removed with URI [" + connectorURI + "]"); 365 } 366 367 /** 368 * convert search result into URI 369 * 370 * @param result search result to convert to URI 371 */ 372 protected URI toURI(SearchResult result) 373 throws Exception 374 { 375 Attributes attributes = result.getAttributes(); 376 String address = (String)attributes.get("iphostnumber").get(); 377 String port = (String)attributes.get("ipserviceport").get(); 378 String protocol = (String)attributes.get("ipserviceprotocol").get(); 379 URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")"); 380 LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]"); 381 return connectorURI; 382 } 383 384 /** 385 * convert search result into URI 386 * 387 * @param result search result to convert to URI 388 */ 389 protected String toUUID(SearchResult result) 390 { 391 String uuid = result.getNameInNamespace(); 392 LOG.debug("retrieved UUID from SearchResult [" + uuid + "]"); 393 return uuid; 394 } 395 396 /** 397 * invoked when an entry has been added during a persistent search 398 */ 399 public void objectAdded(NamingEvent event) 400 { 401 LOG.debug("entry added"); 402 try 403 { addConnector((SearchResult)event.getNewBinding()); } 404 catch(Exception err) 405 { LOG.error("ERR: caught unexpected exception", err); } 406 } 407 408 /** 409 * invoked when an entry has been removed during a persistent search 410 */ 411 public void objectRemoved(NamingEvent event) 412 { 413 LOG.debug("entry removed"); 414 try 415 { removeConnector((SearchResult)event.getOldBinding()); } 416 catch(Exception err) 417 { LOG.error("ERR: caught unexpected exception", err); } 418 } 419 420 /** 421 * invoked when an entry has been renamed during a persistent search 422 */ 423 public void objectRenamed(NamingEvent event) 424 { 425 LOG.debug("entry renamed"); 426 // XXX: getNameInNamespace method does not seem to work properly, 427 // but getName seems to provide the result we want 428 String uuidOld = event.getOldBinding().getName(); 429 String uuidNew = event.getNewBinding().getName(); 430 URI connectorURI = uuidMap.remove(uuidOld); 431 uuidMap.put(uuidNew, connectorURI); 432 LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]"); 433 } 434 435 /** 436 * invoked when an entry has been changed during a persistent search 437 */ 438 public void objectChanged(NamingEvent event) 439 { 440 LOG.debug("entry changed"); 441 try 442 { 443 SearchResult result = (SearchResult)event.getNewBinding(); 444 removeConnector(result); 445 addConnector(result); 446 } 447 catch(Exception err) 448 { LOG.error("ERR: caught unexpected exception", err); } 449 } 450 451 /** 452 * invoked when an exception has occurred during a persistent search 453 */ 454 public void namingExceptionThrown(NamingExceptionEvent event) 455 { LOG.error("ERR: caught unexpected exception", event.getException()); } 456}