001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.security.GeneralSecurityException;
021    import java.security.cert.X509Certificate;
022    import java.util.Arrays;
023    import java.util.Collection;
024    import java.util.List;
025    import java.util.Properties;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    import java.util.concurrent.atomic.AtomicLong;
031    
032    import javax.management.ObjectName;
033    import org.apache.activemq.Service;
034    import org.apache.activemq.advisory.AdvisorySupport;
035    import org.apache.activemq.broker.BrokerService;
036    import org.apache.activemq.broker.BrokerServiceAware;
037    import org.apache.activemq.broker.TransportConnection;
038    import org.apache.activemq.broker.region.AbstractRegion;
039    import org.apache.activemq.broker.region.DurableTopicSubscription;
040    import org.apache.activemq.broker.region.Region;
041    import org.apache.activemq.broker.region.RegionBroker;
042    import org.apache.activemq.broker.region.Subscription;
043    import org.apache.activemq.broker.region.policy.PolicyEntry;
044    import org.apache.activemq.command.*;
045    import org.apache.activemq.filter.DestinationFilter;
046    import org.apache.activemq.filter.MessageEvaluationContext;
047    import org.apache.activemq.thread.DefaultThreadPools;
048    import org.apache.activemq.thread.TaskRunnerFactory;
049    import org.apache.activemq.transport.DefaultTransportListener;
050    import org.apache.activemq.transport.FutureResponse;
051    import org.apache.activemq.transport.ResponseCallback;
052    import org.apache.activemq.transport.Transport;
053    import org.apache.activemq.transport.TransportDisposedIOException;
054    import org.apache.activemq.transport.TransportFilter;
055    import org.apache.activemq.transport.tcp.SslTransport;
056    import org.apache.activemq.util.IdGenerator;
057    import org.apache.activemq.util.IntrospectionSupport;
058    import org.apache.activemq.util.LongSequenceGenerator;
059    import org.apache.activemq.util.MarshallingSupport;
060    import org.apache.activemq.util.ServiceStopper;
061    import org.apache.activemq.util.ServiceSupport;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    /**
066     * A useful base class for implementing demand forwarding bridges.
067     */
068    public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
069        private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
           // Shared task runner used to start the bridge halves, send shutdown commands and
           // dispose the bridge off the calling (transport) thread.
070        private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
071        protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
           // Transports to the local and the remote broker, supplied through the constructor.
072        protected final Transport localBroker;
073        protected final Transport remoteBroker;
           // Source of the connection ids created in startLocalBridge()/startRemoteBridge().
074        protected final IdGenerator idGenerator = new IdGenerator();
075        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
076        protected ConnectionInfo localConnectionInfo;
077        protected ConnectionInfo remoteConnectionInfo;
078        protected SessionInfo localSessionInfo;
079        protected ProducerInfo producerInfo;
           // Used in the local client id and in log messages; "Unknown" until assigned elsewhere.
080        protected String remoteBrokerName = "Unknown";
081        protected String localClientId;
           // Advisory consumer registered on the remote broker by startRemoteBridge();
           // it is only created when the bridge is not configured as static.
082        protected ConsumerInfo demandConsumerInfo;
           // Advisory messages handled since the last ack sent by ackAdvisory().
083        protected int demandConsumerDispatched;
           // Guards ensuring startLocalBridge()/startRemoteBridge() run their body only once.
084        protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
085        protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
           // Set by stop(); most service methods become no-ops once this is true.
086        protected AtomicBoolean disposed = new AtomicBoolean();
087        protected BrokerId localBrokerId;
           // Destination lists; the first three are copied from the configuration when the
           // remote BrokerInfo arrives (see serviceRemoteCommand()).
088        protected ActiveMQDestination[] excludedDestinations;
089        protected ActiveMQDestination[] dynamicallyIncludedDestinations;
090        protected ActiveMQDestination[] staticallyIncludedDestinations;
091        protected ActiveMQDestination[] durableDestinations;
           // Demand subscriptions indexed by consumer id; the remote-id map is consulted to
           // translate acks arriving over a duplex connection.
092        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
093        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
094        protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
           // Counted down once by startLocalBridge() and once by startRemoteBridge();
           // stop() also releases it so that waiters are not left blocked.
095        protected CountDownLatch startedLatch = new CountDownLatch(2);
           // Counted down by startLocalBridge(); awaited before a remote ConsumerInfo is
           // processed on a duplex connection.
096        protected CountDownLatch localStartedLatch = new CountDownLatch(1);
           // Set to true when a BrokerInfo is received from the remote broker.
097        protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
098        protected NetworkBridgeConfiguration configuration;
099        protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
100    
101        protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
102        protected Object brokerInfoMutex = new Object();
103        protected BrokerId remoteBrokerId;
104    
105        final AtomicLong enqueueCounter = new AtomicLong();
106        final AtomicLong dequeueCounter = new AtomicLong();
107    
           // Optional listener notified from startLocalBridge() (onStart) and stop() (onStop).
