/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for network connectors.
 * <p>
 * It holds the destination filters (excluded, statically included,
 * dynamically included and durable destinations) that are copied onto each
 * bridge by {@link #configureBridge(DemandForwardingBridgeSupport)}, keeps
 * the map of bridges keyed by URI, delegates its start/stop life cycle to a
 * {@link ServiceSupport} instance and offers helpers to register and
 * unregister a JMX view for a bridge.
 */
public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {

    private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);

    /** URI used by {@link #createLocalTransport()}; must be set before the connector is started. */
    protected URI localURI;
    protected ConnectionFilter connectionFilter;
    /** Bridges known to this connector, keyed by URI. */
    protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();

    /** Life-cycle helper; forwards its callbacks to {@link #handleStart()} and {@link #handleStop(ServiceStopper)}. */
    protected ServiceSupport serviceSupport = new ServiceSupport() {

        @Override
        protected void doStart() throws Exception {
            handleStart();
        }

        @Override
        protected void doStop(ServiceStopper stopper) throws Exception {
            handleStop(stopper);
        }
    };

    private Set<ActiveMQDestination> durableDestinations;
    private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
    private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
    private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
    private BrokerService brokerService;
    private ObjectName objectName;

    /**
     * Creates a connector without a local URI; one must be supplied through
     * {@link #setLocalUri(URI)} before {@link #start()} is called.
     */
    public NetworkConnector() {
    }

    /**
     * Creates a connector bound to the given local URI.
     *
     * @param localURI the URI used to create the local transport
     */
    public NetworkConnector(URI localURI) {
        this.localURI = localURI;
    }

    /**
     * @return the configured local URI, or {@code null} if none was set
     * @throws URISyntaxException never thrown by this implementation; the
     *         clause is kept so that existing overriders and callers keep
     *         compiling
     */
    public URI getLocalUri() throws URISyntaxException {
        return localURI;
    }

    /**
     * @param localURI the URI used to create the local transport
     */
    public void setLocalUri(URI localURI) {
        this.localURI = localURI;
    }

    /**
     * @return the durable destinations, or {@code null} if none were set
     */
    public Set<ActiveMQDestination> getDurableDestinations() {
        return durableDestinations;
    }

    /**
     * @param durableDestinations the durable destinations to set
     */
    public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
        this.durableDestinations = durableDestinations;
    }

    /**
     * @return the list of excluded destinations (the live list, not a copy)
     */
    public List<ActiveMQDestination> getExcludedDestinations() {
        return excludedDestinations;
    }

    /**
     * Replaces the list of excluded destinations with the given list
     * instance (no copy is made).
     *
     * @param excludedDestinations the excluded destinations to set
     */
    public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
        this.excludedDestinations = excludedDestinations;
    }

    /**
     * Appends a destination to the excluded list.
     *
     * @param destiantion the destination to exclude
     */
    public void addExcludedDestination(ActiveMQDestination destiantion) {
        this.excludedDestinations.add(destiantion);
    }

    /**
     * @return the list of statically included destinations (the live list,
     *         not a copy)
     */
    public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
        return staticallyIncludedDestinations;
    }

    /**
     * Replaces the list of statically included destinations with the given
     * list instance (no copy is made).
     *
     * @param staticallyIncludedDestinations the statically included
     *        destinations to set
     */
    public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
    }

    /**
     * Appends a destination to the statically included list.
     *
     * @param destiantion the destination to include statically
     */
    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
        this.staticallyIncludedDestinations.add(destiantion);
    }

    /**
     * @return the list of dynamically included destinations (the live list,
     *         not a copy)
     */
    public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
        return dynamicallyIncludedDestinations;
    }

    /**
     * Replaces the list of dynamically included destinations with the given
     * list instance (no copy is made).
     *
     * @param dynamicallyIncludedDestinations the dynamically included
     *        destinations to set
     */
    public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
    }

    /**
     * Appends a destination to the dynamically included list.
     *
     * @param destiantion the destination to include dynamically
     */
    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
        this.dynamicallyIncludedDestinations.add(destiantion);
    }

    public ConnectionFilter getConnectionFilter() {
        return connectionFilter;
    }

    public void setConnectionFilter(ConnectionFilter connectionFilter) {
        this.connectionFilter = connectionFilter;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Copies this connector's destination filters onto the given bridge.
     * <p>
     * The dynamically included, excluded and statically included lists are
     * always applied, in that order. Durable destinations are applied only
     * when a set has been configured, and only its topic entries are passed
     * on.
     *
     * @param result the bridge to configure
     * @return the same bridge instance that was passed in
     */
    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
        result.setDynamicallyIncludedDestinations(toDestinationArray(getDynamicallyIncludedDestinations()));
        result.setExcludedDestinations(toDestinationArray(getExcludedDestinations()));
        result.setStaticallyIncludedDestinations(toDestinationArray(getStaticallyIncludedDestinations()));

        if (durableDestinations != null) {
            // Only topic destinations are forwarded as durable destinations.
            Set<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
            for (ActiveMQDestination candidate : durableDestinations) {
                if (candidate.isTopic()) {
                    topics.add(candidate);
                }
            }
            result.setDurableDestinations(toDestinationArray(topics));
        }
        return result;
    }

    /** Snapshots a destination collection into a new array of matching size. */
    private static ActiveMQDestination[] toDestinationArray(Collection<ActiveMQDestination> destinations) {
        return destinations.toArray(new ActiveMQDestination[destinations.size()]);
    }

    /**
     * Connects a transport to {@link #localURI} via {@link TransportFactory}.
     *
     * @return the connected local transport
     * @throws Exception if the transport cannot be created
     */
    protected Transport createLocalTransport() throws Exception {
        return TransportFactory.connect(localURI);
    }

    /**
     * Starts the connector through the life-cycle helper, which in turn
     * invokes {@link #handleStart()}.
     */
    public void start() throws Exception {
        serviceSupport.start();
    }

    /**
     * Stops the connector through the life-cycle helper, which in turn
     * invokes {@link #handleStop(ServiceStopper)}.
     */
    public void stop() throws Exception {
        serviceSupport.stop();
    }

    /**
     * Start hook: verifies the configuration and logs the start.
     *
     * @throws IllegalStateException if no local URI has been configured
     */
    protected void handleStart() throws Exception {
        if (localURI == null) {
            throw new IllegalStateException("You must configure the 'localURI' property");
        }
        LOG.info("Network Connector {} Started", this);
    }

    /**
     * Stop hook: logs that the connector has stopped.
     *
     * @param stopper the stopper passed in by the life-cycle helper
     */
    protected void handleStop(ServiceStopper stopper) throws Exception {
        LOG.info("Network Connector {} Stopped", this);
    }

    public boolean isStarted() {
        return serviceSupport.isStarted();
    }

    public boolean isStopped() {
        return serviceSupport.isStopped();
    }

    public boolean isStopping() {
        return serviceSupport.isStopping();
    }

    /**
     * @return the JMX name of this connector, or {@code null} if none was set
     */
    public ObjectName getObjectName() {
        return objectName;
    }

    public void setObjectName(ObjectName objectName) {
        this.objectName = objectName;
    }

    /**
     * @return the owning broker, or {@code null} if none was set
     */
    public BrokerService getBrokerService() {
        return brokerService;
    }

    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * Registers a JMX view for the given bridge.
     * <p>
     * Registration is best effort: nothing happens when no broker is set or
     * the broker does not use JMX, and any failure is only logged at debug
     * level.
     *
     * @param bridge the bridge to expose through JMX
     */
    protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
        BrokerService broker = getBrokerService();
        // No broker means no management context to register with.
        if (broker == null || !broker.isUseJmx()) {
            return;
        }
        NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
        try {
            ObjectName bridgeName = createNetworkBridgeObjectName(bridge);
            AnnotatedMBean.registerMBean(broker.getManagementContext(), view, bridgeName);
        } catch (Throwable e) {
            LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
        }
    }

    /**
     * Removes the JMX view of the given bridge.
     * <p>
     * Like registration this is best effort: nothing happens when no broker
     * is set or the broker does not use JMX, and any failure is only logged
     * at debug level.
     *
     * @param bridge the bridge whose JMX view should be removed
     */
    protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
        BrokerService broker = getBrokerService();
        // No broker means there is nothing that could have been registered.
        if (broker == null || !broker.isUseJmx()) {
            return;
        }
        try {
            ObjectName bridgeName = createNetworkBridgeObjectName(bridge);
            broker.getManagementContext().unregisterMBean(bridgeName);
        } catch (Throwable e) {
            LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the JMX name of a bridge from this connector's own name:
     * {@code <domain>:BrokerName=..,Type=NetworkBridge,NetworkConnectorName=..,Name=<remote address>}.
     *
     * @param bridge the bridge to name
     * @return the object name for the bridge
     * @throws MalformedObjectNameException if the assembled name is not a
     *         valid object name
     */
    protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
        ObjectName connectorName = getObjectName();
        // NOTE(review): suppression kept from the original, narrowed to this
        // declaration; presumably getKeyPropertyList() is untyped on older
        // JMX API levels - confirm before removing.
        @SuppressWarnings("unchecked")
        Map<String, String> keys = new HashMap<String, String>(connectorName.getKeyPropertyList());

        String domain = connectorName.getDomain();
        String brokerPart = JMXSupport.encodeObjectNamePart(keys.get("BrokerName"));
        String connectorPart = JMXSupport.encodeObjectNamePart(keys.get("NetworkConnectorName"));
        // NOTE(review): the remote address is encoded twice. This looks
        // redundant but is kept as-is so that registered names stay identical.
        String bridgePart = JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress()));

        return new ObjectName(domain + ":" + "BrokerName=" + brokerPart + "," + "Type=NetworkBridge,"
                + "NetworkConnectorName=" + connectorPart + "," + "Name=" + bridgePart);
    }

    /**
     * Asks the bridges to drop the demand subscription of the given local
     * consumer. All bridges are asked in turn, as we can't know to which one
     * the consumer is tied; the search stops at the first bridge that
     * reports a successful removal.
     *
     * @param consumerId the local consumer id
     * @return {@code true} if some bridge removed the subscription
     */
    public boolean removeDemandSubscription(ConsumerId consumerId) {
        for (NetworkBridge bridge : bridges.values()) {
            if (!(bridge instanceof DemandForwardingBridgeSupport)) {
                continue;
            }
            DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
            if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the values view of the bridge map (not a copy)
     */
    public Collection<NetworkBridge> activeBridges() {
        return bridges.values();
    }

}