001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.StringTokenizer; 025 import java.util.concurrent.CopyOnWriteArrayList; 026 import java.util.regex.Pattern; 027 028 import javax.management.ObjectName; 029 import org.apache.activemq.broker.jmx.ManagedTransportConnector; 030 import org.apache.activemq.broker.jmx.ManagementContext; 031 import org.apache.activemq.broker.region.ConnectorStatistics; 032 import org.apache.activemq.command.BrokerInfo; 033 import org.apache.activemq.command.ConnectionControl; 034 import org.apache.activemq.security.MessageAuthorizationPolicy; 035 import org.apache.activemq.thread.DefaultThreadPools; 036 import org.apache.activemq.thread.TaskRunnerFactory; 037 import org.apache.activemq.transport.Transport; 038 import org.apache.activemq.transport.TransportAcceptListener; 039 import org.apache.activemq.transport.TransportFactory; 040 import org.apache.activemq.transport.TransportServer; 041 import org.apache.activemq.transport.discovery.DiscoveryAgent; 042 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 043 import org.apache.activemq.util.ServiceStopper; 044 import org.apache.activemq.util.ServiceSupport; 045 import org.slf4j.Logger; 046 import org.slf4j.LoggerFactory; 047 048 /** 049 * @org.apache.xbean.XBean 050 * 051 */ 052 public class TransportConnector implements Connector, BrokerServiceAware { 053 054 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); 055 056 protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 057 protected TransportStatusDetector statusDector; 058 private BrokerService brokerService; 059 private TransportServer server; 060 private URI uri; 061 private BrokerInfo brokerInfo = new BrokerInfo(); 062 private TaskRunnerFactory taskRunnerFactory; 063 private MessageAuthorizationPolicy messageAuthorizationPolicy; 064 private DiscoveryAgent discoveryAgent; 065 private final ConnectorStatistics statistics = new ConnectorStatistics(); 066 private URI discoveryUri; 067 private URI connectUri; 068 private String name; 069 private boolean disableAsyncDispatch; 070 private boolean enableStatusMonitor = false; 071 private Broker broker; 072 private boolean updateClusterClients = false; 073 private boolean rebalanceClusterClients; 074 private boolean updateClusterClientsOnRemove = false; 075 private String updateClusterFilter; 076 private boolean auditNetworkProducers = false; 077 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; 078 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; 079 080 LinkedList<String> peerBrokers = new LinkedList<String>(); 081 082 public TransportConnector() { 083 } 084 085 public TransportConnector(TransportServer server) { 086 this(); 087 setServer(server); 088 if (server != null && server.getConnectURI() != null) { 089 URI uri = server.getConnectURI(); 090 if (uri != null && uri.getScheme().equals("vm")) { 091 setEnableStatusMonitor(false); 092 } 093 } 094 095 } 096 097 /** 098 * @return Returns the connections. 099 */ 100 public CopyOnWriteArrayList<TransportConnection> getConnections() { 101 return connections; 102 } 103 104 /** 105 * Factory method to create a JMX managed version of this transport 106 * connector 107 */ 108 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) 109 throws IOException, URISyntaxException { 110 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 111 rc.setBrokerInfo(getBrokerInfo()); 112 rc.setConnectUri(getConnectUri()); 113 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 114 rc.setDiscoveryAgent(getDiscoveryAgent()); 115 rc.setDiscoveryUri(getDiscoveryUri()); 116 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 117 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 118 rc.setName(getName()); 119 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 120 rc.setUri(getUri()); 121 rc.setBrokerService(brokerService); 122 rc.setUpdateClusterClients(isUpdateClusterClients()); 123 rc.setRebalanceClusterClients(isRebalanceClusterClients()); 124 rc.setUpdateClusterFilter(getUpdateClusterFilter()); 125 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove()); 126 rc.setAuditNetworkProducers(isAuditNetworkProducers()); 127 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection()); 128 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection()); 129 return rc; 130 } 131 132 public BrokerInfo getBrokerInfo() { 133 return brokerInfo; 134 } 135 136 public void setBrokerInfo(BrokerInfo brokerInfo) { 137 this.brokerInfo = brokerInfo; 138 } 139 140 /** 141 * 142 * @deprecated use the {@link #setBrokerService(BrokerService)} method 143 * instead. 144 */ 145 @Deprecated 146 public void setBrokerName(String name) { 147 if (this.brokerInfo == null) { 148 this.brokerInfo = new BrokerInfo(); 149 } 150 this.brokerInfo.setBrokerName(name); 151 } 152 153 public TransportServer getServer() throws IOException, URISyntaxException { 154 if (server == null) { 155 setServer(createTransportServer()); 156 } 157 return server; 158 } 159 160 public void setServer(TransportServer server) { 161 this.server = server; 162 } 163 164 public URI getUri() { 165 if (uri == null) { 166 try { 167 uri = getConnectUri(); 168 } catch (Throwable e) { 169 } 170 } 171 return uri; 172 } 173 174 /** 175 * Sets the server transport URI to use if there is not a 176 * {@link TransportServer} configured via the 177 * {@link #setServer(TransportServer)} method. This value is used to lazy 178 * create a {@link TransportServer} instance 179 * 180 * @param uri 181 */ 182 public void setUri(URI uri) { 183 this.uri = uri; 184 } 185 186 public TaskRunnerFactory getTaskRunnerFactory() { 187 return taskRunnerFactory; 188 } 189 190 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 191 this.taskRunnerFactory = taskRunnerFactory; 192 } 193 194 /** 195 * @return the statistics for this connector 196 */ 197 public ConnectorStatistics getStatistics() { 198 return statistics; 199 } 200 201 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 202 return messageAuthorizationPolicy; 203 } 204 205 /** 206 * Sets the policy used to decide if the current connection is authorized to 207 * consume a given message 208 */ 209 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 210 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 211 } 212 213 public void start() throws Exception { 214 broker = brokerService.getBroker(); 215 brokerInfo.setBrokerName(broker.getBrokerName()); 216 brokerInfo.setBrokerId(broker.getBrokerId()); 217 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 218 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 219 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); 220 getServer().setAcceptListener(new TransportAcceptListener() { 221 public void onAccept(final Transport transport) { 222 try { 223 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 224 public void run() { 225 try { 226 Connection connection = createConnection(transport); 227 connection.start(); 228 } catch (Exception e) { 229 String remoteHost = transport.getRemoteAddress(); 230 ServiceSupport.dispose(transport); 231 onAcceptError(e, remoteHost); 232 } 233 } 234 }); 235 } catch (Exception e) { 236 String remoteHost = transport.getRemoteAddress(); 237 ServiceSupport.dispose(transport); 238 onAcceptError(e, remoteHost); 239 } 240 } 241 242 public void onAcceptError(Exception error) { 243 onAcceptError(error, null); 244 } 245 246 private void onAcceptError(Exception error, String remoteHost) { 247 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " 248 + error); 249 LOG.debug("Reason: " + error, error); 250 } 251 }); 252 getServer().setBrokerInfo(brokerInfo); 253 getServer().start(); 254 255 DiscoveryAgent da = getDiscoveryAgent(); 256 if (da != null) { 257 da.registerService(getPublishableConnectString()); 258 da.start(); 259 } 260 if (enableStatusMonitor) { 261 this.statusDector = new TransportStatusDetector(this); 262 this.statusDector.start(); 263 } 264 265 LOG.info("Connector " + getName() + " Started"); 266 } 267 268 public String getPublishableConnectString() throws Exception { 269 return getPublishableConnectString(getConnectUri()); 270 } 271 272 public String getPublishableConnectString(URI theConnectURI) throws Exception { 273 String publishableConnectString = null; 274 if (theConnectURI != null) { 275 publishableConnectString = theConnectURI.toString(); 276 // strip off server side query parameters which may not be compatible to 277 // clients 278 if (theConnectURI.getRawQuery() != null) { 279 publishableConnectString = publishableConnectString.substring(0, publishableConnectString 280 .indexOf(theConnectURI.getRawQuery()) - 1); 281 } 282 } 283 if (LOG.isDebugEnabled()) { 284 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI); 285 } 286 return publishableConnectString; 287 } 288 289 public void stop() throws Exception { 290 ServiceStopper ss = new ServiceStopper(); 291 if (discoveryAgent != null) { 292 ss.stop(discoveryAgent); 293 } 294 if (server != null) { 295 ss.stop(server); 296 } 297 if (this.statusDector != null) { 298 this.statusDector.stop(); 299 } 300 301 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) { 302 TransportConnection c = iter.next(); 303 ss.stop(c); 304 } 305 server = null; 306 ss.throwFirstException(); 307 LOG.info("Connector " + getName() + " Stopped"); 308 } 309 310 // Implementation methods 311 // ------------------------------------------------------------------------- 312 protected Connection createConnection(Transport transport) throws IOException { 313 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null 314 : taskRunnerFactory); 315 boolean statEnabled = this.getStatistics().isEnabled(); 316 answer.getStatistics().setEnabled(statEnabled); 317 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 318 return answer; 319 } 320 321 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 322 if (uri == null) { 323 throw new IllegalArgumentException("You must specify either a server or uri property"); 324 } 325 if (brokerService == null) { 326 throw new IllegalArgumentException( 327 "You must specify the brokerService property. Maybe this connector should be added to a broker?"); 328 } 329 return TransportFactory.bind(brokerService, uri); 330 } 331 332 public DiscoveryAgent getDiscoveryAgent() throws IOException { 333 if (discoveryAgent == null) { 334 discoveryAgent = createDiscoveryAgent(); 335 } 336 return discoveryAgent; 337 } 338 339 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 340 if (discoveryUri != null) { 341 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 342 343 if( agent!=null && agent instanceof BrokerServiceAware ) { 344 ((BrokerServiceAware)agent).setBrokerService(brokerService); 345 } 346 347 return agent; 348 } 349 return null; 350 } 351 352 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 353 this.discoveryAgent = discoveryAgent; 354 } 355 356 public URI getDiscoveryUri() { 357 return discoveryUri; 358 } 359 360 public void setDiscoveryUri(URI discoveryUri) { 361 this.discoveryUri = discoveryUri; 362 } 363 364 public URI getConnectUri() throws IOException, URISyntaxException { 365 if (connectUri == null) { 366 if (server != null) { 367 connectUri = server.getConnectURI(); 368 } 369 } 370 return connectUri; 371 } 372 373 public void setConnectUri(URI transportUri) { 374 this.connectUri = transportUri; 375 } 376 377 public void onStarted(TransportConnection connection) { 378 connections.add(connection); 379 } 380 381 public void onStopped(TransportConnection connection) { 382 connections.remove(connection); 383 } 384 385 public String getName() { 386 if (name == null) { 387 uri = getUri(); 388 if (uri != null) { 389 name = uri.toString(); 390 } 391 } 392 return name; 393 } 394 395 public void setName(String name) { 396 this.name = name; 397 } 398 399 @Override 400 public String toString() { 401 String rc = getName(); 402 if (rc == null) { 403 rc = super.toString(); 404 } 405 return rc; 406 } 407 408 protected ConnectionControl getConnectionControl() { 409 boolean rebalance = isRebalanceClusterClients(); 410 String connectedBrokers = ""; 411 String separator = ""; 412 413 if (isUpdateClusterClients()) { 414 synchronized (peerBrokers) { 415 for (String uri : getPeerBrokers()) { 416 connectedBrokers += separator + uri; 417 separator = ","; 418 } 419 420 if (rebalance) { 421 String shuffle = getPeerBrokers().removeFirst(); 422 getPeerBrokers().addLast(shuffle); 423 } 424 } 425 } 426 ConnectionControl control = new ConnectionControl(); 427 control.setConnectedBrokers(connectedBrokers); 428 control.setRebalanceConnection(rebalance); 429 return control; 430 431 } 432 433 public void addPeerBroker(BrokerInfo info) { 434 if (isMatchesClusterFilter(info.getBrokerName())) { 435 synchronized (peerBrokers) { 436 getPeerBrokers().addLast(info.getBrokerURL()); 437 } 438 } 439 } 440 441 public void removePeerBroker(BrokerInfo info) { 442 synchronized (peerBrokers) { 443 getPeerBrokers().remove(info.getBrokerURL()); 444 } 445 } 446 447 public LinkedList<String> getPeerBrokers() { 448 synchronized (peerBrokers) { 449 if (peerBrokers.isEmpty()) { 450 peerBrokers.add(brokerService.getDefaultSocketURIString()); 451 } 452 return peerBrokers; 453 } 454 } 455 456 public void updateClientClusterInfo() { 457 458 if (isRebalanceClusterClients() || isUpdateClusterClients()) { 459 ConnectionControl control = getConnectionControl(); 460 for (Connection c : this.connections) { 461 c.updateClient(control); 462 if (isRebalanceClusterClients()) { 463 control = getConnectionControl(); 464 } 465 } 466 } 467 } 468 469 private boolean isMatchesClusterFilter(String brokerName) { 470 boolean result = true; 471 String filter = getUpdateClusterFilter(); 472 if (filter != null) { 473 filter = filter.trim(); 474 if (filter.length() > 0) { 475 StringTokenizer tokenizer = new StringTokenizer(filter, ","); 476 while (result && tokenizer.hasMoreTokens()) { 477 String token = tokenizer.nextToken(); 478 result = isMatchesClusterFilter(brokerName, token); 479 } 480 } 481 } 482 return result; 483 } 484 485 private boolean isMatchesClusterFilter(String brokerName, String match) { 486 boolean result = true; 487 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) { 488 result = Pattern.matches(match, brokerName); 489 } 490 return result; 491 } 492 493 public boolean isDisableAsyncDispatch() { 494 return disableAsyncDispatch; 495 } 496 497 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 498 this.disableAsyncDispatch = disableAsyncDispatch; 499 } 500 501 /** 502 * @return the enableStatusMonitor 503 */ 504 public boolean isEnableStatusMonitor() { 505 return enableStatusMonitor; 506 } 507 508 /** 509 * @param enableStatusMonitor 510 * the enableStatusMonitor to set 511 */ 512 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 513 this.enableStatusMonitor = enableStatusMonitor; 514 } 515 516 /** 517 * This is called by the BrokerService right before it starts the transport. 518 */ 519 public void setBrokerService(BrokerService brokerService) { 520 this.brokerService = brokerService; 521 } 522 523 public Broker getBroker() { 524 return broker; 525 } 526 527 public BrokerService getBrokerService() { 528 return brokerService; 529 } 530 531 /** 532 * @return the updateClusterClients 533 */ 534 public boolean isUpdateClusterClients() { 535 return this.updateClusterClients; 536 } 537 538 /** 539 * @param updateClusterClients 540 * the updateClusterClients to set 541 */ 542 public void setUpdateClusterClients(boolean updateClusterClients) { 543 this.updateClusterClients = updateClusterClients; 544 } 545 546 /** 547 * @return the rebalanceClusterClients 548 */ 549 public boolean isRebalanceClusterClients() { 550 return this.rebalanceClusterClients; 551 } 552 553 /** 554 * @param rebalanceClusterClients 555 * the rebalanceClusterClients to set 556 */ 557 public void setRebalanceClusterClients(boolean rebalanceClusterClients) { 558 this.rebalanceClusterClients = rebalanceClusterClients; 559 } 560 561 /** 562 * @return the updateClusterClientsOnRemove 563 */ 564 public boolean isUpdateClusterClientsOnRemove() { 565 return this.updateClusterClientsOnRemove; 566 } 567 568 /** 569 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set 570 */ 571 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { 572 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; 573 } 574 575 /** 576 * @return the updateClusterFilter 577 */ 578 public String getUpdateClusterFilter() { 579 return this.updateClusterFilter; 580 } 581 582 /** 583 * @param updateClusterFilter 584 * the updateClusterFilter to set 585 */ 586 public void setUpdateClusterFilter(String updateClusterFilter) { 587 this.updateClusterFilter = updateClusterFilter; 588 } 589 590 public int connectionCount() { 591 return connections.size(); 592 } 593 594 public boolean isAuditNetworkProducers() { 595 return auditNetworkProducers; 596 } 597 598 /** 599 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend. 600 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics 601 * @param auditNetworkProducers 602 */ 603 public void setAuditNetworkProducers(boolean auditNetworkProducers) { 604 this.auditNetworkProducers = auditNetworkProducers; 605 } 606 607 public int getMaximumProducersAllowedPerConnection() { 608 return maximumProducersAllowedPerConnection; 609 } 610 611 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) { 612 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection; 613 } 614 615 public int getMaximumConsumersAllowedPerConnection() { 616 return maximumConsumersAllowedPerConnection; 617 } 618 619 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { 620 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; 621 } 622 623 }