108        private NetworkBridgeListener networkBridgeListener;
109        private boolean createdByDuplex;
110        private BrokerInfo localBrokerInfo;
111        private BrokerInfo remoteBrokerInfo;
112    
           // Lifecycle flag toggled by start() and stop().
113        private final AtomicBoolean started = new AtomicBoolean();
114        private TransportConnection duplexInitiatingConnection;
115        private BrokerService brokerService = null;
116        private ObjectName mbeanObjectName;
117    
/**
 * Creates a bridge over the given pair of transports.
 *
 * @param configuration the network bridge settings
 * @param localBroker transport connected to the local broker
 * @param remoteBroker transport connected to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    // Transports first, then the settings; the three assignments are independent.
    this.localBroker = localBroker;
    this.remoteBroker = remoteBroker;
    this.configuration = configuration;
}
123    
124        public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
125            this.localBrokerInfo = localBrokerInfo;
126            this.remoteBrokerInfo = remoteBrokerInfo;
127            this.duplexInitiatingConnection = connection;
128            start();
129            serviceRemoteCommand(remoteBrokerInfo);
130        }
131    
132        public void start() throws Exception {
133            if (started.compareAndSet(false, true)) {
134                localBroker.setTransportListener(new DefaultTransportListener() {
135    
136                    @Override
137                    public void onCommand(Object o) {
138                        Command command = (Command) o;
139                        serviceLocalCommand(command);
140                    }
141    
142                    @Override
143                    public void onException(IOException error) {
144                        serviceLocalException(error);
145                    }
146                });
147                remoteBroker.setTransportListener(new DefaultTransportListener() {
148    
149                    public void onCommand(Object o) {
150                        Command command = (Command) o;
151                        serviceRemoteCommand(command);
152                    }
153    
154                    public void onException(IOException error) {
155                        serviceRemoteException(error);
156                    }
157    
158                });
159    
160                localBroker.start();
161                remoteBroker.start();
162                if (!disposed.get()) {
163                    try {
164                        triggerRemoteStartBridge();
165                    } catch (IOException e) {
166                        LOG.warn("Caught exception from remote start", e);
167                    }
168                } else {
169                    LOG.warn ("Bridge was disposed before the start() method was fully executed.");
170                    throw new TransportDisposedIOException();
171                }
172            }
173        }
174    
175        protected void triggerLocalStartBridge() throws IOException {
176            asyncTaskRunner.execute(new Runnable() {
177                public void run() {
178                    final String originalName = Thread.currentThread().getName();
179                    Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
180                    try {
181                        startLocalBridge();
182                    } catch (Throwable e) {
183                        serviceLocalException(e);
184                    } finally {
185                        Thread.currentThread().setName(originalName);
186                    }
187                }
188            });
189        }
190    
191        protected void triggerRemoteStartBridge() throws IOException {
192            asyncTaskRunner.execute(new Runnable() {
193                public void run() {
194                    final String originalName = Thread.currentThread().getName();
195                    Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
196                    try {
197                        startRemoteBridge();
198                    } catch (Exception e) {
199                        serviceRemoteException(e);
200                    } finally {
201                        Thread.currentThread().setName(originalName);
202                    }
203                }
204            });
205        }
206    
207        private void startLocalBridge() throws Throwable {
208            if (localBridgeStarted.compareAndSet(false, true)) {
209                synchronized (this) {
210                    if (LOG.isTraceEnabled()) {
211                        LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
212                    }
213                    if (!disposed.get()) {
214                        localConnectionInfo = new ConnectionInfo();
215                        localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
216                        localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
217                        localConnectionInfo.setClientId(localClientId);
218                        localConnectionInfo.setUserName(configuration.getUserName());
219                        localConnectionInfo.setPassword(configuration.getPassword());
220                        Transport originalTransport = remoteBroker;
221                        while (originalTransport instanceof TransportFilter) {
222                            originalTransport = ((TransportFilter) originalTransport).getNext();
223                        }
224                        if (originalTransport instanceof SslTransport) {
225                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
226                            localConnectionInfo.setTransportContext(peerCerts);
227                        }
228                        // sync requests that may fail
229                        Object resp = localBroker.request(localConnectionInfo);
230                        if (resp instanceof ExceptionResponse) {
231                            throw ((ExceptionResponse)resp).getException();
232                        }
233                        localSessionInfo = new SessionInfo(localConnectionInfo, 1);
234                        localBroker.oneway(localSessionInfo);
235    
236                        brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
237                        NetworkBridgeListener l = this.networkBridgeListener;
238                        if (l != null) {
239                            l.onStart(this);
240                        }
241                        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
242    
243                    } else {
244                        LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
245                    }
246                    startedLatch.countDown();
247                    localStartedLatch.countDown();
248                    if (!disposed.get()) {
249                        setupStaticDestinations();
250                    } else {
251                        LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
252                    }
253                }
254            }
255        }
256    
257        protected void startRemoteBridge() throws Exception {
258            if (remoteBridgeStarted.compareAndSet(false, true)) {
259                if (LOG.isTraceEnabled()) {
260                    LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
261                }
262                synchronized (this) {
263                    if (!isCreatedByDuplex()) {
264                        BrokerInfo brokerInfo = new BrokerInfo();
265                        brokerInfo.setBrokerName(configuration.getBrokerName());
266                        brokerInfo.setBrokerURL(configuration.getBrokerURL());
267                        brokerInfo.setNetworkConnection(true);
268                        brokerInfo.setDuplexConnection(configuration.isDuplex());
269                        // set our properties
270                        Properties props = new Properties();
271                        IntrospectionSupport.getProperties(configuration, props, null);
272                        String str = MarshallingSupport.propertiesToString(props);
273                        brokerInfo.setNetworkProperties(str);
274                        brokerInfo.setBrokerId(this.localBrokerId);
275                        remoteBroker.oneway(brokerInfo);
276                    }
277                    if (remoteConnectionInfo != null) {
278                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
279                    }
280                    remoteConnectionInfo = new ConnectionInfo();
281                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
282                    remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
283                    remoteConnectionInfo.setUserName(configuration.getUserName());
284                    remoteConnectionInfo.setPassword(configuration.getPassword());
285                    remoteBroker.oneway(remoteConnectionInfo);
286    
287                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
288                    remoteBroker.oneway(remoteSessionInfo);
289                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
290                    producerInfo.setResponseRequired(false);
291                    remoteBroker.oneway(producerInfo);
292                    // Listen to consumer advisory messages on the remote broker to
293                    // determine demand.
294                    if (!configuration.isStaticBridge()) {
295                        demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
296                        demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
297                        String advisoryTopic = configuration.getDestinationFilter();
298                        if (configuration.isBridgeTempDestinations()) {
299                            advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
300                        }
301                        demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
302                        demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
303                        remoteBroker.oneway(demandConsumerInfo);
304                    }
305                    startedLatch.countDown();
306                }
307            }
308        }
309    
310        public void stop() throws Exception {
311            if (started.compareAndSet(true, false)) {
312                if (disposed.compareAndSet(false, true)) {
313                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
314                    NetworkBridgeListener l = this.networkBridgeListener;
315                    if (l != null) {
316                        l.onStop(this);
317                    }
318                    try {
319                        remoteBridgeStarted.set(false);
320                        final CountDownLatch sendShutdown = new CountDownLatch(1);
321                        asyncTaskRunner.execute(new Runnable() {
322                            public void run() {
323                                try {
324                                    localBroker.oneway(new ShutdownInfo());
325                                    sendShutdown.countDown();
326                                    remoteBroker.oneway(new ShutdownInfo());
327                                } catch (Throwable e) {
328                                    LOG.debug("Caught exception sending shutdown", e);
329                                } finally {
330                                    sendShutdown.countDown();
331                                }
332    
333                            }
334                        });
335                        if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
336                            LOG.info("Network Could not shutdown in a timely manner");
337                        }
338                    } finally {
339                        ServiceStopper ss = new ServiceStopper();
340                        ss.stop(remoteBroker);
341                        ss.stop(localBroker);
342                        // Release the started Latch since another thread could be
343                        // stuck waiting for it to start up.
344                        startedLatch.countDown();
345                        startedLatch.countDown();
346                        localStartedLatch.countDown();
347                        ss.throwFirstException();
348                    }
349                }
350                if (remoteBrokerInfo != null) {
351                    brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
352                    brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
353                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
354                }
355            }
356        }
357    
358        public void serviceRemoteException(Throwable error) {
359            if (!disposed.get()) {
360                if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
361                    LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
362                } else {
363                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
364                }
365                LOG.debug("The remote Exception was: " + error, error);
366                asyncTaskRunner.execute(new Runnable() {
367                    public void run() {
368                        ServiceSupport.dispose(getControllingService());
369                    }
370                });
371                fireBridgeFailed();
372            }
373        }
374    
375        protected void serviceRemoteCommand(Command command) {
376            if (!disposed.get()) {
377                try {
378                    if (command.isMessageDispatch()) {
379                        waitStarted();
380                        MessageDispatch md = (MessageDispatch) command;
381                        serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
382                        ackAdvisory(md.getMessage());
383                    } else if (command.isBrokerInfo()) {
384                        lastConnectSucceeded.set(true);
385                        remoteBrokerInfo = (BrokerInfo) command;
386                        Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
387                        try {
388                            IntrospectionSupport.getProperties(configuration, props, null);
389                            if (configuration.getExcludedDestinations() != null) {
390                                excludedDestinations = configuration.getExcludedDestinations().toArray(
391                                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
392                            }
393                            if (configuration.getStaticallyIncludedDestinations() != null) {
394                                staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
395                                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
396                            }
397                            if (configuration.getDynamicallyIncludedDestinations() != null) {
398                                dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
399                                        .toArray(
400                                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
401                                                        .size()]);
402                            }
403                        } catch (Throwable t) {
404                            LOG.error("Error mapping remote destinations", t);
405                        }
406                        serviceRemoteBrokerInfo(command);
407                        // Let the local broker know the remote broker's ID.
408                        localBroker.oneway(command);
409                        // new peer broker (a consumer can work with remote broker also)
410                        brokerService.getBroker().addBroker(null, remoteBrokerInfo);
411                    } else if (command.getClass() == ConnectionError.class) {
412                        ConnectionError ce = (ConnectionError) command;
413                        serviceRemoteException(ce.getException());
414                    } else {
415                        if (isDuplex()) {
416                            if (command.isMessage()) {
417                                ActiveMQMessage message = (ActiveMQMessage) command;
418                                if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
419                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
420                                    serviceRemoteConsumerAdvisory(message.getDataStructure());
421                                    ackAdvisory(message);
422                                } else {
423                                    if (!isPermissableDestination(message.getDestination(), true)) {
424                                        return;
425                                    }
426                                    if (message.isResponseRequired()) {
427                                        Response reply = new Response();
428                                        reply.setCorrelationId(message.getCommandId());
429                                        localBroker.oneway(message);
430                                        remoteBroker.oneway(reply);
431                                    } else {
432                                        localBroker.oneway(message);
433                                    }
434                                }
435                            } else {
436                                switch (command.getDataStructureType()) {
437                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
438                                case SessionInfo.DATA_STRUCTURE_TYPE:
439                                case ProducerInfo.DATA_STRUCTURE_TYPE:
440                                    localBroker.oneway(command);
441                                    break;
442                                case MessageAck.DATA_STRUCTURE_TYPE:
443                                    MessageAck ack = (MessageAck) command;
444                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
445                                    if (localSub != null) {
446                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
447                                        localBroker.oneway(ack);
448                                    } else {
449                                        LOG.warn("Matching local subscription not found for ack: " + ack);
450                                    }
451                                    break;
452                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
453                                    localStartedLatch.await();
454                                    if (started.get()) {
455                                        if (!addConsumerInfo((ConsumerInfo) command)) {
456                                            if (LOG.isDebugEnabled()) {
457                                                LOG.debug("Ignoring ConsumerInfo: " + command);
458                                            }
459                                        } else {
460                                            if (LOG.isTraceEnabled()) {
461                                                LOG.trace("Adding ConsumerInfo: " + command);
462                                            }
463                                        }
464                                    } else {
465                                        // received a subscription whilst stopping
466                                        LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
467                                    }
468                                    break;
469                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
470                                    // initiator is shutting down, controlled case
471                                    // abortive close dealt with by inactivity monitor
472                                    LOG.info("Stopping network bridge on shutdown of remote broker");
473                                    serviceRemoteException(new IOException(command.toString()));
474                                    break;
475                                default:
476                                    if (LOG.isDebugEnabled()) {
477                                        LOG.debug("Ignoring remote command: " + command);
478                                    }
479                                }
480                            }
481                        } else {
482                            switch (command.getDataStructureType()) {
483                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
484                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
485                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
486                                break;
487                            default:
488                                LOG.warn("Unexpected remote command: " + command);
489                            }
490                        }
491                    }
492                } catch (Throwable e) {
493                    if (LOG.isDebugEnabled()) {
494                        LOG.debug("Exception processing remote command: " + command, e);
495                    }
496                    serviceRemoteException(e);
497                }
498            }
499        }
500    
501        private void ackAdvisory(Message message) throws IOException {
502            demandConsumerDispatched++;
503            if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
504                MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
505                ack.setConsumerId(demandConsumerInfo.getConsumerId());
506                remoteBroker.oneway(ack);
507                demandConsumerDispatched = 0;
508            }
509        }
510    
511        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
512            final int networkTTL = configuration.getNetworkTTL();
513            if (data.getClass() == ConsumerInfo.class) {
514                // Create a new local subscription
515                ConsumerInfo info = (ConsumerInfo) data;
516                BrokerId[] path = info.getBrokerPath();
517    
518                if (info.isBrowser()) {
519                    if (LOG.isDebugEnabled()) {
520                        LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
521                    }
522                    return;
523                }
524    
525                if (path != null && path.length >= networkTTL) {
526                    if (LOG.isDebugEnabled()) {
527                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
528                    }
529                    return;
530                }
531                if (contains(path, localBrokerPath[0])) {
532                    // Ignore this consumer as it's a consumer we locally sent to the broker.
533                    if (LOG.isDebugEnabled()) {
534                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
535                    }
536                    return;
537                }
538                if (!isPermissableDestination(info.getDestination())) {
539                    // ignore if not in the permitted or in the excluded list
540                    if (LOG.isDebugEnabled()) {
541                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
542                    }
543                    return;
544                }
545    
546                // in a cyclic network there can be multiple bridges per broker that can propagate
547                // a network subscription so there is a need to synchronise on a shared entity
548                synchronized (brokerService.getVmConnectorURI()) {
549                    if (addConsumerInfo(info)) {
550                        if (LOG.isDebugEnabled()) {
551                            LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
552                        }
553                    } else {
554                        if (LOG.isDebugEnabled()) {
555                            LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
556                        }
557                    }
558                }
559            } else if (data.getClass() == DestinationInfo.class) {
560                // It's a destination info - we want to pass up
561                // information about temporary destinations
562                DestinationInfo destInfo = (DestinationInfo) data;
563                BrokerId[] path = destInfo.getBrokerPath();
564                if (path != null && path.length >= networkTTL) {
565                    if (LOG.isDebugEnabled()) {
566                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
567                    }
568                    return;
569                }
570                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
571                    // Ignore this consumer as it's a consumer we locally sent to
572                    // the broker.
573                    if (LOG.isDebugEnabled()) {
574                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
575                    }
576                    return;
577                }
578                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
579                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
580                    // re-set connection id so comes from here
581                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
582                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
583                }
584                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
585                if (LOG.isTraceEnabled()) {
586                    LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
587                }
588                localBroker.oneway(destInfo);
589            } else if (data.getClass() == RemoveInfo.class) {
590                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
591                removeDemandSubscription(id);
592            }
593        }
594    
595        public void serviceLocalException(Throwable error) {
596            if (!disposed.get()) {
597                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
598                LOG.debug("The local Exception was:" + error, error);
599                asyncTaskRunner.execute(new Runnable() {
600                    public void run() {
601                        ServiceSupport.dispose(getControllingService());
602                    }
603                });
604                fireBridgeFailed();
605            }
606        }
607    
608        protected Service getControllingService() {
609            return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
610        }
611    
612        protected void addSubscription(DemandSubscription sub) throws IOException {
613            if (sub != null) {
614                localBroker.oneway(sub.getLocalInfo());
615            }
616        }
617    
618        protected void removeSubscription(final DemandSubscription sub) throws IOException {
619            if (sub != null) {
620                if (LOG.isDebugEnabled()) {
621                    LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
622                }
623                subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
624                subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
625    
626                // continue removal in separate thread to free up this thread for outstanding responses
627                asyncTaskRunner.execute(new Runnable() {
628                    public void run() {
629                        sub.waitForCompletion();
630                        try {
631                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
632                        } catch (IOException e) {
633                            LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
634                        }
635                    }
636                });
637            }
638        }
639    
640        protected Message configureMessage(MessageDispatch md) {
641            Message message = md.getMessage().copy();
642            // Update the packet to show where it came from.
643            message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
644            message.setProducerId(producerInfo.getProducerId());
645            message.setDestination(md.getDestination());
646            if (message.getOriginalTransactionId() == null) {
647                message.setOriginalTransactionId(message.getTransactionId());
648            }
649            message.setTransactionId(null);
650            return message;
651        }
652    
653        protected void serviceLocalCommand(Command command) {
654            if (!disposed.get()) {
655                try {
656                    if (command.isMessageDispatch()) {
657                        enqueueCounter.incrementAndGet();
658                        final MessageDispatch md = (MessageDispatch) command;
659                        final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
660                        if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
661    
662                            if (suppressMessageDispatch(md, sub)) {
663                                if (LOG.isDebugEnabled()) {
664                                    LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
665                                }
666                                // still ack as it may be durable
667                                try {
668                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
669                                } finally {
670                                    sub.decrementOutstandingResponses();
671                                }
672                                return;
673                            }
674    
675                            Message message = configureMessage(md);
676                            if (LOG.isDebugEnabled()) {
677                                LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
678                            }
679    
680                            if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
681    
682                                // If the message was originally sent using async
683                                // send, we will preserve that QOS
684                                // by bridging it using an async send (small chance
685                                // of message loss).
686                                try {
687                                    remoteBroker.oneway(message);
688                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
689                                    dequeueCounter.incrementAndGet();
690                                } finally {
691                                    sub.decrementOutstandingResponses();
692                                }
693    
694                            } else {
695    
696                                // The message was not sent using async send, so we
697                                // should only ack the local
698                                // broker when we get confirmation that the remote
699                                // broker has received the message.
700                                ResponseCallback callback = new ResponseCallback() {
701                                    public void onCompletion(FutureResponse future) {
702                                        try {
703                                            Response response = future.getResult();
704                                            if (response.isException()) {
705                                                ExceptionResponse er = (ExceptionResponse) response;
706                                                serviceLocalException(er.getException());
707                                            } else {
708                                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
709                                                dequeueCounter.incrementAndGet();
710                                            }
711                                        } catch (IOException e) {
712                                            serviceLocalException(e);
713                                        } finally {
714                                            sub.decrementOutstandingResponses();
715                                        }
716                                    }
717                                };
718    
719                                remoteBroker.asyncRequest(message, callback);
720    
721                            }
722                        } else {
723                            if (LOG.isDebugEnabled()) {
724                                LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
725                            }
726                        }
727                    } else if (command.isBrokerInfo()) {
728                        localBrokerInfo = (BrokerInfo) command;
729                        serviceLocalBrokerInfo(command);
730                    } else if (command.isShutdownInfo()) {
731                        LOG.info(configuration.getBrokerName() + " Shutting down");
732                        stop();
733                    } else if (command.getClass() == ConnectionError.class) {
734                        ConnectionError ce = (ConnectionError) command;
735                        serviceLocalException(ce.getException());
736                    } else {
737                        switch (command.getDataStructureType()) {
738                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
739                            break;
740                        default:
741                            LOG.warn("Unexpected local command: " + command);
742                        }
743                    }
744                } catch (Throwable e) {
745                    LOG.warn("Caught an exception processing local command", e);
746                    serviceLocalException(e);
747                }
748            }
749        }
750    
751        private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
752            boolean suppress = false;
753            // for durable subs, suppression via filter leaves dangling acks so we need to
754            // check here and allow the ack irrespective
755            if (sub.getLocalInfo().isDurable()) {
756                MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
757                messageEvalContext.setMessageReference(md.getMessage());
758                messageEvalContext.setDestination(md.getDestination());
759                suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
760            }
761            return suppress;
762        }
763    
764        /**
765         * @return Returns the dynamicallyIncludedDestinations.
766         */
767        public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
768            return dynamicallyIncludedDestinations;
769        }
770    
771        /**
772         * @param dynamicallyIncludedDestinations The
773         *            dynamicallyIncludedDestinations to set.
774         */
775        public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
776            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
777        }
778    
779        /**
780         * @return Returns the excludedDestinations.
781         */
782        public ActiveMQDestination[] getExcludedDestinations() {
783            return excludedDestinations;
784        }
785    
786        /**
787         * @param excludedDestinations The excludedDestinations to set.
788         */
789        public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
790            this.excludedDestinations = excludedDestinations;
791        }
792    
793        /**
794         * @return Returns the staticallyIncludedDestinations.
795         */
796        public ActiveMQDestination[] getStaticallyIncludedDestinations() {
797            return staticallyIncludedDestinations;
798        }
799    
800        /**
801         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
802         *            to set.
803         */
804        public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
805            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
806        }
807    
808        /**
809         * @return Returns the durableDestinations.
810         */
811        public ActiveMQDestination[] getDurableDestinations() {
812            return durableDestinations;
813        }
814    
815        /**
816         * @param durableDestinations The durableDestinations to set.
817         */
818        public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
819            this.durableDestinations = durableDestinations;
820        }
821    
822        /**
823         * @return Returns the localBroker.
824         */
825        public Transport getLocalBroker() {
826            return localBroker;
827        }
828    
829        /**
830         * @return Returns the remoteBroker.
831         */
832        public Transport getRemoteBroker() {
833            return remoteBroker;
834        }
835    
836        /**
837         * @return the createdByDuplex
838         */
839        public boolean isCreatedByDuplex() {
840            return this.createdByDuplex;
841        }
842    
843        /**
844         * @param createdByDuplex the createdByDuplex to set
845         */
846        public void setCreatedByDuplex(boolean createdByDuplex) {
847            this.createdByDuplex = createdByDuplex;
848        }
849    
850        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
851            if (brokerPath != null) {
852                for (int i = 0; i < brokerPath.length; i++) {
853                    if (brokerId.equals(brokerPath[i])) {
854                        return true;
855                    }
856                }
857            }
858            return false;
859        }
860    
861        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
862            if (brokerPath == null || brokerPath.length == 0) {
863                return pathsToAppend;
864            }
865            BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
866            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
867            System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
868            return rc;
869        }
870    
871        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
872            if (brokerPath == null || brokerPath.length == 0) {
873                return new BrokerId[] { idToAppend };
874            }
875            BrokerId rc[] = new BrokerId[brokerPath.length + 1];
876            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
877            rc[brokerPath.length] = idToAppend;
878            return rc;
879        }
880    
881        protected boolean isPermissableDestination(ActiveMQDestination destination) {
882            return isPermissableDestination(destination, false);
883        }
884    
885        protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
886            // Are we not bridging temp destinations?
887            if (destination.isTemporary()) {
888                if (allowTemporary) {
889                    return true;
890                } else {
891                    return configuration.isBridgeTempDestinations();
892                }
893            }
894    
895            ActiveMQDestination[] dests = staticallyIncludedDestinations;
896            if (dests != null && dests.length > 0) {
897                for (int i = 0; i < dests.length; i++) {
898                    ActiveMQDestination match = dests[i];
899                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
900                    if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
901                        return true;
902                    }
903                }
904            }
905    
906            dests = excludedDestinations;
907            if (dests != null && dests.length > 0) {
908                for (int i = 0; i < dests.length; i++) {
909                    ActiveMQDestination match = dests[i];
910                    DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
911                    if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
912                        return false;
913                    }
914                }
915            }
916    
917            dests = dynamicallyIncludedDestinations;
918            if (dests != null && dests.length > 0) {
919                for (int i = 0; i < dests.length; i++) {
920                    ActiveMQDestination match = dests[i];
921                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
922                    if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
923                        return true;
924                    }
925                }
926    
927                return false;
928            }
929            return true;
930        }
931    
932        /**
933         * Subscriptions for these destinations are always created
934         */
935        protected void setupStaticDestinations() {
936            ActiveMQDestination[] dests = staticallyIncludedDestinations;
937            if (dests != null) {
938                for (int i = 0; i < dests.length; i++) {
939                    ActiveMQDestination dest = dests[i];
940                    DemandSubscription sub = createDemandSubscription(dest);
941                    try {
942                        addSubscription(sub);
943                    } catch (IOException e) {
944                        LOG.error("Failed to add static destination " + dest, e);
945                    }
946                    if (LOG.isTraceEnabled()) {
947                        LOG.trace("bridging messages for static destination: " + dest);
948                    }
949                }
950            }
951        }
952    
953        protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
954            boolean consumerAdded = false;
955            ConsumerInfo info = consumerInfo.copy();
956            addRemoteBrokerToBrokerPath(info);
957            DemandSubscription sub = createDemandSubscription(info);
958            if (sub != null) {
959                if (duplicateSuppressionIsRequired(sub)) {
960                    undoMapRegistration(sub);
961                } else {
962                    addSubscription(sub);
963                    consumerAdded = true;
964                }
965            }
966            return consumerAdded;
967        }
968    
969        private void undoMapRegistration(DemandSubscription sub) {
970            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
971            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
972        }
973    
974        /*
975         * check our existing subs networkConsumerIds against the list of network ids in this subscription
976         * A match means a duplicate which we suppress for topics and maybe for queues
977         */
978        private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
979            final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
980            boolean suppress = false;
981    
982            if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
983                    consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
984                return suppress;
985            }
986    
987            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
988            Collection<Subscription> currentSubs =
989                getRegionSubscriptions(consumerInfo.getDestination());
990            for (Subscription sub : currentSubs) {
991                List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
992                if (!networkConsumers.isEmpty()) {
993                    if (matchFound(candidateConsumers, networkConsumers)) {
994                        if (isInActiveDurableSub(sub)) {
995                            suppress = false;
996                        } else {
997                            suppress = hasLowerPriority(sub, candidate.getLocalInfo());
998                        }
999                        break;
1000                    }
1001                }
1002            }
1003            return suppress;
1004        }
1005    
1006        private boolean isInActiveDurableSub(Subscription sub) {
1007            return  (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
1008        }
1009    
1010        private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1011            boolean suppress = false;
1012    
1013            if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1014                if (LOG.isDebugEnabled()) {
1015                    LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1016                            + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1017                            + existingSub  + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1018                }
1019                suppress = true;
1020            } else {
1021                // remove the existing lower priority duplicate and allow this candidate
1022                try {
1023                    removeDuplicateSubscription(existingSub);
1024    
1025                    if (LOG.isDebugEnabled()) {
1026                        LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1027                                + " with sub from " + remoteBrokerName
1028                                + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1029                                + candidateInfo.getNetworkConsumerIds());
1030                    }
1031                } catch (IOException e) {
1032                    LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1033                }
1034            }
1035            return suppress;
1036        }
1037    
1038        private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1039            for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1040                if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1041                    break;
1042                }
1043            }
1044        }
1045    
1046        private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1047            boolean found = false;
1048            for (ConsumerId aliasConsumer : networkConsumers) {
1049                if (candidateConsumers.contains(aliasConsumer)) {
1050                    found = true;
1051                    break;
1052                }
1053            }
1054            return found;
1055        }
1056    
1057        private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1058            RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1059            Region region;
1060            Collection<Subscription> subs;
1061    
1062            region = null;
1063            switch ( dest.getDestinationType() )
1064            {
1065                case ActiveMQDestination.QUEUE_TYPE:
1066                    region = region_broker.getQueueRegion();
1067                    break;
1068    
1069                case ActiveMQDestination.TOPIC_TYPE:
1070                    region = region_broker.getTopicRegion();
1071                    break;
1072    
1073                case ActiveMQDestination.TEMP_QUEUE_TYPE:
1074                    region = region_broker.getTempQueueRegion();
1075                    break;
1076    
1077                case ActiveMQDestination.TEMP_TOPIC_TYPE:
1078                    region = region_broker.getTempTopicRegion();
1079                    break;
1080            }
1081    
1082            if ( region instanceof AbstractRegion )
1083                subs = ((AbstractRegion) region).getSubscriptions().values();
1084            else
1085                subs = null;
1086    
1087            return subs;
1088        }
1089    
1090        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1091            //add our original id to ourselves
1092            info.addNetworkConsumerId(info.getConsumerId());
1093            return doCreateDemandSubscription(info);
1094        }
1095    
1096        protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1097            DemandSubscription result = new DemandSubscription(info);
1098            result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1099            if (info.getDestination().isTemporary()) {
1100                // reset the local connection Id
1101    
1102                ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1103                dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1104            }
1105    
1106            if (configuration.isDecreaseNetworkConsumerPriority()) {
1107                byte priority = (byte) configuration.getConsumerPriorityBase();
1108                if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1109                    // The longer the path to the consumer, the less it's consumer priority.
1110                    priority -= info.getBrokerPath().length + 1;
1111                }
1112                result.getLocalInfo().setPriority(priority);
1113                if (LOG.isDebugEnabled()) {
1114                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1115                }
1116            }
1117            configureDemandSubscription(info, result);
1118            return result;
1119        }
1120    
1121        final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1122            ConsumerInfo info = new ConsumerInfo();
1123            info.setDestination(destination);
1124            // the remote info held by the DemandSubscription holds the original
1125            // consumerId,
1126            // the local info get's overwritten
1127    
1128            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1129            DemandSubscription result = null;
1130            try {
1131                result = createDemandSubscription(info);
1132            } catch (IOException e) {
1133                LOG.error("Failed to create DemandSubscription ", e);
1134            }
1135            return result;
1136        }
1137    
1138        protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1139            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1140            sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1141            subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1142            subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1143    
1144            sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1145            if (!info.isDurable()) {
1146                // This works for now since we use a VM connection to the local broker.
1147                // may need to change if we ever subscribe to a remote broker.
1148                sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1149            } else  {
1150                // need to ack this message if it is ignored as it is durable so
1151                // we check before we send. see: suppressMessageDispatch()
1152            }
1153        }
1154    
1155        protected void removeDemandSubscription(ConsumerId id) throws IOException {
1156            DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1157            if (LOG.isDebugEnabled()) {
1158                LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1159            }
1160            if (sub != null) {
1161                removeSubscription(sub);
1162                if (LOG.isDebugEnabled()) {
1163                    LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1164                }
1165            }
1166        }
1167    
1168        protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1169            boolean removeDone = false;
1170            DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1171            if (sub != null) {
1172                try {
1173                    removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1174                    removeDone = true;
1175                } catch (IOException e) {
1176                    LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1177                }
1178            }
1179            return removeDone;
1180        }
1181    
1182        protected void waitStarted() throws InterruptedException {
1183            startedLatch.await();
1184        }
1185    
1186        protected void clearDownSubscriptions() {
1187            subscriptionMapByLocalId.clear();
1188            subscriptionMapByRemoteId.clear();
1189        }
1190    
1191        protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1192            NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1193            if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1194                PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1195                if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1196                    filterFactory = entry.getNetworkBridgeFilterFactory();
1197                }
1198            }
1199            return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1200        }
1201    
1202        protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
1203            synchronized (brokerInfoMutex) {
1204                if (remoteBrokerId != null) {
1205                    if (remoteBrokerId.equals(localBrokerId)) {
1206                        if (LOG.isTraceEnabled()) {
1207                            LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1208                        }
1209                        waitStarted();
1210                        ServiceSupport.dispose(this);
1211                    }
1212                }
1213            }
1214        }
1215    
1216        protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1217            info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1218        }
1219    
1220        protected void serviceRemoteBrokerInfo(Command command) throws IOException {
1221            synchronized (brokerInfoMutex) {
1222                BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
1223                remoteBrokerId = remoteBrokerInfo.getBrokerId();
1224                remoteBrokerPath[0] = remoteBrokerId;
1225                remoteBrokerName = remoteBrokerInfo.getBrokerName();
1226                if (localBrokerId != null) {
1227                    if (localBrokerId.equals(remoteBrokerId)) {
1228                        if (LOG.isTraceEnabled()) {
1229                            LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1230                        }
1231                        ServiceSupport.dispose(this);
1232                    }
1233                }
1234                if (!disposed.get()) {
1235                    triggerLocalStartBridge();
1236                }
1237            }
1238        }
1239    
1240        protected  BrokerId[] getRemoteBrokerPath() {
1241            return remoteBrokerPath;
1242        }
1243    
1244        public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1245            this.networkBridgeListener = listener;
1246        }
1247    
1248        private void fireBridgeFailed() {
1249            NetworkBridgeListener l = this.networkBridgeListener;
1250            if (l != null) {
1251                l.bridgeFailed();
1252            }
1253        }
1254    
1255        public String getRemoteAddress() {
1256            return remoteBroker.getRemoteAddress();
1257        }
1258    
1259        public String getLocalAddress() {
1260            return localBroker.getRemoteAddress();
1261        }
1262    
1263        public String getRemoteBrokerName() {
1264            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1265        }
1266    
1267        public String getLocalBrokerName() {
1268            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1269        }
1270    
1271        public long getDequeueCounter() {
1272            return dequeueCounter.get();
1273        }
1274    
1275        public long getEnqueueCounter() {
1276            return enqueueCounter.get();
1277        }
1278    
1279        protected boolean isDuplex() {
1280            return configuration.isDuplex() || createdByDuplex;
1281        }
1282    
1283        public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1284            return subscriptionMapByRemoteId;
1285        }
1286    
1287        public void setBrokerService(BrokerService brokerService) {
1288            this.brokerService = brokerService;
1289            this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1290            localBrokerPath[0] = localBrokerId;
1291        }
1292    
1293        public void setMbeanObjectName(ObjectName objectName) {
1294            this.mbeanObjectName = objectName;
1295        }
1296    
1297        public ObjectName getMbeanObjectName() {
1298            return mbeanObjectName;
1299        }
1300    }