001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.StringTokenizer;
025    import java.util.concurrent.CopyOnWriteArrayList;
026    import java.util.regex.Pattern;
027    
028    import javax.management.ObjectName;
029    import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030    import org.apache.activemq.broker.jmx.ManagementContext;
031    import org.apache.activemq.broker.region.ConnectorStatistics;
032    import org.apache.activemq.command.BrokerInfo;
033    import org.apache.activemq.command.ConnectionControl;
034    import org.apache.activemq.security.MessageAuthorizationPolicy;
035    import org.apache.activemq.thread.DefaultThreadPools;
036    import org.apache.activemq.thread.TaskRunnerFactory;
037    import org.apache.activemq.transport.Transport;
038    import org.apache.activemq.transport.TransportAcceptListener;
039    import org.apache.activemq.transport.TransportFactory;
040    import org.apache.activemq.transport.TransportServer;
041    import org.apache.activemq.transport.discovery.DiscoveryAgent;
042    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.util.ServiceSupport;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * @org.apache.xbean.XBean
050     *
051     */
052    public class TransportConnector implements Connector, BrokerServiceAware {
053    
054        final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055    
056        protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057        protected TransportStatusDetector statusDector;
058        private BrokerService brokerService;
059        private TransportServer server;
060        private URI uri;
061        private BrokerInfo brokerInfo = new BrokerInfo();
062        private TaskRunnerFactory taskRunnerFactory;
063        private MessageAuthorizationPolicy messageAuthorizationPolicy;
064        private DiscoveryAgent discoveryAgent;
065        private final ConnectorStatistics statistics = new ConnectorStatistics();
066        private URI discoveryUri;
067        private URI connectUri;
068        private String name;
069        private boolean disableAsyncDispatch;
070        private boolean enableStatusMonitor = false;
071        private Broker broker;
072        private boolean updateClusterClients = false;
073        private boolean rebalanceClusterClients;
074        private boolean updateClusterClientsOnRemove = false;
075        private String updateClusterFilter;
076        private boolean auditNetworkProducers = false;
077        private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
078        private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
079    
080        LinkedList<String> peerBrokers = new LinkedList<String>();
081    
082        public TransportConnector() {
083        }
084    
085        public TransportConnector(TransportServer server) {
086            this();
087            setServer(server);
088            if (server != null && server.getConnectURI() != null) {
089                URI uri = server.getConnectURI();
090                if (uri != null && uri.getScheme().equals("vm")) {
091                    setEnableStatusMonitor(false);
092                }
093            }
094    
095        }
096    
097        /**
098         * @return Returns the connections.
099         */
100        public CopyOnWriteArrayList<TransportConnection> getConnections() {
101            return connections;
102        }
103    
104        /**
105         * Factory method to create a JMX managed version of this transport
106         * connector
107         */
108        public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
109                throws IOException, URISyntaxException {
110            ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
111            rc.setBrokerInfo(getBrokerInfo());
112            rc.setConnectUri(getConnectUri());
113            rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
114            rc.setDiscoveryAgent(getDiscoveryAgent());
115            rc.setDiscoveryUri(getDiscoveryUri());
116            rc.setEnableStatusMonitor(isEnableStatusMonitor());
117            rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
118            rc.setName(getName());
119            rc.setTaskRunnerFactory(getTaskRunnerFactory());
120            rc.setUri(getUri());
121            rc.setBrokerService(brokerService);
122            rc.setUpdateClusterClients(isUpdateClusterClients());
123            rc.setRebalanceClusterClients(isRebalanceClusterClients());
124            rc.setUpdateClusterFilter(getUpdateClusterFilter());
125            rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
126            rc.setAuditNetworkProducers(isAuditNetworkProducers());
127            rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
128            rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
129            return rc;
130        }
131    
132        public BrokerInfo getBrokerInfo() {
133            return brokerInfo;
134        }
135    
136        public void setBrokerInfo(BrokerInfo brokerInfo) {
137            this.brokerInfo = brokerInfo;
138        }
139    
140        /**
141         *
142         * @deprecated use the {@link #setBrokerService(BrokerService)} method
143         *             instead.
144         */
145        @Deprecated
146        public void setBrokerName(String name) {
147            if (this.brokerInfo == null) {
148                this.brokerInfo = new BrokerInfo();
149            }
150            this.brokerInfo.setBrokerName(name);
151        }
152    
153        public TransportServer getServer() throws IOException, URISyntaxException {
154            if (server == null) {
155                setServer(createTransportServer());
156            }
157            return server;
158        }
159    
160        public void setServer(TransportServer server) {
161            this.server = server;
162        }
163    
164        public URI getUri() {
165            if (uri == null) {
166                try {
167                    uri = getConnectUri();
168                } catch (Throwable e) {
169                }
170            }
171            return uri;
172        }
173    
174        /**
175         * Sets the server transport URI to use if there is not a
176         * {@link TransportServer} configured via the
177         * {@link #setServer(TransportServer)} method. This value is used to lazy
178         * create a {@link TransportServer} instance
179         *
180         * @param uri
181         */
182        public void setUri(URI uri) {
183            this.uri = uri;
184        }
185    
186        public TaskRunnerFactory getTaskRunnerFactory() {
187            return taskRunnerFactory;
188        }
189    
190        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
191            this.taskRunnerFactory = taskRunnerFactory;
192        }
193    
194        /**
195         * @return the statistics for this connector
196         */
197        public ConnectorStatistics getStatistics() {
198            return statistics;
199        }
200    
201        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
202            return messageAuthorizationPolicy;
203        }
204    
205        /**
206         * Sets the policy used to decide if the current connection is authorized to
207         * consume a given message
208         */
209        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
210            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
211        }
212    
213        public void start() throws Exception {
214            broker = brokerService.getBroker();
215            brokerInfo.setBrokerName(broker.getBrokerName());
216            brokerInfo.setBrokerId(broker.getBrokerId());
217            brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
218            brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
219            brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
220            getServer().setAcceptListener(new TransportAcceptListener() {
221                public void onAccept(final Transport transport) {
222                    try {
223                        DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
224                            public void run() {
225                                try {
226                                    Connection connection = createConnection(transport);
227                                    connection.start();
228                                } catch (Exception e) {
229                                    String remoteHost = transport.getRemoteAddress();
230                                    ServiceSupport.dispose(transport);
231                                    onAcceptError(e, remoteHost);
232                                }
233                            }
234                        });
235                    } catch (Exception e) {
236                        String remoteHost = transport.getRemoteAddress();
237                        ServiceSupport.dispose(transport);
238                        onAcceptError(e, remoteHost);
239                    }
240                }
241    
242                public void onAcceptError(Exception error) {
243                    onAcceptError(error, null);
244                }
245    
246                private void onAcceptError(Exception error, String remoteHost) {
247                    LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
248                            + error);
249                    LOG.debug("Reason: " + error, error);
250                }
251            });
252            getServer().setBrokerInfo(brokerInfo);
253            getServer().start();
254    
255            DiscoveryAgent da = getDiscoveryAgent();
256            if (da != null) {
257                da.registerService(getPublishableConnectString());
258                da.start();
259            }
260            if (enableStatusMonitor) {
261                this.statusDector = new TransportStatusDetector(this);
262                this.statusDector.start();
263            }
264    
265            LOG.info("Connector " + getName() + " Started");
266        }
267    
268        public String getPublishableConnectString() throws Exception {
269            return getPublishableConnectString(getConnectUri());
270        }
271    
272        public String getPublishableConnectString(URI theConnectURI) throws Exception {
273            String publishableConnectString = null;
274            if (theConnectURI != null) {
275                publishableConnectString = theConnectURI.toString();
276                // strip off server side query parameters which may not be compatible to
277                // clients
278                if (theConnectURI.getRawQuery() != null) {
279                    publishableConnectString = publishableConnectString.substring(0, publishableConnectString
280                            .indexOf(theConnectURI.getRawQuery()) - 1);
281                }
282            }
283            if (LOG.isDebugEnabled()) {
284                LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
285            }
286            return publishableConnectString;
287        }
288    
289        public void stop() throws Exception {
290            ServiceStopper ss = new ServiceStopper();
291            if (discoveryAgent != null) {
292                ss.stop(discoveryAgent);
293            }
294            if (server != null) {
295                ss.stop(server);
296            }
297            if (this.statusDector != null) {
298                this.statusDector.stop();
299            }
300    
301            for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
302                TransportConnection c = iter.next();
303                ss.stop(c);
304            }
305            server = null;
306            ss.throwFirstException();
307            LOG.info("Connector " + getName() + " Stopped");
308        }
309    
310        // Implementation methods
311        // -------------------------------------------------------------------------
312        protected Connection createConnection(Transport transport) throws IOException {
313            TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
314                    : taskRunnerFactory);
315            boolean statEnabled = this.getStatistics().isEnabled();
316            answer.getStatistics().setEnabled(statEnabled);
317            answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
318            return answer;
319        }
320    
321        protected TransportServer createTransportServer() throws IOException, URISyntaxException {
322            if (uri == null) {
323                throw new IllegalArgumentException("You must specify either a server or uri property");
324            }
325            if (brokerService == null) {
326                throw new IllegalArgumentException(
327                        "You must specify the brokerService property. Maybe this connector should be added to a broker?");
328            }
329            return TransportFactory.bind(brokerService, uri);
330        }
331    
332        public DiscoveryAgent getDiscoveryAgent() throws IOException {
333            if (discoveryAgent == null) {
334                discoveryAgent = createDiscoveryAgent();
335            }
336            return discoveryAgent;
337        }
338    
339        protected DiscoveryAgent createDiscoveryAgent() throws IOException {
340            if (discoveryUri != null) {
341                DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
342    
343                if( agent!=null && agent instanceof BrokerServiceAware ) {
344                    ((BrokerServiceAware)agent).setBrokerService(brokerService);
345                }
346    
347                return agent;
348            }
349            return null;
350        }
351    
352        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
353            this.discoveryAgent = discoveryAgent;
354        }
355    
356        public URI getDiscoveryUri() {
357            return discoveryUri;
358        }
359    
360        public void setDiscoveryUri(URI discoveryUri) {
361            this.discoveryUri = discoveryUri;
362        }
363    
364        public URI getConnectUri() throws IOException, URISyntaxException {
365            if (connectUri == null) {
366                if (server != null) {
367                    connectUri = server.getConnectURI();
368                }
369            }
370            return connectUri;
371        }
372    
373        public void setConnectUri(URI transportUri) {
374            this.connectUri = transportUri;
375        }
376    
377        public void onStarted(TransportConnection connection) {
378            connections.add(connection);
379        }
380    
381        public void onStopped(TransportConnection connection) {
382            connections.remove(connection);
383        }
384    
385        public String getName() {
386            if (name == null) {
387                uri = getUri();
388                if (uri != null) {
389                    name = uri.toString();
390                }
391            }
392            return name;
393        }
394    
395        public void setName(String name) {
396            this.name = name;
397        }
398    
399        @Override
400        public String toString() {
401            String rc = getName();
402            if (rc == null) {
403                rc = super.toString();
404            }
405            return rc;
406        }
407    
408        protected ConnectionControl getConnectionControl() {
409            boolean rebalance = isRebalanceClusterClients();
410            String connectedBrokers = "";
411            String separator = "";
412    
413            if (isUpdateClusterClients()) {
414                synchronized (peerBrokers) {
415                    for (String uri : getPeerBrokers()) {
416                        connectedBrokers += separator + uri;
417                        separator = ",";
418                    }
419    
420                    if (rebalance) {
421                        String shuffle = getPeerBrokers().removeFirst();
422                        getPeerBrokers().addLast(shuffle);
423                    }
424                }
425            }
426            ConnectionControl control = new ConnectionControl();
427            control.setConnectedBrokers(connectedBrokers);
428            control.setRebalanceConnection(rebalance);
429            return control;
430    
431        }
432        
433        public void addPeerBroker(BrokerInfo info) {
434            if (isMatchesClusterFilter(info.getBrokerName())) {
435                synchronized (peerBrokers) {
436                    getPeerBrokers().addLast(info.getBrokerURL());
437                }
438            }
439        }
440        
441        public void removePeerBroker(BrokerInfo info) {
442            synchronized (peerBrokers) {
443                getPeerBrokers().remove(info.getBrokerURL());
444            }
445        }
446    
447        public LinkedList<String> getPeerBrokers() {
448            synchronized (peerBrokers) {
449                if (peerBrokers.isEmpty()) {
450                    peerBrokers.add(brokerService.getDefaultSocketURIString());
451                }
452                return peerBrokers;
453            }
454        }
455    
456        public void updateClientClusterInfo() {
457    
458            if (isRebalanceClusterClients() || isUpdateClusterClients()) {
459                ConnectionControl control = getConnectionControl();
460                for (Connection c : this.connections) {
461                    c.updateClient(control);
462                    if (isRebalanceClusterClients()) {
463                        control = getConnectionControl();
464                    }
465                }
466            }
467        }
468    
469        private boolean isMatchesClusterFilter(String brokerName) {
470            boolean result = true;
471            String filter = getUpdateClusterFilter();
472            if (filter != null) {
473                filter = filter.trim();
474                if (filter.length() > 0) {
475                    StringTokenizer tokenizer = new StringTokenizer(filter, ",");
476                    while (result && tokenizer.hasMoreTokens()) {
477                        String token = tokenizer.nextToken();
478                        result = isMatchesClusterFilter(brokerName, token);
479                    }
480                }
481            }
482            return result;
483        }
484    
485        private boolean isMatchesClusterFilter(String brokerName, String match) {
486            boolean result = true;
487            if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
488                result = Pattern.matches(match, brokerName);
489            }
490            return result;
491        }
492    
493        public boolean isDisableAsyncDispatch() {
494            return disableAsyncDispatch;
495        }
496    
497        public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
498            this.disableAsyncDispatch = disableAsyncDispatch;
499        }
500    
501        /**
502         * @return the enableStatusMonitor
503         */
504        public boolean isEnableStatusMonitor() {
505            return enableStatusMonitor;
506        }
507    
508        /**
509         * @param enableStatusMonitor
510         *            the enableStatusMonitor to set
511         */
512        public void setEnableStatusMonitor(boolean enableStatusMonitor) {
513            this.enableStatusMonitor = enableStatusMonitor;
514        }
515    
516        /**
517         * This is called by the BrokerService right before it starts the transport.
518         */
519        public void setBrokerService(BrokerService brokerService) {
520            this.brokerService = brokerService;
521        }
522    
523        public Broker getBroker() {
524            return broker;
525        }
526    
527        public BrokerService getBrokerService() {
528            return brokerService;
529        }
530    
531        /**
532         * @return the updateClusterClients
533         */
534        public boolean isUpdateClusterClients() {
535            return this.updateClusterClients;
536        }
537    
538        /**
539         * @param updateClusterClients
540         *            the updateClusterClients to set
541         */
542        public void setUpdateClusterClients(boolean updateClusterClients) {
543            this.updateClusterClients = updateClusterClients;
544        }
545    
546        /**
547         * @return the rebalanceClusterClients
548         */
549        public boolean isRebalanceClusterClients() {
550            return this.rebalanceClusterClients;
551        }
552    
553        /**
554         * @param rebalanceClusterClients
555         *            the rebalanceClusterClients to set
556         */
557        public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
558            this.rebalanceClusterClients = rebalanceClusterClients;
559        }
560    
561        /**
562         * @return the updateClusterClientsOnRemove
563         */
564        public boolean isUpdateClusterClientsOnRemove() {
565            return this.updateClusterClientsOnRemove;
566        }
567    
568        /**
569         * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
570         */
571        public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
572            this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
573        }
574    
575        /**
576         * @return the updateClusterFilter
577         */
578        public String getUpdateClusterFilter() {
579            return this.updateClusterFilter;
580        }
581    
582        /**
583         * @param updateClusterFilter
584         *            the updateClusterFilter to set
585         */
586        public void setUpdateClusterFilter(String updateClusterFilter) {
587            this.updateClusterFilter = updateClusterFilter;
588        }
589    
590        public int connectionCount() {
591            return connections.size();
592        }
593    
594        public boolean isAuditNetworkProducers() {
595            return auditNetworkProducers;
596        }
597    
598        /**
599         * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
600         * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
601         * @param auditNetworkProducers
602         */
603        public void setAuditNetworkProducers(boolean auditNetworkProducers) {
604            this.auditNetworkProducers = auditNetworkProducers;
605        }
606    
607        public int getMaximumProducersAllowedPerConnection() {
608            return maximumProducersAllowedPerConnection;
609        }
610    
611        public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
612            this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
613        }
614    
615        public int getMaximumConsumersAllowedPerConnection() {
616            return maximumConsumersAllowedPerConnection;
617        }
618    
619        public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
620            this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
621        }
622    
623    }