001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.SocketAddress;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.BrokerInfo;
027import org.apache.activemq.command.Command;
028import org.apache.activemq.openwire.OpenWireFormat;
029import org.apache.activemq.transport.CommandJoiner;
030import org.apache.activemq.transport.InactivityMonitor;
031import org.apache.activemq.transport.Transport;
032import org.apache.activemq.transport.TransportListener;
033import org.apache.activemq.transport.TransportServer;
034import org.apache.activemq.transport.TransportServerSupport;
035import org.apache.activemq.transport.reliable.ReliableTransport;
036import org.apache.activemq.transport.reliable.ReplayStrategy;
037import org.apache.activemq.transport.reliable.Replayer;
038import org.apache.activemq.util.ServiceStopper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A UDP based implementation of {@link TransportServer}
044 * 
045 * 
046 */
047
048public class UdpTransportServer extends TransportServerSupport {
049    private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
050
051    private UdpTransport serverTransport;
052    private ReplayStrategy replayStrategy;
053    private Transport configuredTransport;
054    private boolean usingWireFormatNegotiation;
055    private Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
056
057    public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
058        super(connectURI);
059        this.serverTransport = serverTransport;
060        this.configuredTransport = configuredTransport;
061        this.replayStrategy = replayStrategy;
062    }
063
064    public String toString() {
065        return "UdpTransportServer@" + serverTransport;
066    }
067
068    public void run() {
069    }
070
071    public UdpTransport getServerTransport() {
072        return serverTransport;
073    }
074
075    public void setBrokerInfo(BrokerInfo brokerInfo) {
076    }
077
078    protected void doStart() throws Exception {
079        LOG.info("Starting " + this);
080
081        configuredTransport.setTransportListener(new TransportListener() {
082            public void onCommand(Object o) {
083                final Command command = (Command)o;
084                processInboundConnection(command);
085            }
086
087            public void onException(IOException error) {
088                LOG.error("Caught: " + error, error);
089            }
090
091            public void transportInterupted() {
092            }
093
094            public void transportResumed() {
095            }
096        });
097        configuredTransport.start();
098    }
099
100    protected void doStop(ServiceStopper stopper) throws Exception {
101        configuredTransport.stop();
102    }
103
104    protected void processInboundConnection(Command command) {
105        DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom();
106        if (LOG.isDebugEnabled()) {
107            LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
108        }
109        Transport transport = null;
110        synchronized (transports) {
111            transport = transports.get(endpoint);
112            if (transport == null) {
113                if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
114                    LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
115                } else {
116                    if (LOG.isDebugEnabled()) {
117                        LOG.debug("Creating a new UDP server connection");
118                    }
119                    try {
120                        transport = createTransport(command, endpoint);
121                        transport = configureTransport(transport);
122                        transports.put(endpoint, transport);
123                    } catch (IOException e) {
124                        LOG.error("Caught: " + e, e);
125                        getAcceptListener().onAcceptError(e);
126                    }
127                }
128            } else {
129                LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
130            }
131        }
132    }
133
134    protected Transport configureTransport(Transport transport) {
135        transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
136        getAcceptListener().onAccept(transport);
137        return transport;
138    }
139
140    protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
141        if (endpoint == null) {
142            throw new IOException("No endpoint available for command: " + command);
143        }
144        final SocketAddress address = endpoint.getAddress();
145        final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
146        final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
147
148        final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
149        reliableTransport.getReplayer();
150        reliableTransport.setReplayStrategy(replayStrategy);
151
152        // Joiner must be on outside as the inbound messages must be processed
153        // by the reliable transport first
154        return new CommandJoiner(reliableTransport, connectionWireFormat) {
155            public void start() throws Exception {
156                super.start();
157                reliableTransport.onCommand(command);
158            }
159        };
160
161        /**
162         * final WireFormatNegotiator wireFormatNegotiator = new
163         * WireFormatNegotiator(configuredTransport, transport.getWireFormat(),
164         * serverTransport .getMinmumWireFormatVersion()) { public void start()
165         * throws Exception { super.start(); log.debug("Starting a new server
166         * transport: " + this + " with command: " + command);
167         * onCommand(command); } // lets use the specific addressing of wire
168         * format protected void sendWireFormat(WireFormatInfo info) throws
169         * IOException { log.debug("#### we have negotiated the wireformat;
170         * sending a wireformat to: " + address); transport.oneway(info,
171         * address); } }; return wireFormatNegotiator;
172         */
173    }
174
175    public InetSocketAddress getSocketAddress() {
176        return serverTransport.getLocalSocketAddress();
177    }
178}