001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport; 018 019 import java.io.IOException; 020 import java.net.MalformedURLException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 import java.net.UnknownHostException; 024 import java.util.HashMap; 025 import java.util.Map; 026 import java.util.concurrent.ConcurrentHashMap; 027 import java.util.concurrent.Executor; 028 029 import org.apache.activemq.broker.BrokerService; 030 import org.apache.activemq.broker.BrokerServiceAware; 031 import org.apache.activemq.broker.SslContext; 032 import org.apache.activemq.util.FactoryFinder; 033 import org.apache.activemq.util.IOExceptionSupport; 034 import org.apache.activemq.util.IntrospectionSupport; 035 import org.apache.activemq.util.URISupport; 036 import org.apache.activemq.wireformat.WireFormat; 037 import org.apache.activemq.wireformat.WireFormatFactory; 038 039 public abstract class TransportFactory { 040 041 private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); 042 private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); 043 private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>(); 044 045 private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout"; 046 private static final String THREAD_NAME_FILTER = "threadName"; 047 048 public abstract TransportServer doBind(URI location) throws IOException; 049 050 public Transport doConnect(URI location, Executor ex) throws Exception { 051 return doConnect(location); 052 } 053 054 public Transport doCompositeConnect(URI location, Executor ex) throws Exception { 055 return doCompositeConnect(location); 056 } 057 058 /** 059 * Creates a normal transport. 060 * 061 * @param location 062 * @return the transport 063 * @throws Exception 064 */ 065 public static Transport connect(URI location) throws Exception { 066 TransportFactory tf = findTransportFactory(location); 067 return tf.doConnect(location); 068 } 069 070 /** 071 * Creates a normal transport. 072 * 073 * @param location 074 * @param ex 075 * @return the transport 076 * @throws Exception 077 */ 078 public static Transport connect(URI location, Executor ex) throws Exception { 079 TransportFactory tf = findTransportFactory(location); 080 return tf.doConnect(location, ex); 081 } 082 083 /** 084 * Creates a slimmed down transport that is more efficient so that it can be 085 * used by composite transports like reliable and HA. 086 * 087 * @param location 088 * @return the Transport 089 * @throws Exception 090 */ 091 public static Transport compositeConnect(URI location) throws Exception { 092 TransportFactory tf = findTransportFactory(location); 093 return tf.doCompositeConnect(location); 094 } 095 096 /** 097 * Creates a slimmed down transport that is more efficient so that it can be 098 * used by composite transports like reliable and HA. 099 * 100 * @param location 101 * @param ex 102 * @return the Transport 103 * @throws Exception 104 */ 105 public static Transport compositeConnect(URI location, Executor ex) throws Exception { 106 TransportFactory tf = findTransportFactory(location); 107 return tf.doCompositeConnect(location, ex); 108 } 109 110 public static TransportServer bind(URI location) throws IOException { 111 TransportFactory tf = findTransportFactory(location); 112 return tf.doBind(location); 113 } 114 115 /** 116 * @deprecated 117 */ 118 public static TransportServer bind(String brokerId, URI location) throws IOException { 119 return bind(location); 120 } 121 122 public static TransportServer bind(BrokerService brokerService, URI location) throws IOException { 123 TransportFactory tf = findTransportFactory(location); 124 if( brokerService!=null && tf instanceof BrokerServiceAware ) { 125 ((BrokerServiceAware)tf).setBrokerService(brokerService); 126 } 127 try { 128 if( brokerService!=null ) { 129 SslContext.setCurrentSslContext(brokerService.getSslContext()); 130 } 131 return tf.doBind(location); 132 } finally { 133 SslContext.setCurrentSslContext(null); 134 } 135 } 136 137 public Transport doConnect(URI location) throws Exception { 138 try { 139 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 140 WireFormat wf = createWireFormat(options); 141 Transport transport = createTransport(location, wf); 142 Transport rc = configure(transport, wf, options); 143 if (!options.isEmpty()) { 144 throw new IllegalArgumentException("Invalid connect parameters: " + options); 145 } 146 return rc; 147 } catch (URISyntaxException e) { 148 throw IOExceptionSupport.create(e); 149 } 150 } 151 152 public Transport doCompositeConnect(URI location) throws Exception { 153 try { 154 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 155 WireFormat wf = createWireFormat(options); 156 Transport transport = createTransport(location, wf); 157 Transport rc = compositeConfigure(transport, wf, options); 158 if (!options.isEmpty()) { 159 throw new IllegalArgumentException("Invalid connect parameters: " + options); 160 } 161 return rc; 162 163 } catch (URISyntaxException e) { 164 throw IOExceptionSupport.create(e); 165 } 166 } 167 168 /** 169 * Allow registration of a transport factory without wiring via META-INF classes 170 * @param scheme 171 * @param tf 172 */ 173 public static void registerTransportFactory(String scheme, TransportFactory tf) { 174 TRANSPORT_FACTORYS.put(scheme, tf); 175 } 176 177 /** 178 * Factory method to create a new transport 179 * 180 * @throws IOException 181 * @throws UnknownHostException 182 */ 183 protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException { 184 throw new IOException("createTransport() method not implemented!"); 185 } 186 187 /** 188 * @param location 189 * @return 190 * @throws IOException 191 */ 192 private static TransportFactory findTransportFactory(URI location) throws IOException { 193 String scheme = location.getScheme(); 194 if (scheme == null) { 195 throw new IOException("Transport not scheme specified: [" + location + "]"); 196 } 197 TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); 198 if (tf == null) { 199 // Try to load if from a META-INF property. 200 try { 201 tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); 202 TRANSPORT_FACTORYS.put(scheme, tf); 203 } catch (Throwable e) { 204 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); 205 } 206 } 207 return tf; 208 } 209 210 protected WireFormat createWireFormat(Map<String, String> options) throws IOException { 211 WireFormatFactory factory = createWireFormatFactory(options); 212 WireFormat format = factory.createWireFormat(); 213 return format; 214 } 215 216 protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException { 217 String wireFormat = (String)options.remove("wireFormat"); 218 if (wireFormat == null) { 219 wireFormat = getDefaultWireFormatType(); 220 } 221 222 try { 223 WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat); 224 IntrospectionSupport.setProperties(wff, options, "wireFormat."); 225 return wff; 226 } catch (Throwable e) { 227 throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e); 228 } 229 } 230 231 protected String getDefaultWireFormatType() { 232 return "default"; 233 } 234 235 /** 236 * Fully configures and adds all need transport filters so that the 237 * transport can be used by the JMS client. 238 * 239 * @param transport 240 * @param wf 241 * @param options 242 * @return 243 * @throws Exception 244 */ 245 @SuppressWarnings("rawtypes") 246 public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { 247 transport = compositeConfigure(transport, wf, options); 248 249 transport = new MutexTransport(transport); 250 transport = new ResponseCorrelator(transport); 251 252 return transport; 253 } 254 255 /** 256 * Fully configures and adds all need transport filters so that the 257 * transport can be used by the ActiveMQ message broker. The main difference 258 * between this and the configure() method is that the broker does not issue 259 * requests to the client so the ResponseCorrelator is not needed. 260 * 261 * @param transport 262 * @param format 263 * @param options 264 * @return 265 * @throws Exception 266 */ 267 @SuppressWarnings("rawtypes") 268 public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { 269 if (options.containsKey(THREAD_NAME_FILTER)) { 270 transport = new ThreadNameFilter(transport); 271 } 272 transport = compositeConfigure(transport, format, options); 273 transport = new MutexTransport(transport); 274 return transport; 275 } 276 277 /** 278 * Similar to configure(...) but this avoid adding in the MutexTransport and 279 * ResponseCorrelator transport layers so that the resulting transport can 280 * more efficiently be used as part of a composite transport. 281 * 282 * @param transport 283 * @param format 284 * @param options 285 * @return 286 */ 287 @SuppressWarnings("rawtypes") 288 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 289 if (options.containsKey(WRITE_TIMEOUT_FILTER)) { 290 transport = new WriteTimeoutFilter(transport); 291 String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER); 292 if (soWriteTimeout!=null) { 293 ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout)); 294 } 295 } 296 IntrospectionSupport.setProperties(transport, options); 297 return transport; 298 } 299 300 @SuppressWarnings("rawtypes") 301 protected String getOption(Map options, String key, String def) { 302 String rc = (String) options.remove(key); 303 if( rc == null ) { 304 rc = def; 305 } 306 return rc; 307 } 308 }