001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.List; 025import java.util.Properties; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicLong; 031 032import javax.management.ObjectName; 033import org.apache.activemq.Service; 034import org.apache.activemq.advisory.AdvisorySupport; 035import org.apache.activemq.broker.BrokerService; 036import org.apache.activemq.broker.BrokerServiceAware; 037import org.apache.activemq.broker.TransportConnection; 038import org.apache.activemq.broker.region.AbstractRegion; 039import org.apache.activemq.broker.region.DurableTopicSubscription; 040import org.apache.activemq.broker.region.Region; 041import org.apache.activemq.broker.region.RegionBroker; 042import org.apache.activemq.broker.region.Subscription; 043import org.apache.activemq.broker.region.policy.PolicyEntry; 044import org.apache.activemq.command.*; 045import org.apache.activemq.filter.DestinationFilter; 046import org.apache.activemq.filter.MessageEvaluationContext; 047import org.apache.activemq.thread.DefaultThreadPools; 048import org.apache.activemq.thread.TaskRunnerFactory; 049import org.apache.activemq.transport.DefaultTransportListener; 050import org.apache.activemq.transport.FutureResponse; 051import org.apache.activemq.transport.ResponseCallback; 052import org.apache.activemq.transport.Transport; 053import org.apache.activemq.transport.TransportDisposedIOException; 054import org.apache.activemq.transport.TransportFilter; 055import org.apache.activemq.transport.tcp.SslTransport; 056import org.apache.activemq.util.IdGenerator; 057import org.apache.activemq.util.IntrospectionSupport; 058import org.apache.activemq.util.LongSequenceGenerator; 059import org.apache.activemq.util.MarshallingSupport; 060import org.apache.activemq.util.ServiceStopper; 061import org.apache.activemq.util.ServiceSupport; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * A useful base class for implementing demand forwarding bridges. 067 */ 068public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 069 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 070 private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory(); 071 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 072 protected final Transport localBroker; 073 protected final Transport remoteBroker; 074 protected final IdGenerator idGenerator = new IdGenerator(); 075 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 076 protected ConnectionInfo localConnectionInfo; 077 protected ConnectionInfo remoteConnectionInfo; 078 protected SessionInfo localSessionInfo; 079 protected ProducerInfo producerInfo; 080 protected String remoteBrokerName = "Unknown"; 081 protected String localClientId; 082 protected ConsumerInfo demandConsumerInfo; 083 protected int demandConsumerDispatched; 084 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 085 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 086 protected AtomicBoolean disposed = new AtomicBoolean(); 087 protected BrokerId localBrokerId; 088 protected ActiveMQDestination[] excludedDestinations; 089 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 090 protected ActiveMQDestination[] staticallyIncludedDestinations; 091 protected ActiveMQDestination[] durableDestinations; 092 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 093 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 094 protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; 095 protected CountDownLatch startedLatch = new CountDownLatch(2); 096 protected CountDownLatch localStartedLatch = new CountDownLatch(1); 097 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 098 protected NetworkBridgeConfiguration configuration; 099 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 100 101 protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null}; 102 protected Object brokerInfoMutex = new Object(); 103 protected BrokerId remoteBrokerId; 104 105 final AtomicLong enqueueCounter = new AtomicLong(); 106 final AtomicLong dequeueCounter = new AtomicLong(); 107 108 private NetworkBridgeListener networkBridgeListener; 109 private boolean createdByDuplex; 110 private BrokerInfo localBrokerInfo; 111 private BrokerInfo remoteBrokerInfo; 112 113 private final AtomicBoolean started = new AtomicBoolean(); 114 private TransportConnection duplexInitiatingConnection; 115 private BrokerService brokerService = null; 116 private ObjectName mbeanObjectName; 117 118 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 119 this.configuration = configuration; 120 this.localBroker = localBroker; 121 this.remoteBroker = remoteBroker; 122 } 123 124 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 125 this.localBrokerInfo = localBrokerInfo; 126 this.remoteBrokerInfo = remoteBrokerInfo; 127 this.duplexInitiatingConnection = connection; 128 start(); 129 serviceRemoteCommand(remoteBrokerInfo); 130 } 131 132 public void start() throws Exception { 133 if (started.compareAndSet(false, true)) { 134 localBroker.setTransportListener(new DefaultTransportListener() { 135 136 @Override 137 public void onCommand(Object o) { 138 Command command = (Command) o; 139 serviceLocalCommand(command); 140 } 141 142 @Override 143 public void onException(IOException error) { 144 serviceLocalException(error); 145 } 146 }); 147 remoteBroker.setTransportListener(new DefaultTransportListener() { 148 149 public void onCommand(Object o) { 150 Command command = (Command) o; 151 serviceRemoteCommand(command); 152 } 153 154 public void onException(IOException error) { 155 serviceRemoteException(error); 156 } 157 158 }); 159 160 localBroker.start(); 161 remoteBroker.start(); 162 if (!disposed.get()) { 163 try { 164 triggerRemoteStartBridge(); 165 } catch (IOException e) { 166 LOG.warn("Caught exception from remote start", e); 167 } 168 } else { 169 LOG.warn ("Bridge was disposed before the start() method was fully executed."); 170 throw new TransportDisposedIOException(); 171 } 172 } 173 } 174 175 protected void triggerLocalStartBridge() throws IOException { 176 asyncTaskRunner.execute(new Runnable() { 177 public void run() { 178 final String originalName = Thread.currentThread().getName(); 179 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); 180 try { 181 startLocalBridge(); 182 } catch (Throwable e) { 183 serviceLocalException(e); 184 } finally { 185 Thread.currentThread().setName(originalName); 186 } 187 } 188 }); 189 } 190 191 protected void triggerRemoteStartBridge() throws IOException { 192 asyncTaskRunner.execute(new Runnable() { 193 public void run() { 194 final String originalName = Thread.currentThread().getName(); 195 Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); 196 try { 197 startRemoteBridge(); 198 } catch (Exception e) { 199 serviceRemoteException(e); 200 } finally { 201 Thread.currentThread().setName(originalName); 202 } 203 } 204 }); 205 } 206 207 private void startLocalBridge() throws Throwable { 208 if (localBridgeStarted.compareAndSet(false, true)) { 209 synchronized (this) { 210 if (LOG.isTraceEnabled()) { 211 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker); 212 } 213 if (!disposed.get()) { 214 localConnectionInfo = new ConnectionInfo(); 215 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 216 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 217 localConnectionInfo.setClientId(localClientId); 218 localConnectionInfo.setUserName(configuration.getUserName()); 219 localConnectionInfo.setPassword(configuration.getPassword()); 220 Transport originalTransport = remoteBroker; 221 while (originalTransport instanceof TransportFilter) { 222 originalTransport = ((TransportFilter) originalTransport).getNext(); 223 } 224 if (originalTransport instanceof SslTransport) { 225 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 226 localConnectionInfo.setTransportContext(peerCerts); 227 } 228 // sync requests that may fail 229 Object resp = localBroker.request(localConnectionInfo); 230 if (resp instanceof ExceptionResponse) { 231 throw ((ExceptionResponse)resp).getException(); 232 } 233 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 234 localBroker.oneway(localSessionInfo); 235 236 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 237 NetworkBridgeListener l = this.networkBridgeListener; 238 if (l != null) { 239 l.onStart(this); 240 } 241 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); 242 243 } else { 244 LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed."); 245 } 246 startedLatch.countDown(); 247 localStartedLatch.countDown(); 248 if (!disposed.get()) { 249 setupStaticDestinations(); 250 } else { 251 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); 252 } 253 } 254 } 255 } 256 257 protected void startRemoteBridge() throws Exception { 258 if (remoteBridgeStarted.compareAndSet(false, true)) { 259 if (LOG.isTraceEnabled()) { 260 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker); 261 } 262 synchronized (this) { 263 if (!isCreatedByDuplex()) { 264 BrokerInfo brokerInfo = new BrokerInfo(); 265 brokerInfo.setBrokerName(configuration.getBrokerName()); 266 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 267 brokerInfo.setNetworkConnection(true); 268 brokerInfo.setDuplexConnection(configuration.isDuplex()); 269 // set our properties 270 Properties props = new Properties(); 271 IntrospectionSupport.getProperties(configuration, props, null); 272 String str = MarshallingSupport.propertiesToString(props); 273 brokerInfo.setNetworkProperties(str); 274 brokerInfo.setBrokerId(this.localBrokerId); 275 remoteBroker.oneway(brokerInfo); 276 } 277 if (remoteConnectionInfo != null) { 278 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 279 } 280 remoteConnectionInfo = new ConnectionInfo(); 281 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 282 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 283 remoteConnectionInfo.setUserName(configuration.getUserName()); 284 remoteConnectionInfo.setPassword(configuration.getPassword()); 285 remoteBroker.oneway(remoteConnectionInfo); 286 287 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 288 remoteBroker.oneway(remoteSessionInfo); 289 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 290 producerInfo.setResponseRequired(false); 291 remoteBroker.oneway(producerInfo); 292 // Listen to consumer advisory messages on the remote broker to 293 // determine demand. 294 if (!configuration.isStaticBridge()) { 295 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 296 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); 297 String advisoryTopic = configuration.getDestinationFilter(); 298 if (configuration.isBridgeTempDestinations()) { 299 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 300 } 301 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 302 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 303 remoteBroker.oneway(demandConsumerInfo); 304 } 305 startedLatch.countDown(); 306 } 307 } 308 } 309 310 public void stop() throws Exception { 311 if (started.compareAndSet(true, false)) { 312 if (disposed.compareAndSet(false, true)) { 313 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); 314 NetworkBridgeListener l = this.networkBridgeListener; 315 if (l != null) { 316 l.onStop(this); 317 } 318 try { 319 remoteBridgeStarted.set(false); 320 final CountDownLatch sendShutdown = new CountDownLatch(1); 321 asyncTaskRunner.execute(new Runnable() { 322 public void run() { 323 try { 324 localBroker.oneway(new ShutdownInfo()); 325 sendShutdown.countDown(); 326 remoteBroker.oneway(new ShutdownInfo()); 327 } catch (Throwable e) { 328 LOG.debug("Caught exception sending shutdown", e); 329 } finally { 330 sendShutdown.countDown(); 331 } 332 333 } 334 }); 335 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 336 LOG.info("Network Could not shutdown in a timely manner"); 337 } 338 } finally { 339 ServiceStopper ss = new ServiceStopper(); 340 ss.stop(remoteBroker); 341 ss.stop(localBroker); 342 // Release the started Latch since another thread could be 343 // stuck waiting for it to start up. 344 startedLatch.countDown(); 345 startedLatch.countDown(); 346 localStartedLatch.countDown(); 347 ss.throwFirstException(); 348 } 349 } 350 if (remoteBrokerInfo != null) { 351 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 352 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 353 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); 354 } 355 } 356 } 357 358 public void serviceRemoteException(Throwable error) { 359 if (!disposed.get()) { 360 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 361 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 362 } else { 363 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 364 } 365 LOG.debug("The remote Exception was: " + error, error); 366 asyncTaskRunner.execute(new Runnable() { 367 public void run() { 368 ServiceSupport.dispose(getControllingService()); 369 } 370 }); 371 fireBridgeFailed(); 372 } 373 } 374 375 protected void serviceRemoteCommand(Command command) { 376 if (!disposed.get()) { 377 try { 378 if (command.isMessageDispatch()) { 379 waitStarted(); 380 MessageDispatch md = (MessageDispatch) command; 381 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 382 ackAdvisory(md.getMessage()); 383 } else if (command.isBrokerInfo()) { 384 lastConnectSucceeded.set(true); 385 remoteBrokerInfo = (BrokerInfo) command; 386 Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 387 try { 388 IntrospectionSupport.getProperties(configuration, props, null); 389 if (configuration.getExcludedDestinations() != null) { 390 excludedDestinations = configuration.getExcludedDestinations().toArray( 391 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 392 } 393 if (configuration.getStaticallyIncludedDestinations() != null) { 394 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 395 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 396 } 397 if (configuration.getDynamicallyIncludedDestinations() != null) { 398 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations() 399 .toArray( 400 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations() 401 .size()]); 402 } 403 } catch (Throwable t) { 404 LOG.error("Error mapping remote destinations", t); 405 } 406 serviceRemoteBrokerInfo(command); 407 // Let the local broker know the remote broker's ID. 408 localBroker.oneway(command); 409 // new peer broker (a consumer can work with remote broker also) 410 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 411 } else if (command.getClass() == ConnectionError.class) { 412 ConnectionError ce = (ConnectionError) command; 413 serviceRemoteException(ce.getException()); 414 } else { 415 if (isDuplex()) { 416 if (command.isMessage()) { 417 ActiveMQMessage message = (ActiveMQMessage) command; 418 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 419 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { 420 serviceRemoteConsumerAdvisory(message.getDataStructure()); 421 ackAdvisory(message); 422 } else { 423 if (!isPermissableDestination(message.getDestination(), true)) { 424 return; 425 } 426 if (message.isResponseRequired()) { 427 Response reply = new Response(); 428 reply.setCorrelationId(message.getCommandId()); 429 localBroker.oneway(message); 430 remoteBroker.oneway(reply); 431 } else { 432 localBroker.oneway(message); 433 } 434 } 435 } else { 436 switch (command.getDataStructureType()) { 437 case ConnectionInfo.DATA_STRUCTURE_TYPE: 438 case SessionInfo.DATA_STRUCTURE_TYPE: 439 case ProducerInfo.DATA_STRUCTURE_TYPE: 440 localBroker.oneway(command); 441 break; 442 case MessageAck.DATA_STRUCTURE_TYPE: 443 MessageAck ack = (MessageAck) command; 444 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 445 if (localSub != null) { 446 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 447 localBroker.oneway(ack); 448 } else { 449 LOG.warn("Matching local subscription not found for ack: " + ack); 450 } 451 break; 452 case ConsumerInfo.DATA_STRUCTURE_TYPE: 453 localStartedLatch.await(); 454 if (started.get()) { 455 if (!addConsumerInfo((ConsumerInfo) command)) { 456 if (LOG.isDebugEnabled()) { 457 LOG.debug("Ignoring ConsumerInfo: " + command); 458 } 459 } else { 460 if (LOG.isTraceEnabled()) { 461 LOG.trace("Adding ConsumerInfo: " + command); 462 } 463 } 464 } else { 465 // received a subscription whilst stopping 466 LOG.warn("Stopping - ignoring ConsumerInfo: " + command); 467 } 468 break; 469 case ShutdownInfo.DATA_STRUCTURE_TYPE: 470 // initiator is shutting down, controlled case 471 // abortive close dealt with by inactivity monitor 472 LOG.info("Stopping network bridge on shutdown of remote broker"); 473 serviceRemoteException(new IOException(command.toString())); 474 break; 475 default: 476 if (LOG.isDebugEnabled()) { 477 LOG.debug("Ignoring remote command: " + command); 478 } 479 } 480 } 481 } else { 482 switch (command.getDataStructureType()) { 483 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 484 case WireFormatInfo.DATA_STRUCTURE_TYPE: 485 case ShutdownInfo.DATA_STRUCTURE_TYPE: 486 break; 487 default: 488 LOG.warn("Unexpected remote command: " + command); 489 } 490 } 491 } 492 } catch (Throwable e) { 493 if (LOG.isDebugEnabled()) { 494 LOG.debug("Exception processing remote command: " + command, e); 495 } 496 serviceRemoteException(e); 497 } 498 } 499 } 500 501 private void ackAdvisory(Message message) throws IOException { 502 demandConsumerDispatched++; 503 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 504 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 505 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 506 remoteBroker.oneway(ack); 507 demandConsumerDispatched = 0; 508 } 509 } 510 511 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 512 final int networkTTL = configuration.getNetworkTTL(); 513 if (data.getClass() == ConsumerInfo.class) { 514 // Create a new local subscription 515 ConsumerInfo info = (ConsumerInfo) data; 516 BrokerId[] path = info.getBrokerPath(); 517 518 if (info.isBrowser()) { 519 if (LOG.isDebugEnabled()) { 520 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed"); 521 } 522 return; 523 } 524 525 if (path != null && path.length >= networkTTL) { 526 if (LOG.isDebugEnabled()) { 527 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); 528 } 529 return; 530 } 531 if (contains(path, localBrokerPath[0])) { 532 // Ignore this consumer as it's a consumer we locally sent to the broker. 533 if (LOG.isDebugEnabled()) { 534 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); 535 } 536 return; 537 } 538 if (!isPermissableDestination(info.getDestination())) { 539 // ignore if not in the permitted or in the excluded list 540 if (LOG.isDebugEnabled()) { 541 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); 542 } 543 return; 544 } 545 546 // in a cyclic network there can be multiple bridges per broker that can propagate 547 // a network subscription so there is a need to synchronise on a shared entity 548 synchronized (brokerService.getVmConnectorURI()) { 549 if (addConsumerInfo(info)) { 550 if (LOG.isDebugEnabled()) { 551 LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info); 552 } 553 } else { 554 if (LOG.isDebugEnabled()) { 555 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); 556 } 557 } 558 } 559 } else if (data.getClass() == DestinationInfo.class) { 560 // It's a destination info - we want to pass up 561 // information about temporary destinations 562 DestinationInfo destInfo = (DestinationInfo) data; 563 BrokerId[] path = destInfo.getBrokerPath(); 564 if (path != null && path.length >= networkTTL) { 565 if (LOG.isDebugEnabled()) { 566 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only"); 567 } 568 return; 569 } 570 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 571 // Ignore this consumer as it's a consumer we locally sent to 572 // the broker. 573 if (LOG.isDebugEnabled()) { 574 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); 575 } 576 return; 577 } 578 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 579 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 580 // re-set connection id so comes from here 581 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 582 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 583 } 584 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 585 if (LOG.isTraceEnabled()) { 586 LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo); 587 } 588 localBroker.oneway(destInfo); 589 } else if (data.getClass() == RemoveInfo.class) { 590 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 591 removeDemandSubscription(id); 592 } 593 } 594 595 public void serviceLocalException(Throwable error) { 596 if (!disposed.get()) { 597 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); 598 LOG.debug("The local Exception was:" + error, error); 599 asyncTaskRunner.execute(new Runnable() { 600 public void run() { 601 ServiceSupport.dispose(getControllingService()); 602 } 603 }); 604 fireBridgeFailed(); 605 } 606 } 607 608 protected Service getControllingService() { 609 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 610 } 611 612 protected void addSubscription(DemandSubscription sub) throws IOException { 613 if (sub != null) { 614 localBroker.oneway(sub.getLocalInfo()); 615 } 616 } 617 618 protected void removeSubscription(final DemandSubscription sub) throws IOException { 619 if (sub != null) { 620 if (LOG.isDebugEnabled()) { 621 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); 622 } 623 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 624 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 625 626 // continue removal in separate thread to free up this thread for outstanding responses 627 asyncTaskRunner.execute(new Runnable() { 628 public void run() { 629 sub.waitForCompletion(); 630 try { 631 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 632 } catch (IOException e) { 633 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); 634 } 635 } 636 }); 637 } 638 } 639 640 protected Message configureMessage(MessageDispatch md) { 641 Message message = md.getMessage().copy(); 642 // Update the packet to show where it came from. 643 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 644 message.setProducerId(producerInfo.getProducerId()); 645 message.setDestination(md.getDestination()); 646 if (message.getOriginalTransactionId() == null) { 647 message.setOriginalTransactionId(message.getTransactionId()); 648 } 649 message.setTransactionId(null); 650 return message; 651 } 652 653 protected void serviceLocalCommand(Command command) { 654 if (!disposed.get()) { 655 try { 656 if (command.isMessageDispatch()) { 657 enqueueCounter.incrementAndGet(); 658 final MessageDispatch md = (MessageDispatch) command; 659 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 660 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 661 662 if (suppressMessageDispatch(md, sub)) { 663 if (LOG.isDebugEnabled()) { 664 LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage()); 665 } 666 // still ack as it may be durable 667 try { 668 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 669 } finally { 670 sub.decrementOutstandingResponses(); 671 } 672 return; 673 } 674 675 Message message = configureMessage(md); 676 if (LOG.isDebugEnabled()) { 677 LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); 678 } 679 680 if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) { 681 682 // If the message was originally sent using async 683 // send, we will preserve that QOS 684 // by bridging it using an async send (small chance 685 // of message loss). 686 try { 687 remoteBroker.oneway(message); 688 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 689 dequeueCounter.incrementAndGet(); 690 } finally { 691 sub.decrementOutstandingResponses(); 692 } 693 694 } else { 695 696 // The message was not sent using async send, so we 697 // should only ack the local 698 // broker when we get confirmation that the remote 699 // broker has received the message. 700 ResponseCallback callback = new ResponseCallback() { 701 public void onCompletion(FutureResponse future) { 702 try { 703 Response response = future.getResult(); 704 if (response.isException()) { 705 ExceptionResponse er = (ExceptionResponse) response; 706 serviceLocalException(er.getException()); 707 } else { 708 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 709 dequeueCounter.incrementAndGet(); 710 } 711 } catch (IOException e) { 712 serviceLocalException(e); 713 } finally { 714 sub.decrementOutstandingResponses(); 715 } 716 } 717 }; 718 719 remoteBroker.asyncRequest(message, callback); 720 721 } 722 } else { 723 if (LOG.isDebugEnabled()) { 724 LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); 725 } 726 } 727 } else if (command.isBrokerInfo()) { 728 localBrokerInfo = (BrokerInfo) command; 729 serviceLocalBrokerInfo(command); 730 } else if (command.isShutdownInfo()) { 731 LOG.info(configuration.getBrokerName() + " Shutting down"); 732 stop(); 733 } else if (command.getClass() == ConnectionError.class) { 734 ConnectionError ce = (ConnectionError) command; 735 serviceLocalException(ce.getException()); 736 } else { 737 switch (command.getDataStructureType()) { 738 case WireFormatInfo.DATA_STRUCTURE_TYPE: 739 break; 740 default: 741 LOG.warn("Unexpected local command: " + command); 742 } 743 } 744 } catch (Throwable e) { 745 LOG.warn("Caught an exception processing local command", e); 746 serviceLocalException(e); 747 } 748 } 749 } 750 751 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 752 boolean suppress = false; 753 // for durable subs, suppression via filter leaves dangling acks so we need to 754 // check here and allow the ack irrespective 755 if (sub.getLocalInfo().isDurable()) { 756 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 757 messageEvalContext.setMessageReference(md.getMessage()); 758 messageEvalContext.setDestination(md.getDestination()); 759 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 760 } 761 return suppress; 762 } 763 764 /** 765 * @return Returns the dynamicallyIncludedDestinations. 766 */ 767 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 768 return dynamicallyIncludedDestinations; 769 } 770 771 /** 772 * @param dynamicallyIncludedDestinations The 773 * dynamicallyIncludedDestinations to set. 774 */ 775 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 776 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 777 } 778 779 /** 780 * @return Returns the excludedDestinations. 781 */ 782 public ActiveMQDestination[] getExcludedDestinations() { 783 return excludedDestinations; 784 } 785 786 /** 787 * @param excludedDestinations The excludedDestinations to set. 788 */ 789 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 790 this.excludedDestinations = excludedDestinations; 791 } 792 793 /** 794 * @return Returns the staticallyIncludedDestinations. 795 */ 796 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 797 return staticallyIncludedDestinations; 798 } 799 800 /** 801 * @param staticallyIncludedDestinations The staticallyIncludedDestinations 802 * to set. 803 */ 804 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 805 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 806 } 807 808 /** 809 * @return Returns the durableDestinations. 810 */ 811 public ActiveMQDestination[] getDurableDestinations() { 812 return durableDestinations; 813 } 814 815 /** 816 * @param durableDestinations The durableDestinations to set. 817 */ 818 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 819 this.durableDestinations = durableDestinations; 820 } 821 822 /** 823 * @return Returns the localBroker. 824 */ 825 public Transport getLocalBroker() { 826 return localBroker; 827 } 828 829 /** 830 * @return Returns the remoteBroker. 831 */ 832 public Transport getRemoteBroker() { 833 return remoteBroker; 834 } 835 836 /** 837 * @return the createdByDuplex 838 */ 839 public boolean isCreatedByDuplex() { 840 return this.createdByDuplex; 841 } 842 843 /** 844 * @param createdByDuplex the createdByDuplex to set 845 */ 846 public void setCreatedByDuplex(boolean createdByDuplex) { 847 this.createdByDuplex = createdByDuplex; 848 } 849 850 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 851 if (brokerPath != null) { 852 for (int i = 0; i < brokerPath.length; i++) { 853 if (brokerId.equals(brokerPath[i])) { 854 return true; 855 } 856 } 857 } 858 return false; 859 } 860 861 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 862 if (brokerPath == null || brokerPath.length == 0) { 863 return pathsToAppend; 864 } 865 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 866 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 867 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 868 return rc; 869 } 870 871 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 872 if (brokerPath == null || brokerPath.length == 0) { 873 return new BrokerId[] { idToAppend }; 874 } 875 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 876 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 877 rc[brokerPath.length] = idToAppend; 878 return rc; 879 } 880 881 protected boolean isPermissableDestination(ActiveMQDestination destination) { 882 return isPermissableDestination(destination, false); 883 } 884 885 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 886 // Are we not bridging temp destinations? 887 if (destination.isTemporary()) { 888 if (allowTemporary) { 889 return true; 890 } else { 891 return configuration.isBridgeTempDestinations(); 892 } 893 } 894 895 ActiveMQDestination[] dests = staticallyIncludedDestinations; 896 if (dests != null && dests.length > 0) { 897 for (int i = 0; i < dests.length; i++) { 898 ActiveMQDestination match = dests[i]; 899 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); 900 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { 901 return true; 902 } 903 } 904 } 905 906 dests = excludedDestinations; 907 if (dests != null && dests.length > 0) { 908 for (int i = 0; i < dests.length; i++) { 909 ActiveMQDestination match = dests[i]; 910 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match); 911 if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { 912 return false; 913 } 914 } 915 } 916 917 dests = dynamicallyIncludedDestinations; 918 if (dests != null && dests.length > 0) { 919 for (int i = 0; i < dests.length; i++) { 920 ActiveMQDestination match = dests[i]; 921 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); 922 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { 923 return true; 924 } 925 } 926 927 return false; 928 } 929 return true; 930 } 931 932 /** 933 * Subscriptions for these destinations are always created 934 */ 935 protected void setupStaticDestinations() { 936 ActiveMQDestination[] dests = staticallyIncludedDestinations; 937 if (dests != null) { 938 for (int i = 0; i < dests.length; i++) { 939 ActiveMQDestination dest = dests[i]; 940 DemandSubscription sub = createDemandSubscription(dest); 941 try { 942 addSubscription(sub); 943 } catch (IOException e) { 944 LOG.error("Failed to add static destination " + dest, e); 945 } 946 if (LOG.isTraceEnabled()) { 947 LOG.trace("bridging messages for static destination: " + dest); 948 } 949 } 950 } 951 } 952 953 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 954 boolean consumerAdded = false; 955 ConsumerInfo info = consumerInfo.copy(); 956 addRemoteBrokerToBrokerPath(info); 957 DemandSubscription sub = createDemandSubscription(info); 958 if (sub != null) { 959 if (duplicateSuppressionIsRequired(sub)) { 960 undoMapRegistration(sub); 961 } else { 962 addSubscription(sub); 963 consumerAdded = true; 964 } 965 } 966 return consumerAdded; 967 } 968 969 private void undoMapRegistration(DemandSubscription sub) { 970 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 971 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 972 } 973 974 /* 975 * check our existing subs networkConsumerIds against the list of network ids in this subscription 976 * A match means a duplicate which we suppress for topics and maybe for queues 977 */ 978 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 979 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 980 boolean suppress = false; 981 982 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || 983 consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) { 984 return suppress; 985 } 986 987 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 988 Collection<Subscription> currentSubs = 989 getRegionSubscriptions(consumerInfo.getDestination()); 990 for (Subscription sub : currentSubs) { 991 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 992 if (!networkConsumers.isEmpty()) { 993 if (matchFound(candidateConsumers, networkConsumers)) { 994 if (isInActiveDurableSub(sub)) { 995 suppress = false; 996 } else { 997 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 998 } 999 break; 1000 } 1001 } 1002 } 1003 return suppress; 1004 } 1005 1006 private boolean isInActiveDurableSub(Subscription sub) { 1007 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive()); 1008 } 1009 1010 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1011 boolean suppress = false; 1012 1013 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1014 if (LOG.isDebugEnabled()) { 1015 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName 1016 + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 1017 + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); 1018 } 1019 suppress = true; 1020 } else { 1021 // remove the existing lower priority duplicate and allow this candidate 1022 try { 1023 removeDuplicateSubscription(existingSub); 1024 1025 if (LOG.isDebugEnabled()) { 1026 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() 1027 + " with sub from " + remoteBrokerName 1028 + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 1029 + candidateInfo.getNetworkConsumerIds()); 1030 } 1031 } catch (IOException e) { 1032 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e); 1033 } 1034 } 1035 return suppress; 1036 } 1037 1038 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1039 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1040 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1041 break; 1042 } 1043 } 1044 } 1045 1046 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1047 boolean found = false; 1048 for (ConsumerId aliasConsumer : networkConsumers) { 1049 if (candidateConsumers.contains(aliasConsumer)) { 1050 found = true; 1051 break; 1052 } 1053 } 1054 return found; 1055 } 1056 1057 private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1058 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1059 Region region; 1060 Collection<Subscription> subs; 1061 1062 region = null; 1063 switch ( dest.getDestinationType() ) 1064 { 1065 case ActiveMQDestination.QUEUE_TYPE: 1066 region = region_broker.getQueueRegion(); 1067 break; 1068 1069 case ActiveMQDestination.TOPIC_TYPE: 1070 region = region_broker.getTopicRegion(); 1071 break; 1072 1073 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1074 region = region_broker.getTempQueueRegion(); 1075 break; 1076 1077 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1078 region = region_broker.getTempTopicRegion(); 1079 break; 1080 } 1081 1082 if ( region instanceof AbstractRegion ) 1083 subs = ((AbstractRegion) region).getSubscriptions().values(); 1084 else 1085 subs = null; 1086 1087 return subs; 1088 } 1089 1090 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1091 //add our original id to ourselves 1092 info.addNetworkConsumerId(info.getConsumerId()); 1093 return doCreateDemandSubscription(info); 1094 } 1095 1096 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1097 DemandSubscription result = new DemandSubscription(info); 1098 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1099 if (info.getDestination().isTemporary()) { 1100 // reset the local connection Id 1101 1102 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1103 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1104 } 1105 1106 if (configuration.isDecreaseNetworkConsumerPriority()) { 1107 byte priority = (byte) configuration.getConsumerPriorityBase(); 1108 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1109 // The longer the path to the consumer, the less it's consumer priority. 1110 priority -= info.getBrokerPath().length + 1; 1111 } 1112 result.getLocalInfo().setPriority(priority); 1113 if (LOG.isDebugEnabled()) { 1114 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info); 1115 } 1116 } 1117 configureDemandSubscription(info, result); 1118 return result; 1119 } 1120 1121 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 1122 ConsumerInfo info = new ConsumerInfo(); 1123 info.setDestination(destination); 1124 // the remote info held by the DemandSubscription holds the original 1125 // consumerId, 1126 // the local info get's overwritten 1127 1128 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1129 DemandSubscription result = null; 1130 try { 1131 result = createDemandSubscription(info); 1132 } catch (IOException e) { 1133 LOG.error("Failed to create DemandSubscription ", e); 1134 } 1135 return result; 1136 } 1137 1138 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1139 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1140 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 1141 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1142 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1143 1144 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1145 if (!info.isDurable()) { 1146 // This works for now since we use a VM connection to the local broker. 1147 // may need to change if we ever subscribe to a remote broker. 1148 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1149 } else { 1150 // need to ack this message if it is ignored as it is durable so 1151 // we check before we send. see: suppressMessageDispatch() 1152 } 1153 } 1154 1155 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1156 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1157 if (LOG.isDebugEnabled()) { 1158 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub); 1159 } 1160 if (sub != null) { 1161 removeSubscription(sub); 1162 if (LOG.isDebugEnabled()) { 1163 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo()); 1164 } 1165 } 1166 } 1167 1168 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1169 boolean removeDone = false; 1170 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1171 if (sub != null) { 1172 try { 1173 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1174 removeDone = true; 1175 } catch (IOException e) { 1176 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e); 1177 } 1178 } 1179 return removeDone; 1180 } 1181 1182 protected void waitStarted() throws InterruptedException { 1183 startedLatch.await(); 1184 } 1185 1186 protected void clearDownSubscriptions() { 1187 subscriptionMapByLocalId.clear(); 1188 subscriptionMapByRemoteId.clear(); 1189 } 1190 1191 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1192 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1193 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1194 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1195 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1196 filterFactory = entry.getNetworkBridgeFilterFactory(); 1197 } 1198 } 1199 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL()); 1200 } 1201 1202 protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { 1203 synchronized (brokerInfoMutex) { 1204 if (remoteBrokerId != null) { 1205 if (remoteBrokerId.equals(localBrokerId)) { 1206 if (LOG.isTraceEnabled()) { 1207 LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId); 1208 } 1209 waitStarted(); 1210 ServiceSupport.dispose(this); 1211 } 1212 } 1213 } 1214 } 1215 1216 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1217 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1218 } 1219 1220 protected void serviceRemoteBrokerInfo(Command command) throws IOException { 1221 synchronized (brokerInfoMutex) { 1222 BrokerInfo remoteBrokerInfo = (BrokerInfo)command; 1223 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 1224 remoteBrokerPath[0] = remoteBrokerId; 1225 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 1226 if (localBrokerId != null) { 1227 if (localBrokerId.equals(remoteBrokerId)) { 1228 if (LOG.isTraceEnabled()) { 1229 LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId); 1230 } 1231 ServiceSupport.dispose(this); 1232 } 1233 } 1234 if (!disposed.get()) { 1235 triggerLocalStartBridge(); 1236 } 1237 } 1238 } 1239 1240 protected BrokerId[] getRemoteBrokerPath() { 1241 return remoteBrokerPath; 1242 } 1243 1244 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1245 this.networkBridgeListener = listener; 1246 } 1247 1248 private void fireBridgeFailed() { 1249 NetworkBridgeListener l = this.networkBridgeListener; 1250 if (l != null) { 1251 l.bridgeFailed(); 1252 } 1253 } 1254 1255 public String getRemoteAddress() { 1256 return remoteBroker.getRemoteAddress(); 1257 } 1258 1259 public String getLocalAddress() { 1260 return localBroker.getRemoteAddress(); 1261 } 1262 1263 public String getRemoteBrokerName() { 1264 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1265 } 1266 1267 public String getLocalBrokerName() { 1268 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1269 } 1270 1271 public long getDequeueCounter() { 1272 return dequeueCounter.get(); 1273 } 1274 1275 public long getEnqueueCounter() { 1276 return enqueueCounter.get(); 1277 } 1278 1279 protected boolean isDuplex() { 1280 return configuration.isDuplex() || createdByDuplex; 1281 } 1282 1283 public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1284 return subscriptionMapByRemoteId; 1285 } 1286 1287 public void setBrokerService(BrokerService brokerService) { 1288 this.brokerService = brokerService; 1289 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1290 localBrokerPath[0] = localBrokerId; 1291 } 1292 1293 public void setMbeanObjectName(ObjectName objectName) { 1294 this.mbeanObjectName = objectName; 1295 } 1296 1297 public ObjectName getMbeanObjectName() { 1298 return mbeanObjectName; 1299 } 1300}