001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.udp; 018 019 import java.io.IOException; 020 import java.net.InetSocketAddress; 021 import java.net.SocketAddress; 022 import java.net.URI; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import org.apache.activemq.command.BrokerInfo; 027 import org.apache.activemq.command.Command; 028 import org.apache.activemq.openwire.OpenWireFormat; 029 import org.apache.activemq.transport.CommandJoiner; 030 import org.apache.activemq.transport.InactivityMonitor; 031 import org.apache.activemq.transport.Transport; 032 import org.apache.activemq.transport.TransportListener; 033 import org.apache.activemq.transport.TransportServer; 034 import org.apache.activemq.transport.TransportServerSupport; 035 import org.apache.activemq.transport.reliable.ReliableTransport; 036 import org.apache.activemq.transport.reliable.ReplayStrategy; 037 import org.apache.activemq.transport.reliable.Replayer; 038 import org.apache.activemq.util.ServiceStopper; 039 import org.slf4j.Logger; 040 import org.slf4j.LoggerFactory; 041 042 /** 043 * A UDP based implementation of {@link TransportServer} 044 * 045 * 046 */ 047 048 public class UdpTransportServer extends TransportServerSupport { 049 private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class); 050 051 private UdpTransport serverTransport; 052 private ReplayStrategy replayStrategy; 053 private Transport configuredTransport; 054 private boolean usingWireFormatNegotiation; 055 private Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>(); 056 057 public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) { 058 super(connectURI); 059 this.serverTransport = serverTransport; 060 this.configuredTransport = configuredTransport; 061 this.replayStrategy = replayStrategy; 062 } 063 064 public String toString() { 065 return "UdpTransportServer@" + serverTransport; 066 } 067 068 public void run() { 069 } 070 071 public UdpTransport getServerTransport() { 072 return serverTransport; 073 } 074 075 public void setBrokerInfo(BrokerInfo brokerInfo) { 076 } 077 078 protected void doStart() throws Exception { 079 LOG.info("Starting " + this); 080 081 configuredTransport.setTransportListener(new TransportListener() { 082 public void onCommand(Object o) { 083 final Command command = (Command)o; 084 processInboundConnection(command); 085 } 086 087 public void onException(IOException error) { 088 LOG.error("Caught: " + error, error); 089 } 090 091 public void transportInterupted() { 092 } 093 094 public void transportResumed() { 095 } 096 }); 097 configuredTransport.start(); 098 } 099 100 protected void doStop(ServiceStopper stopper) throws Exception { 101 configuredTransport.stop(); 102 } 103 104 protected void processInboundConnection(Command command) { 105 DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom(); 106 if (LOG.isDebugEnabled()) { 107 LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command); 108 } 109 Transport transport = null; 110 synchronized (transports) { 111 transport = transports.get(endpoint); 112 if (transport == null) { 113 if (usingWireFormatNegotiation && !command.isWireFormatInfo()) { 114 LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command); 115 } else { 116 if (LOG.isDebugEnabled()) { 117 LOG.debug("Creating a new UDP server connection"); 118 } 119 try { 120 transport = createTransport(command, endpoint); 121 transport = configureTransport(transport); 122 transports.put(endpoint, transport); 123 } catch (IOException e) { 124 LOG.error("Caught: " + e, e); 125 getAcceptListener().onAcceptError(e); 126 } 127 } 128 } else { 129 LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command); 130 } 131 } 132 } 133 134 protected Transport configureTransport(Transport transport) { 135 transport = new InactivityMonitor(transport, serverTransport.getWireFormat()); 136 getAcceptListener().onAccept(transport); 137 return transport; 138 } 139 140 protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException { 141 if (endpoint == null) { 142 throw new IOException("No endpoint available for command: " + command); 143 } 144 final SocketAddress address = endpoint.getAddress(); 145 final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); 146 final UdpTransport transport = new UdpTransport(connectionWireFormat, address); 147 148 final ReliableTransport reliableTransport = new ReliableTransport(transport, transport); 149 reliableTransport.getReplayer(); 150 reliableTransport.setReplayStrategy(replayStrategy); 151 152 // Joiner must be on outside as the inbound messages must be processed 153 // by the reliable transport first 154 return new CommandJoiner(reliableTransport, connectionWireFormat) { 155 public void start() throws Exception { 156 super.start(); 157 reliableTransport.onCommand(command); 158 } 159 }; 160 161 /** 162 * final WireFormatNegotiator wireFormatNegotiator = new 163 * WireFormatNegotiator(configuredTransport, transport.getWireFormat(), 164 * serverTransport .getMinmumWireFormatVersion()) { public void start() 165 * throws Exception { super.start(); log.debug("Starting a new server 166 * transport: " + this + " with command: " + command); 167 * onCommand(command); } // lets use the specific addressing of wire 168 * format protected void sendWireFormat(WireFormatInfo info) throws 169 * IOException { log.debug("#### we have negotiated the wireformat; 170 * sending a wireformat to: " + address); transport.oneway(info, 171 * address); } }; return wireFormatNegotiator; 172 */ 173 } 174 175 public InetSocketAddress getSocketAddress() { 176 return serverTransport.getLocalSocketAddress(); 177 } 178 }