001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.StringTokenizer;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.regex.Pattern;
027
028import javax.management.ObjectName;
029import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030import org.apache.activemq.broker.jmx.ManagementContext;
031import org.apache.activemq.broker.region.ConnectorStatistics;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionControl;
034import org.apache.activemq.security.MessageAuthorizationPolicy;
035import org.apache.activemq.thread.DefaultThreadPools;
036import org.apache.activemq.thread.TaskRunnerFactory;
037import org.apache.activemq.transport.Transport;
038import org.apache.activemq.transport.TransportAcceptListener;
039import org.apache.activemq.transport.TransportFactory;
040import org.apache.activemq.transport.TransportServer;
041import org.apache.activemq.transport.discovery.DiscoveryAgent;
042import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
043import org.apache.activemq.util.ServiceStopper;
044import org.apache.activemq.util.ServiceSupport;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * @org.apache.xbean.XBean
050 *
051 */
052public class TransportConnector implements Connector, BrokerServiceAware {
053
054    final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055
056    protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057    protected TransportStatusDetector statusDector;
058    private BrokerService brokerService;
059    private TransportServer server;
060    private URI uri;
061    private BrokerInfo brokerInfo = new BrokerInfo();
062    private TaskRunnerFactory taskRunnerFactory;
063    private MessageAuthorizationPolicy messageAuthorizationPolicy;
064    private DiscoveryAgent discoveryAgent;
065    private final ConnectorStatistics statistics = new ConnectorStatistics();
066    private URI discoveryUri;
067    private URI connectUri;
068    private String name;
069    private boolean disableAsyncDispatch;
070    private boolean enableStatusMonitor = false;
071    private Broker broker;
072    private boolean updateClusterClients = false;
073    private boolean rebalanceClusterClients;
074    private boolean updateClusterClientsOnRemove = false;
075    private String updateClusterFilter;
076    private boolean auditNetworkProducers = false;
077    private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
078    private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
079
080    LinkedList<String> peerBrokers = new LinkedList<String>();
081
082    public TransportConnector() {
083    }
084
085    public TransportConnector(TransportServer server) {
086        this();
087        setServer(server);
088        if (server != null && server.getConnectURI() != null) {
089            URI uri = server.getConnectURI();
090            if (uri != null && uri.getScheme().equals("vm")) {
091                setEnableStatusMonitor(false);
092            }
093        }
094
095    }
096
097    /**
098     * @return Returns the connections.
099     */
100    public CopyOnWriteArrayList<TransportConnection> getConnections() {
101        return connections;
102    }
103
104    /**
105     * Factory method to create a JMX managed version of this transport
106     * connector
107     */
108    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
109            throws IOException, URISyntaxException {
110        ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
111        rc.setBrokerInfo(getBrokerInfo());
112        rc.setConnectUri(getConnectUri());
113        rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
114        rc.setDiscoveryAgent(getDiscoveryAgent());
115        rc.setDiscoveryUri(getDiscoveryUri());
116        rc.setEnableStatusMonitor(isEnableStatusMonitor());
117        rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
118        rc.setName(getName());
119        rc.setTaskRunnerFactory(getTaskRunnerFactory());
120        rc.setUri(getUri());
121        rc.setBrokerService(brokerService);
122        rc.setUpdateClusterClients(isUpdateClusterClients());
123        rc.setRebalanceClusterClients(isRebalanceClusterClients());
124        rc.setUpdateClusterFilter(getUpdateClusterFilter());
125        rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
126        rc.setAuditNetworkProducers(isAuditNetworkProducers());
127        rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
128        rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
129        return rc;
130    }
131
132    public BrokerInfo getBrokerInfo() {
133        return brokerInfo;
134    }
135
136    public void setBrokerInfo(BrokerInfo brokerInfo) {
137        this.brokerInfo = brokerInfo;
138    }
139
140    /**
141     *
142     * @deprecated use the {@link #setBrokerService(BrokerService)} method
143     *             instead.
144     */
145    @Deprecated
146    public void setBrokerName(String name) {
147        if (this.brokerInfo == null) {
148            this.brokerInfo = new BrokerInfo();
149        }
150        this.brokerInfo.setBrokerName(name);
151    }
152
153    public TransportServer getServer() throws IOException, URISyntaxException {
154        if (server == null) {
155            setServer(createTransportServer());
156        }
157        return server;
158    }
159
160    public void setServer(TransportServer server) {
161        this.server = server;
162    }
163
164    public URI getUri() {
165        if (uri == null) {
166            try {
167                uri = getConnectUri();
168            } catch (Throwable e) {
169            }
170        }
171        return uri;
172    }
173
174    /**
175     * Sets the server transport URI to use if there is not a
176     * {@link TransportServer} configured via the
177     * {@link #setServer(TransportServer)} method. This value is used to lazy
178     * create a {@link TransportServer} instance
179     *
180     * @param uri
181     */
182    public void setUri(URI uri) {
183        this.uri = uri;
184    }
185
186    public TaskRunnerFactory getTaskRunnerFactory() {
187        return taskRunnerFactory;
188    }
189
190    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
191        this.taskRunnerFactory = taskRunnerFactory;
192    }
193
194    /**
195     * @return the statistics for this connector
196     */
197    public ConnectorStatistics getStatistics() {
198        return statistics;
199    }
200
201    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
202        return messageAuthorizationPolicy;
203    }
204
205    /**
206     * Sets the policy used to decide if the current connection is authorized to
207     * consume a given message
208     */
209    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
210        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
211    }
212
213    public void start() throws Exception {
214        broker = brokerService.getBroker();
215        brokerInfo.setBrokerName(broker.getBrokerName());
216        brokerInfo.setBrokerId(broker.getBrokerId());
217        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
218        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
219        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
220        getServer().setAcceptListener(new TransportAcceptListener() {
221            public void onAccept(final Transport transport) {
222                try {
223                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
224                        public void run() {
225                            try {
226                                Connection connection = createConnection(transport);
227                                connection.start();
228                            } catch (Exception e) {
229                                String remoteHost = transport.getRemoteAddress();
230                                ServiceSupport.dispose(transport);
231                                onAcceptError(e, remoteHost);
232                            }
233                        }
234                    });
235                } catch (Exception e) {
236                    String remoteHost = transport.getRemoteAddress();
237                    ServiceSupport.dispose(transport);
238                    onAcceptError(e, remoteHost);
239                }
240            }
241
242            public void onAcceptError(Exception error) {
243                onAcceptError(error, null);
244            }
245
246            private void onAcceptError(Exception error, String remoteHost) {
247                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
248                        + error);
249                LOG.debug("Reason: " + error, error);
250            }
251        });
252        getServer().setBrokerInfo(brokerInfo);
253        getServer().start();
254
255        DiscoveryAgent da = getDiscoveryAgent();
256        if (da != null) {
257            da.registerService(getPublishableConnectString());
258            da.start();
259        }
260        if (enableStatusMonitor) {
261            this.statusDector = new TransportStatusDetector(this);
262            this.statusDector.start();
263        }
264
265        LOG.info("Connector " + getName() + " Started");
266    }
267
268    public String getPublishableConnectString() throws Exception {
269        return getPublishableConnectString(getConnectUri());
270    }
271
272    public String getPublishableConnectString(URI theConnectURI) throws Exception {
273        String publishableConnectString = null;
274        if (theConnectURI != null) {
275            publishableConnectString = theConnectURI.toString();
276            // strip off server side query parameters which may not be compatible to
277            // clients
278            if (theConnectURI.getRawQuery() != null) {
279                publishableConnectString = publishableConnectString.substring(0, publishableConnectString
280                        .indexOf(theConnectURI.getRawQuery()) - 1);
281            }
282        }
283        if (LOG.isDebugEnabled()) {
284            LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
285        }
286        return publishableConnectString;
287    }
288
289    public void stop() throws Exception {
290        ServiceStopper ss = new ServiceStopper();
291        if (discoveryAgent != null) {
292            ss.stop(discoveryAgent);
293        }
294        if (server != null) {
295            ss.stop(server);
296        }
297        if (this.statusDector != null) {
298            this.statusDector.stop();
299        }
300
301        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
302            TransportConnection c = iter.next();
303            ss.stop(c);
304        }
305        server = null;
306        ss.throwFirstException();
307        LOG.info("Connector " + getName() + " Stopped");
308    }
309
310    // Implementation methods
311    // -------------------------------------------------------------------------
312    protected Connection createConnection(Transport transport) throws IOException {
313        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
314                : taskRunnerFactory);
315        boolean statEnabled = this.getStatistics().isEnabled();
316        answer.getStatistics().setEnabled(statEnabled);
317        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
318        return answer;
319    }
320
321    protected TransportServer createTransportServer() throws IOException, URISyntaxException {
322        if (uri == null) {
323            throw new IllegalArgumentException("You must specify either a server or uri property");
324        }
325        if (brokerService == null) {
326            throw new IllegalArgumentException(
327                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
328        }
329        return TransportFactory.bind(brokerService, uri);
330    }
331
332    public DiscoveryAgent getDiscoveryAgent() throws IOException {
333        if (discoveryAgent == null) {
334            discoveryAgent = createDiscoveryAgent();
335        }
336        return discoveryAgent;
337    }
338
339    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
340        if (discoveryUri != null) {
341            DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
342
343            if( agent!=null && agent instanceof BrokerServiceAware ) {
344                ((BrokerServiceAware)agent).setBrokerService(brokerService);
345            }
346
347            return agent;
348        }
349        return null;
350    }
351
352    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
353        this.discoveryAgent = discoveryAgent;
354    }
355
356    public URI getDiscoveryUri() {
357        return discoveryUri;
358    }
359
360    public void setDiscoveryUri(URI discoveryUri) {
361        this.discoveryUri = discoveryUri;
362    }
363
364    public URI getConnectUri() throws IOException, URISyntaxException {
365        if (connectUri == null) {
366            if (server != null) {
367                connectUri = server.getConnectURI();
368            }
369        }
370        return connectUri;
371    }
372
373    public void setConnectUri(URI transportUri) {
374        this.connectUri = transportUri;
375    }
376
377    public void onStarted(TransportConnection connection) {
378        connections.add(connection);
379    }
380
381    public void onStopped(TransportConnection connection) {
382        connections.remove(connection);
383    }
384
385    public String getName() {
386        if (name == null) {
387            uri = getUri();
388            if (uri != null) {
389                name = uri.toString();
390            }
391        }
392        return name;
393    }
394
395    public void setName(String name) {
396        this.name = name;
397    }
398
399    @Override
400    public String toString() {
401        String rc = getName();
402        if (rc == null) {
403            rc = super.toString();
404        }
405        return rc;
406    }
407
408    protected ConnectionControl getConnectionControl() {
409        boolean rebalance = isRebalanceClusterClients();
410        String connectedBrokers = "";
411        String separator = "";
412
413        if (isUpdateClusterClients()) {
414            synchronized (peerBrokers) {
415                for (String uri : getPeerBrokers()) {
416                    connectedBrokers += separator + uri;
417                    separator = ",";
418                }
419
420                if (rebalance) {
421                    String shuffle = getPeerBrokers().removeFirst();
422                    getPeerBrokers().addLast(shuffle);
423                }
424            }
425        }
426        ConnectionControl control = new ConnectionControl();
427        control.setConnectedBrokers(connectedBrokers);
428        control.setRebalanceConnection(rebalance);
429        return control;
430
431    }
432    
433    public void addPeerBroker(BrokerInfo info) {
434        if (isMatchesClusterFilter(info.getBrokerName())) {
435            synchronized (peerBrokers) {
436                getPeerBrokers().addLast(info.getBrokerURL());
437            }
438        }
439    }
440    
441    public void removePeerBroker(BrokerInfo info) {
442        synchronized (peerBrokers) {
443            getPeerBrokers().remove(info.getBrokerURL());
444        }
445    }
446
447    public LinkedList<String> getPeerBrokers() {
448        synchronized (peerBrokers) {
449            if (peerBrokers.isEmpty()) {
450                peerBrokers.add(brokerService.getDefaultSocketURIString());
451            }
452            return peerBrokers;
453        }
454    }
455
456    public void updateClientClusterInfo() {
457
458        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
459            ConnectionControl control = getConnectionControl();
460            for (Connection c : this.connections) {
461                c.updateClient(control);
462                if (isRebalanceClusterClients()) {
463                    control = getConnectionControl();
464                }
465            }
466        }
467    }
468
469    private boolean isMatchesClusterFilter(String brokerName) {
470        boolean result = true;
471        String filter = getUpdateClusterFilter();
472        if (filter != null) {
473            filter = filter.trim();
474            if (filter.length() > 0) {
475                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
476                while (result && tokenizer.hasMoreTokens()) {
477                    String token = tokenizer.nextToken();
478                    result = isMatchesClusterFilter(brokerName, token);
479                }
480            }
481        }
482        return result;
483    }
484
485    private boolean isMatchesClusterFilter(String brokerName, String match) {
486        boolean result = true;
487        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
488            result = Pattern.matches(match, brokerName);
489        }
490        return result;
491    }
492
493    public boolean isDisableAsyncDispatch() {
494        return disableAsyncDispatch;
495    }
496
497    public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
498        this.disableAsyncDispatch = disableAsyncDispatch;
499    }
500
501    /**
502     * @return the enableStatusMonitor
503     */
504    public boolean isEnableStatusMonitor() {
505        return enableStatusMonitor;
506    }
507
508    /**
509     * @param enableStatusMonitor
510     *            the enableStatusMonitor to set
511     */
512    public void setEnableStatusMonitor(boolean enableStatusMonitor) {
513        this.enableStatusMonitor = enableStatusMonitor;
514    }
515
516    /**
517     * This is called by the BrokerService right before it starts the transport.
518     */
519    public void setBrokerService(BrokerService brokerService) {
520        this.brokerService = brokerService;
521    }
522
523    public Broker getBroker() {
524        return broker;
525    }
526
527    public BrokerService getBrokerService() {
528        return brokerService;
529    }
530
531    /**
532     * @return the updateClusterClients
533     */
534    public boolean isUpdateClusterClients() {
535        return this.updateClusterClients;
536    }
537
538    /**
539     * @param updateClusterClients
540     *            the updateClusterClients to set
541     */
542    public void setUpdateClusterClients(boolean updateClusterClients) {
543        this.updateClusterClients = updateClusterClients;
544    }
545
546    /**
547     * @return the rebalanceClusterClients
548     */
549    public boolean isRebalanceClusterClients() {
550        return this.rebalanceClusterClients;
551    }
552
553    /**
554     * @param rebalanceClusterClients
555     *            the rebalanceClusterClients to set
556     */
557    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
558        this.rebalanceClusterClients = rebalanceClusterClients;
559    }
560
561    /**
562     * @return the updateClusterClientsOnRemove
563     */
564    public boolean isUpdateClusterClientsOnRemove() {
565        return this.updateClusterClientsOnRemove;
566    }
567
568    /**
569     * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
570     */
571    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
572        this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
573    }
574
575    /**
576     * @return the updateClusterFilter
577     */
578    public String getUpdateClusterFilter() {
579        return this.updateClusterFilter;
580    }
581
582    /**
583     * @param updateClusterFilter
584     *            the updateClusterFilter to set
585     */
586    public void setUpdateClusterFilter(String updateClusterFilter) {
587        this.updateClusterFilter = updateClusterFilter;
588    }
589
590    public int connectionCount() {
591        return connections.size();
592    }
593
594    public boolean isAuditNetworkProducers() {
595        return auditNetworkProducers;
596    }
597
598    /**
599     * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
600     * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
601     * @param auditNetworkProducers
602     */
603    public void setAuditNetworkProducers(boolean auditNetworkProducers) {
604        this.auditNetworkProducers = auditNetworkProducers;
605    }
606
607    public int getMaximumProducersAllowedPerConnection() {
608        return maximumProducersAllowedPerConnection;
609    }
610
611    public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
612        this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
613    }
614
615    public int getMaximumConsumersAllowedPerConnection() {
616        return maximumConsumersAllowedPerConnection;
617    }
618
619    public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
620        this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
621    }
622
623}