/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.udp;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TransportFactory} that creates {@link UdpTransport} instances and
 * wraps them in the interceptor chain built by the {@code configure} and
 * {@code compositeConfigure} methods of this class.
 *
 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
 *
 */
public class UdpTransportFactory extends TransportFactory {

    // Logged under this factory's own category (was mistakenly the TCP factory's).
    private static final Logger log = LoggerFactory.getLogger(UdpTransportFactory.class);

    /**
     * Creates a {@link UdpTransportServer} bound to the port given in the URI.
     *
     * @param location the URI to bind to; its port is used for the underlying
     *                 transport and its query parameters are applied as options
     * @return the newly created server
     * @throws IOException if anything fails while parsing the URI or building
     *                     the transport; this includes the rejection of a
     *                     {@code port} query parameter, which is wrapped like
     *                     any other failure
     */
    public TransportServer doBind(final URI location) throws IOException {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if (options.containsKey("port")) {
                throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
            }
            WireFormat wf = createWireFormat(options);
            int port = location.getPort();
            UdpTransport transport = (UdpTransport) createTransport(port, wf);

            Transport configuredTransport = configure(transport, wf, options, true);
            return new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
        } catch (Exception e) {
            // A single handler is enough: every failure is converted the same way.
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Configures the transport as a regular (non 'accept server') transport by
     * delegating to {@link #configure(Transport, WireFormat, Map, boolean)}
     * with {@code acceptServer} set to {@code false}.
     *
     * @param transport the transport to configure; must be a {@link UdpTransport}
     * @param format    the wire format in use
     * @param options   properties to apply to the transport
     * @return the outermost transport of the configured chain
     * @throws Exception if the configuration fails
     */
    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
        return configure(transport, format, options, false);
    }

    /**
     * Applies the options to the transport and wraps it, from the inside out,
     * in a {@link CommandJoiner}, an optional transport logger (when tracing is
     * enabled on the UDP transport), an {@link InactivityMonitor} and, for
     * {@link OpenWireFormat}, the client side negotiator.
     *
     * @param transport the transport to configure; must be a {@link UdpTransport}
     * @param format    the wire format in use
     * @param options   properties to apply to the transport
     * @return the outermost transport of the configured chain
     */
    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
        IntrospectionSupport.setProperties(transport, options);
        final UdpTransport udpTransport = (UdpTransport)transport;

        // deal with fragmentation
        transport = new CommandJoiner(transport, asOpenWireFormat(format));

        if (udpTransport.isTrace()) {
            try {
                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
            } catch (Throwable e) {
                // Best effort: a failure to create the logger is reported and
                // the chain is built without it.
                log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
            }
        }

        transport = new InactivityMonitor(transport, format);

        if (format instanceof OpenWireFormat) {
            transport = configureClientSideNegotiator(transport, format, udpTransport);
        }

        return transport;
    }

    /**
     * Creates a {@link UdpTransport} for the given URI.
     *
     * @param location the URI handed to the transport constructor
     * @param wf       the wire format; must be an {@link OpenWireFormat}
     * @return the new transport
     * @throws UnknownHostException declared for the transport constructor
     * @throws IOException          declared for the transport constructor
     */
    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
        OpenWireFormat wireFormat = asOpenWireFormat(wf);
        return new UdpTransport(wireFormat, location);
    }

    /**
     * Creates a {@link UdpTransport} for the given port.
     *
     * @param port the port handed to the transport constructor
     * @param wf   the wire format; must be an {@link OpenWireFormat}
     * @return the new transport
     * @throws UnknownHostException declared for the transport constructor
     * @throws IOException          declared for the transport constructor
     */
    protected Transport createTransport(int port, WireFormat wf) throws UnknownHostException, IOException {
        OpenWireFormat wireFormat = asOpenWireFormat(wf);
        return new UdpTransport(wireFormat, port);
    }

    /**
     * Configures the transport
     *
     * @param transport    the transport to configure; must be a {@link UdpTransport}
     * @param format       the wire format; must be an {@link OpenWireFormat}
     * @param options      properties to apply to the transport
     * @param acceptServer true if this transport is used purely as an 'accept'
     *                     transport for new connections which work like TCP
     *                     SocketServers where new connections spin up a new separate
     *                     UDP transport
     * @return the outermost transport of the configured chain, which is always
     *         a {@link CommandJoiner}
     * @throws Exception if the configuration fails, for example when the
     *                   transport logger cannot be created
     */
    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
        IntrospectionSupport.setProperties(transport, options);
        UdpTransport udpTransport = (UdpTransport)transport;

        OpenWireFormat openWireFormat = asOpenWireFormat(format);

        if (udpTransport.isTrace()) {
            transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
        }

        transport = new InactivityMonitor(transport, format);

        if (!acceptServer && format instanceof OpenWireFormat) {
            transport = configureClientSideNegotiator(transport, format, udpTransport);
        }

        // deal with fragmentation

        if (acceptServer) {
            // lets not support a buffer of messages to enable reliable
            // messaging on the 'accept server' transport
            udpTransport.setReplayEnabled(false);

            // we don't want to do reliable checks on this transport as we
            // delegate to one that does
            transport = new CommandJoiner(transport, openWireFormat);
            return transport;
        } else {
            ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
            Replayer replayer = reliableTransport.getReplayer();
            reliableTransport.setReplayStrategy(createReplayStrategy(replayer));

            // Joiner must be on outside as the inbound messages must be
            // processed by the reliable transport first
            return new CommandJoiner(reliableTransport, openWireFormat);
        }
    }

    /**
     * Chooses the replay strategy depending on whether a replayer is available.
     *
     * @param replayer the replayer of the reliable transport, may be {@code null}
     * @return a {@link DefaultReplayStrategy} constructed with {@code 5} when a
     *         replayer is present, otherwise an
     *         {@link ExceptionIfDroppedReplayStrategy} constructed with {@code 1}
     */
    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
        if (replayer != null) {
            return new DefaultReplayStrategy(5);
        }
        return new ExceptionIfDroppedReplayStrategy(1);
    }

    /**
     * Creates the replay strategy handed to the {@link UdpTransportServer}
     * created by {@link #doBind(URI)}.
     *
     * @return a {@link DefaultReplayStrategy} constructed with {@code 5}
     */
    protected ReplayStrategy createReplayStrategy() {
        return new DefaultReplayStrategy(5);
    }

    /**
     * Wraps the transport in a {@link ResponseRedirectInterceptor}.
     *
     * @param transport    the transport to wrap
     * @param format       the wire format; not used by this implementation
     * @param udpTransport the underlying UDP transport passed to the interceptor
     * @return the wrapping interceptor
     */
    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
        return new ResponseRedirectInterceptor(transport, udpTransport);
    }

    /**
     * Casts the wire format to {@link OpenWireFormat}.
     *
     * @param wf the wire format to cast
     * @return the same instance typed as {@link OpenWireFormat}
     * @throws ClassCastException if {@code wf} is not an {@link OpenWireFormat}
     */
    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
        OpenWireFormat answer = (OpenWireFormat)wf;
        return answer;
    }
}