001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.util.Collection; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import javax.management.MalformedObjectNameException; 029import javax.management.ObjectName; 030 031import org.apache.activemq.Service; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.jmx.AnnotatedMBean; 034import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 035import org.apache.activemq.broker.jmx.NetworkBridgeView; 036import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ConsumerId; 039import org.apache.activemq.transport.Transport; 040import org.apache.activemq.transport.TransportFactory; 041import org.apache.activemq.util.ServiceStopper; 042import org.apache.activemq.util.ServiceSupport; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Connector class for bridging broker networks. 048 */ 049public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { 050 051 private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class); 052 protected URI localURI; 053 protected ConnectionFilter connectionFilter; 054 protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>(); 055 056 protected ServiceSupport serviceSupport = new ServiceSupport() { 057 058 @Override 059 protected void doStart() throws Exception { 060 handleStart(); 061 } 062 063 @Override 064 protected void doStop(ServiceStopper stopper) throws Exception { 065 handleStop(stopper); 066 } 067 }; 068 069 private Set<ActiveMQDestination> durableDestinations; 070 071 private BrokerService brokerService; 072 private ObjectName objectName; 073 074 public NetworkConnector() { 075 } 076 077 public NetworkConnector(URI localURI) { 078 this.localURI = localURI; 079 } 080 081 public URI getLocalUri() throws URISyntaxException { 082 return localURI; 083 } 084 085 public void setLocalUri(URI localURI) { 086 this.localURI = localURI; 087 } 088 089 /** 090 * @return Returns the durableDestinations. 091 */ 092 public Set<ActiveMQDestination> getDurableDestinations() { 093 return durableDestinations; 094 } 095 096 /** 097 * @param durableDestinations The durableDestinations to set. 098 */ 099 public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) { 100 this.durableDestinations = durableDestinations; 101 } 102 103 104 public void addExcludedDestination(ActiveMQDestination destiantion) { 105 this.excludedDestinations.add(destiantion); 106 } 107 108 109 public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { 110 this.staticallyIncludedDestinations.add(destiantion); 111 } 112 113 114 public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { 115 this.dynamicallyIncludedDestinations.add(destiantion); 116 } 117 118 public ConnectionFilter getConnectionFilter() { 119 return connectionFilter; 120 } 121 122 public void setConnectionFilter(ConnectionFilter connectionFilter) { 123 this.connectionFilter = connectionFilter; 124 } 125 126 // Implementation methods 127 // ------------------------------------------------------------------------- 128 protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) { 129 List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations(); 130 ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]); 131 result.setDynamicallyIncludedDestinations(dests); 132 destsList = getExcludedDestinations(); 133 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 134 result.setExcludedDestinations(dests); 135 destsList = getStaticallyIncludedDestinations(); 136 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 137 result.setStaticallyIncludedDestinations(dests); 138 result.setDurableDestinations(getDurableTopicDestinations(durableDestinations)); 139 return result; 140 } 141 142 protected Transport createLocalTransport() throws Exception { 143 return TransportFactory.connect(localURI); 144 } 145 146 public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) { 147 if (durableDestinations != null) { 148 149 HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>(); 150 for (ActiveMQDestination d : durableDestinations) { 151 if( d.isTopic() ) { 152 topics.add(d); 153 } 154 } 155 156 ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; 157 dest = topics.toArray(dest); 158 return dest; 159 } 160 return null; 161 } 162 163 @Override 164 public void start() throws Exception { 165 serviceSupport.start(); 166 } 167 168 @Override 169 public void stop() throws Exception { 170 serviceSupport.stop(); 171 } 172 173 protected void handleStart() throws Exception { 174 if (localURI == null) { 175 throw new IllegalStateException("You must configure the 'localURI' property"); 176 } 177 LOG.info("Network Connector {} started", this); 178 } 179 180 protected void handleStop(ServiceStopper stopper) throws Exception { 181 LOG.info("Network Connector {} stopped", this); 182 } 183 184 public boolean isStarted() { 185 return serviceSupport.isStarted(); 186 } 187 188 public boolean isStopped() { 189 return serviceSupport.isStopped(); 190 } 191 192 public boolean isStopping() { 193 return serviceSupport.isStopping(); 194 } 195 196 public ObjectName getObjectName() { 197 return objectName; 198 } 199 200 public void setObjectName(ObjectName objectName) { 201 this.objectName = objectName; 202 } 203 204 public BrokerService getBrokerService() { 205 return brokerService; 206 } 207 208 public void setBrokerService(BrokerService brokerService) { 209 this.brokerService = brokerService; 210 } 211 212 protected void registerNetworkBridgeMBean(NetworkBridge bridge) { 213 if (!getBrokerService().isUseJmx()) { 214 return; 215 } 216 NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); 217 try { 218 ObjectName objectName = createNetworkBridgeObjectName(bridge); 219 AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName); 220 } catch (Throwable e) { 221 LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e); 222 } 223 } 224 225 protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) { 226 if (!getBrokerService().isUseJmx()) { 227 return; 228 } 229 try { 230 ObjectName objectName = createNetworkBridgeObjectName(bridge); 231 getBrokerService().getManagementContext().unregisterMBean(objectName); 232 } catch (Throwable e) { 233 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 234 } 235 } 236 237 protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { 238 return BrokerMBeanSupport.createNetworkBridgeObjectName(getObjectName(), bridge.getRemoteAddress()); 239 } 240 241 // ask all the bridges as we can't know to which this consumer is tied 242 public boolean removeDemandSubscription(ConsumerId consumerId) { 243 boolean removeSucceeded = false; 244 for (NetworkBridge bridge : bridges.values()) { 245 if (bridge instanceof DemandForwardingBridgeSupport) { 246 DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge; 247 if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) { 248 removeSucceeded = true; 249 break; 250 } 251 } 252 } 253 return removeSucceeded; 254 } 255 256 public Collection<NetworkBridge> activeBridges() { 257 return bridges.values(); 258 } 259}