001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.util.HashMap;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.TimeUnit;
033
034import javax.net.ServerSocketFactory;
035
036import org.apache.activemq.Service;
037import org.apache.activemq.ThreadPriorities;
038import org.apache.activemq.command.BrokerInfo;
039import org.apache.activemq.openwire.OpenWireFormatFactory;
040import org.apache.activemq.transport.Transport;
041import org.apache.activemq.transport.TransportLoggerFactory;
042import org.apache.activemq.transport.TransportServer;
043import org.apache.activemq.transport.TransportServerThreadSupport;
044import org.apache.activemq.util.IOExceptionSupport;
045import org.apache.activemq.util.InetAddressUtil;
046import org.apache.activemq.util.IntrospectionSupport;
047import org.apache.activemq.util.ServiceListener;
048import org.apache.activemq.util.ServiceStopper;
049import org.apache.activemq.util.ServiceSupport;
050import org.apache.activemq.wireformat.WireFormat;
051import org.apache.activemq.wireformat.WireFormatFactory;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * A TCP based implementation of {@link TransportServer}
057 * 
058 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
059 * 
060 */
061
062public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
063
064    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
065    protected ServerSocket serverSocket;
066    protected int backlog = 5000;
067    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
068    protected final TcpTransportFactory transportFactory;
069    protected long maxInactivityDuration = 30000;
070    protected long maxInactivityDurationInitalDelay = 10000;
071    protected int minmumWireFormatVersion;
072    protected boolean useQueueForAccept=true;
073       
074    /**
075     * trace=true -> the Transport stack where this TcpTransport
076     * object will be, will have a TransportLogger layer
077     * trace=false -> the Transport stack where this TcpTransport
078     * object will be, will NOT have a TransportLogger layer, and therefore
079     * will never be able to print logging messages.
080     * This parameter is most probably set in Connection or TransportConnector URIs.
081     */
082    protected boolean trace = false;
083
084    protected int soTimeout = 0;
085    protected int socketBufferSize = 64 * 1024;
086    protected int connectionTimeout =  30000;
087
088    /**
089     * Name of the LogWriter implementation to use.
090     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
091     * This parameter is most probably set in Connection or TransportConnector URIs.
092     */
093    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
094    /**
095     * Specifies if the TransportLogger will be manageable by JMX or not.
096     * Also, as long as there is at least 1 TransportLogger which is manageable,
097     * a TransportLoggerControl MBean will me created.
098     */
099    protected boolean dynamicManagement = false;
100    /**
101     * startLogging=true -> the TransportLogger object of the Transport stack
102     * will initially write messages to the log.
103     * startLogging=false -> the TransportLogger object of the Transport stack
104     * will initially NOT write messages to the log.
105     * This parameter only has an effect if trace == true.
106     * This parameter is most probably set in Connection or TransportConnector URIs.
107     */
108    protected boolean startLogging = true;
109    protected final ServerSocketFactory serverSocketFactory;
110    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
111    protected Thread socketHandlerThread;
112    /**
113     * The maximum number of sockets allowed for this server
114     */
115    protected int maximumConnections = Integer.MAX_VALUE;
116    protected int currentTransportCount=0;
117  
118    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
119        super(location);
120        this.transportFactory = transportFactory;
121        this.serverSocketFactory = serverSocketFactory;
122        
123    }
124
125    public void bind() throws IOException {
126        URI bind = getBindLocation();
127
128        String host = bind.getHost();
129        host = (host == null || host.length() == 0) ? "localhost" : host;
130        InetAddress addr = InetAddress.getByName(host);
131
132        try {
133
134            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
135            configureServerSocket(this.serverSocket);
136            
137        } catch (IOException e) {
138            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
139        }
140        try {
141            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
142                .getFragment()));
143        } catch (URISyntaxException e) {
144
145            // it could be that the host name contains invalid characters such
146            // as _ on unix platforms
147            // so lets try use the IP address instead
148            try {
149                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
150            } catch (URISyntaxException e2) {
151                throw IOExceptionSupport.create(e2);
152            }
153        }
154    }
155
156    private void configureServerSocket(ServerSocket socket) throws SocketException {
157        socket.setSoTimeout(2000);
158        if (transportOptions != null) {
159            IntrospectionSupport.setProperties(socket, transportOptions);
160        }
161    }
162
163    /**
164     * @return Returns the wireFormatFactory.
165     */
166    public WireFormatFactory getWireFormatFactory() {
167        return wireFormatFactory;
168    }
169
170    /**
171     * @param wireFormatFactory The wireFormatFactory to set.
172     */
173    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
174        this.wireFormatFactory = wireFormatFactory;
175    }
176
177    /**
178     * Associates a broker info with the transport server so that the transport
179     * can do discovery advertisements of the broker.
180     * 
181     * @param brokerInfo
182     */
183    public void setBrokerInfo(BrokerInfo brokerInfo) {
184    }
185
186    public long getMaxInactivityDuration() {
187        return maxInactivityDuration;
188    }
189
190    public void setMaxInactivityDuration(long maxInactivityDuration) {
191        this.maxInactivityDuration = maxInactivityDuration;
192    }
193    
194    public long getMaxInactivityDurationInitalDelay() {
195        return this.maxInactivityDurationInitalDelay;
196    }
197
198    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
199        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
200    }
201
202    public int getMinmumWireFormatVersion() {
203        return minmumWireFormatVersion;
204    }
205
206    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
207        this.minmumWireFormatVersion = minmumWireFormatVersion;
208    }
209
210    public boolean isTrace() {
211        return trace;
212    }
213
214    public void setTrace(boolean trace) {
215        this.trace = trace;
216    }
217    
218    public String getLogWriterName() {
219        return logWriterName;
220    }
221
222    public void setLogWriterName(String logFormat) {
223        this.logWriterName = logFormat;
224    }        
225
226    public boolean isDynamicManagement() {
227        return dynamicManagement;
228    }
229
230    public void setDynamicManagement(boolean useJmx) {
231        this.dynamicManagement = useJmx;
232    }
233
234    public boolean isStartLogging() {
235        return startLogging;
236    }
237
238
239    public void setStartLogging(boolean startLogging) {
240        this.startLogging = startLogging;
241    }
242    
243    /**
244     * @return the backlog
245     */
246    public int getBacklog() {
247        return backlog;
248    }
249
250    /**
251     * @param backlog the backlog to set
252     */
253    public void setBacklog(int backlog) {
254        this.backlog = backlog;
255    }
256
257    /**
258     * @return the useQueueForAccept
259     */
260    public boolean isUseQueueForAccept() {
261        return useQueueForAccept;
262    }
263
264    /**
265     * @param useQueueForAccept the useQueueForAccept to set
266     */
267    public void setUseQueueForAccept(boolean useQueueForAccept) {
268        this.useQueueForAccept = useQueueForAccept;
269    }
270    
271
272    /**
273     * pull Sockets from the ServerSocket
274     */
275    public void run() {
276        while (!isStopped()) {
277            Socket socket = null;
278            try {
279                socket = serverSocket.accept();
280                if (socket != null) {
281                    if (isStopped() || getAcceptListener() == null) {
282                        socket.close();
283                    } else {
284                        if (useQueueForAccept) {
285                            socketQueue.put(socket);
286                        }else {
287                            handleSocket(socket);
288                        }
289                    }
290                }
291            } catch (SocketTimeoutException ste) {
292                // expect this to happen
293            } catch (Exception e) {
294                if (!isStopping()) {
295                    onAcceptError(e);
296                } else if (!isStopped()) {
297                    LOG.warn("run()", e);
298                    onAcceptError(e);
299                }
300            }
301        }
302    }
303
304    /**
305     * Allow derived classes to override the Transport implementation that this
306     * transport server creates.
307     * 
308     * @param socket
309     * @param format
310     * @return
311     * @throws IOException
312     */
313    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
314        return new TcpTransport(format, socket);
315    }
316
317    /**
318     * @return pretty print of this
319     */
320    public String toString() {
321        return "" + getBindLocation();
322    }
323
324    /**
325     * @param socket 
326     * @param inetAddress
327     * @return real hostName
328     * @throws UnknownHostException
329     */
330    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
331        String result = null;
332        if (socket.isBound()) {
333            if (socket.getInetAddress().isAnyLocalAddress()) {
334                // make it more human readable and useful, an alternative to 0.0.0.0
335                result = InetAddressUtil.getLocalHostName();
336            } else {
337                result = socket.getInetAddress().getCanonicalHostName();
338            }
339        } else {
340            result = bindAddress.getCanonicalHostName();
341        }
342        return result;
343    }
344    
345    protected void doStart() throws Exception {
346        if(useQueueForAccept) {
347            Runnable run = new Runnable() {
348                public void run() {
349                    try {
350                        while (!isStopped() && !isStopping()) {
351                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
352                            if (sock != null) {
353                                handleSocket(sock);
354                            }
355                        }
356    
357                    } catch (InterruptedException e) {
358                        LOG.info("socketQueue interuppted - stopping");
359                        if (!isStopping()) {
360                            onAcceptError(e);
361                        }
362                    }
363    
364                }
365    
366            };
367            socketHandlerThread = new Thread(null, run,
368                    "ActiveMQ Transport Server Thread Handler: " + toString(),
369                    getStackSize());
370            socketHandlerThread.setDaemon(true);
371            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
372            socketHandlerThread.start();
373        }
374        super.doStart();
375        
376    }
377
378    protected void doStop(ServiceStopper stopper) throws Exception {
379        super.doStop(stopper);
380        if (serverSocket != null) {
381            serverSocket.close();
382        }
383    }
384
385    public InetSocketAddress getSocketAddress() {
386        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
387    }
388
389    protected final void handleSocket(Socket socket) {
390        try {
391            if (this.currentTransportCount >= this.maximumConnections) {
392                throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 
393                    "number of allowed client connections. See the 'maximumConnections' " + 
394                    "property on the TCP transport configuration URI in the ActiveMQ " + 
395                    "configuration file (e.g., activemq.xml)"); 
396                
397            } else {
398                HashMap<String, Object> options = new HashMap<String, Object>();
399                options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
400                options.put("maxInactivityDurationInitalDelay", 
401                    Long.valueOf(maxInactivityDurationInitalDelay));
402                options.put("minmumWireFormatVersion", 
403                    Integer.valueOf(minmumWireFormatVersion));
404                options.put("trace", Boolean.valueOf(trace));
405                options.put("soTimeout", Integer.valueOf(soTimeout));
406                options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
407                options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
408                options.put("logWriterName", logWriterName);
409                options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
410                options.put("startLogging", Boolean.valueOf(startLogging));
411                options.putAll(transportOptions);
412
413                WireFormat format = wireFormatFactory.createWireFormat();
414                Transport transport = createTransport(socket, format);
415
416                if (transport instanceof ServiceSupport) {
417                    ((ServiceSupport) transport).addServiceListener(this);
418                }
419
420                Transport configuredTransport = 
421                    transportFactory.serverConfigure( transport, format, options);
422
423                getAcceptListener().onAccept(configuredTransport);
424            }
425        } catch (SocketTimeoutException ste) {
426            // expect this to happen
427        } catch (Exception e) {
428            if (!isStopping()) {
429                onAcceptError(e);
430            } else if (!isStopped()) {
431                LOG.warn("run()", e);
432                onAcceptError(e);
433            }
434        }
435        
436    }    
437
438        public int getSoTimeout() {
439                return soTimeout;
440        }
441
442        public void setSoTimeout(int soTimeout) {
443                this.soTimeout = soTimeout;
444        }
445
446        public int getSocketBufferSize() {
447                return socketBufferSize;
448        }
449
450        public void setSocketBufferSize(int socketBufferSize) {
451                this.socketBufferSize = socketBufferSize;
452        }
453
454        public int getConnectionTimeout() {
455                return connectionTimeout;
456        }
457
458        public void setConnectionTimeout(int connectionTimeout) {
459                this.connectionTimeout = connectionTimeout;
460        }
461
462    /**
463     * @return the maximumConnections
464     */
465    public int getMaximumConnections() {
466        return maximumConnections;
467    }
468
469    /**
470     * @param maximumConnections the maximumConnections to set
471     */
472    public void setMaximumConnections(int maximumConnections) {
473        this.maximumConnections = maximumConnections;
474    }
475
476    
477    public void started(Service service) {
478       this.currentTransportCount++;
479    }
480
481    public void stopped(Service service) {
482        this.currentTransportCount--;
483    }
484}