001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.Iterator;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    import java.util.concurrent.CopyOnWriteArrayList;
032    import java.util.concurrent.CountDownLatch;
033    import java.util.concurrent.LinkedBlockingQueue;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.RejectedExecutionHandler;
036    import java.util.concurrent.SynchronousQueue;
037    import java.util.concurrent.ThreadFactory;
038    import java.util.concurrent.ThreadPoolExecutor;
039    import java.util.concurrent.TimeUnit;
040    import java.util.concurrent.atomic.AtomicBoolean;
041    
042    import javax.annotation.PostConstruct;
043    import javax.annotation.PreDestroy;
044    import javax.management.MalformedObjectNameException;
045    import javax.management.ObjectName;
046    
047    import org.apache.activemq.ActiveMQConnectionMetaData;
048    import org.apache.activemq.ConfigurationException;
049    import org.apache.activemq.Service;
050    import org.apache.activemq.advisory.AdvisoryBroker;
051    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
052    import org.apache.activemq.broker.ft.MasterConnector;
053    import org.apache.activemq.broker.jmx.AnnotatedMBean;
054    import org.apache.activemq.broker.jmx.BrokerView;
055    import org.apache.activemq.broker.jmx.ConnectorView;
056    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
057    import org.apache.activemq.broker.jmx.FTConnectorView;
058    import org.apache.activemq.broker.jmx.JmsConnectorView;
059    import org.apache.activemq.broker.jmx.JobSchedulerView;
060    import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
061    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
062    import org.apache.activemq.broker.jmx.ManagementContext;
063    import org.apache.activemq.broker.jmx.NetworkConnectorView;
064    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
065    import org.apache.activemq.broker.jmx.ProxyConnectorView;
066    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
067    import org.apache.activemq.broker.region.Destination;
068    import org.apache.activemq.broker.region.DestinationFactory;
069    import org.apache.activemq.broker.region.DestinationFactoryImpl;
070    import org.apache.activemq.broker.region.DestinationInterceptor;
071    import org.apache.activemq.broker.region.RegionBroker;
072    import org.apache.activemq.broker.region.policy.PolicyMap;
073    import org.apache.activemq.broker.region.virtual.MirroredQueue;
074    import org.apache.activemq.broker.region.virtual.VirtualDestination;
075    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
076    import org.apache.activemq.broker.region.virtual.VirtualTopic;
077    import org.apache.activemq.broker.scheduler.SchedulerBroker;
078    import org.apache.activemq.command.ActiveMQDestination;
079    import org.apache.activemq.command.ActiveMQQueue;
080    import org.apache.activemq.command.BrokerId;
081    import org.apache.activemq.filter.DestinationFilter;
082    import org.apache.activemq.network.ConnectionFilter;
083    import org.apache.activemq.network.DiscoveryNetworkConnector;
084    import org.apache.activemq.network.NetworkConnector;
085    import org.apache.activemq.network.jms.JmsConnector;
086    import org.apache.activemq.proxy.ProxyConnector;
087    import org.apache.activemq.security.MessageAuthorizationPolicy;
088    import org.apache.activemq.selector.SelectorParser;
089    import org.apache.activemq.store.PersistenceAdapter;
090    import org.apache.activemq.store.PersistenceAdapterFactory;
091    import org.apache.activemq.store.amq.AMQPersistenceAdapter;
092    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
093    import org.apache.activemq.store.kahadb.plist.PListStore;
094    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
095    import org.apache.activemq.thread.Scheduler;
096    import org.apache.activemq.thread.TaskRunnerFactory;
097    import org.apache.activemq.transport.TransportFactory;
098    import org.apache.activemq.transport.TransportServer;
099    import org.apache.activemq.transport.vm.VMTransportFactory;
100    import org.apache.activemq.usage.SystemUsage;
101    import org.apache.activemq.util.BrokerSupport;
102    import org.apache.activemq.util.DefaultIOExceptionHandler;
103    import org.apache.activemq.util.IOExceptionHandler;
104    import org.apache.activemq.util.IOExceptionSupport;
105    import org.apache.activemq.util.IOHelper;
106    import org.apache.activemq.util.InetAddressUtil;
107    import org.apache.activemq.util.JMXSupport;
108    import org.apache.activemq.util.ServiceStopper;
109    import org.apache.activemq.util.URISupport;
110    import org.slf4j.Logger;
111    import org.slf4j.LoggerFactory;
112    import org.slf4j.MDC;
113    
114    /**
115     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
116     * number of transport connectors, network connectors and a bunch of properties
117     * which can be used to configure the broker as its lazily created.
118     *
119     *
120     * @org.apache.xbean.XBean
121     */
122    public class BrokerService implements Service {
123        protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
124        public static final String DEFAULT_PORT = "61616";
125        public static final String LOCAL_HOST_NAME;
126        public static final String DEFAULT_BROKER_NAME = "localhost";
127        private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
128        private static final long serialVersionUID = 7353129142305630237L;
129        private boolean useJmx = true;
130        private boolean enableStatistics = true;
131        private boolean persistent = true;
132        private boolean populateJMSXUserID;
133        private boolean useAuthenticatedPrincipalForJMSXUserID;
134        private boolean populateUserNameInMBeans;
135    
136        private boolean useShutdownHook = true;
137        private boolean useLoggingForShutdownErrors;
138        private boolean shutdownOnMasterFailure;
139        private boolean shutdownOnSlaveFailure;
140        private boolean waitForSlave;
141        private long waitForSlaveTimeout = 600000L;
142        private boolean passiveSlave;
143        private String brokerName = DEFAULT_BROKER_NAME;
144        private File dataDirectoryFile;
145        private File tmpDataDirectory;
146        private Broker broker;
147        private BrokerView adminView;
148        private ManagementContext managementContext;
149        private ObjectName brokerObjectName;
150        private TaskRunnerFactory taskRunnerFactory;
151        private TaskRunnerFactory persistenceTaskRunnerFactory;
152        private SystemUsage systemUsage;
153        private SystemUsage producerSystemUsage;
154        private SystemUsage consumerSystemUsaage;
155        private PersistenceAdapter persistenceAdapter;
156        private PersistenceAdapterFactory persistenceFactory;
157        protected DestinationFactory destinationFactory;
158        private MessageAuthorizationPolicy messageAuthorizationPolicy;
159        private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
160        private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
161        private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
162        private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
163        private final List<Service> services = new ArrayList<Service>();
164        private MasterConnector masterConnector;
165        private String masterConnectorURI;
166        private transient Thread shutdownHook;
167        private String[] transportConnectorURIs;
168        private String[] networkConnectorURIs;
169        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
170        // to other jms messaging
171        // systems
172        private boolean deleteAllMessagesOnStartup;
173        private boolean advisorySupport = true;
174        private URI vmConnectorURI;
175        private String defaultSocketURIString;
176        private PolicyMap destinationPolicy;
177        private final AtomicBoolean started = new AtomicBoolean(false);
178        private final AtomicBoolean stopped = new AtomicBoolean(false);
179        private BrokerPlugin[] plugins;
180        private boolean keepDurableSubsActive = true;
181        private boolean useVirtualTopics = true;
182        private boolean useMirroredQueues = false;
183        private boolean useTempMirroredQueues = true;
184        private BrokerId brokerId;
185        private DestinationInterceptor[] destinationInterceptors;
186        private ActiveMQDestination[] destinations;
187        private PListStore tempDataStore;
188        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
189        private boolean useLocalHostBrokerName;
190        private final CountDownLatch stoppedLatch = new CountDownLatch(1);
191        private final CountDownLatch startedLatch = new CountDownLatch(1);
192        private boolean supportFailOver;
193        private Broker regionBroker;
194        private int producerSystemUsagePortion = 60;
195        private int consumerSystemUsagePortion = 40;
196        private boolean splitSystemUsageForProducersConsumers;
197        private boolean monitorConnectionSplits = false;
198        private int taskRunnerPriority = Thread.NORM_PRIORITY;
199        private boolean dedicatedTaskRunner;
200        private boolean cacheTempDestinations = false;// useful for failover
201        private int timeBeforePurgeTempDestinations = 5000;
202        private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
203        private boolean systemExitOnShutdown;
204        private int systemExitOnShutdownExitCode;
205        private SslContext sslContext;
206        private boolean forceStart = false;
207        private IOExceptionHandler ioExceptionHandler;
208        private boolean schedulerSupport = false;
209        private File schedulerDirectoryFile;
210        private Scheduler scheduler;
211        private ThreadPoolExecutor executor;
212        private boolean slave = true;
213        private int schedulePeriodForDestinationPurge= 0;
214        private int maxPurgedDestinationsPerSweep = 0;
215        private BrokerContext brokerContext;
216        private boolean networkConnectorStartAsync = false;
217        private boolean allowTempAutoCreationOnSend;
218    
219        private int offlineDurableSubscriberTimeout = -1;
220        private int offlineDurableSubscriberTaskSchedule = 300000;
221        private DestinationFilter virtualConsumerDestinationFilter;
222    
223        static {
224            String localHostName = "localhost";
225            try {
226                localHostName =  InetAddressUtil.getLocalHostName();
227            } catch (UnknownHostException e) {
228                LOG.error("Failed to resolve localhost");
229            }
230            LOCAL_HOST_NAME = localHostName;
231        }
232    
233        @Override
234        public String toString() {
235            return "BrokerService[" + getBrokerName() + "]";
236        }
237    
238        /**
239         * Adds a new transport connector for the given bind address
240         *
241         * @return the newly created and added transport connector
242         * @throws Exception
243         */
244        public TransportConnector addConnector(String bindAddress) throws Exception {
245            return addConnector(new URI(bindAddress));
246        }
247    
248        /**
249         * Adds a new transport connector for the given bind address
250         *
251         * @return the newly created and added transport connector
252         * @throws Exception
253         */
254        public TransportConnector addConnector(URI bindAddress) throws Exception {
255            return addConnector(createTransportConnector(bindAddress));
256        }
257    
258        /**
259         * Adds a new transport connector for the given TransportServer transport
260         *
261         * @return the newly created and added transport connector
262         * @throws Exception
263         */
264        public TransportConnector addConnector(TransportServer transport) throws Exception {
265            return addConnector(new TransportConnector(transport));
266        }
267    
268        /**
269         * Adds a new transport connector
270         *
271         * @return the transport connector
272         * @throws Exception
273         */
274        public TransportConnector addConnector(TransportConnector connector) throws Exception {
275            transportConnectors.add(connector);
276            return connector;
277        }
278    
279        /**
280         * Stops and removes a transport connector from the broker.
281         *
282         * @param connector
283         * @return true if the connector has been previously added to the broker
284         * @throws Exception
285         */
286        public boolean removeConnector(TransportConnector connector) throws Exception {
287            boolean rc = transportConnectors.remove(connector);
288            if (rc) {
289                unregisterConnectorMBean(connector);
290            }
291            return rc;
292        }
293    
294        /**
295         * Adds a new network connector using the given discovery address
296         *
297         * @return the newly created and added network connector
298         * @throws Exception
299         */
300        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
301            return addNetworkConnector(new URI(discoveryAddress));
302        }
303    
304        /**
305         * Adds a new proxy connector using the given bind address
306         *
307         * @return the newly created and added network connector
308         * @throws Exception
309         */
310        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
311            return addProxyConnector(new URI(bindAddress));
312        }
313    
314        /**
315         * Adds a new network connector using the given discovery address
316         *
317         * @return the newly created and added network connector
318         * @throws Exception
319         */
320        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
321            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
322            return addNetworkConnector(connector);
323        }
324    
325        /**
326         * Adds a new proxy connector using the given bind address
327         *
328         * @return the newly created and added network connector
329         * @throws Exception
330         */
331        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
332            ProxyConnector connector = new ProxyConnector();
333            connector.setBind(bindAddress);
334            connector.setRemote(new URI("fanout:multicast://default"));
335            return addProxyConnector(connector);
336        }
337    
338        /**
339         * Adds a new network connector to connect this broker to a federated
340         * network
341         */
342        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
343            connector.setBrokerService(this);
344            URI uri = getVmConnectorURI();
345            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
346            map.put("network", "true");
347            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
348            connector.setLocalUri(uri);
349            // Set a connection filter so that the connector does not establish loop
350            // back connections.
351            connector.setConnectionFilter(new ConnectionFilter() {
352                public boolean connectTo(URI location) {
353                    List<TransportConnector> transportConnectors = getTransportConnectors();
354                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
355                        try {
356                            TransportConnector tc = iter.next();
357                            if (location.equals(tc.getConnectUri())) {
358                                return false;
359                            }
360                        } catch (Throwable e) {
361                        }
362                    }
363                    return true;
364                }
365            });
366            networkConnectors.add(connector);
367            if (isUseJmx()) {
368                registerNetworkConnectorMBean(connector);
369            }
370            return connector;
371        }
372    
373        /**
374         * Removes the given network connector without stopping it. The caller
375         * should call {@link NetworkConnector#stop()} to close the connector
376         */
377        public boolean removeNetworkConnector(NetworkConnector connector) {
378            boolean answer = networkConnectors.remove(connector);
379            if (answer) {
380                unregisterNetworkConnectorMBean(connector);
381            }
382            return answer;
383        }
384    
385        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
386            URI uri = getVmConnectorURI();
387            connector.setLocalUri(uri);
388            proxyConnectors.add(connector);
389            if (isUseJmx()) {
390                registerProxyConnectorMBean(connector);
391            }
392            return connector;
393        }
394    
395        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
396            connector.setBrokerService(this);
397            jmsConnectors.add(connector);
398            if (isUseJmx()) {
399                registerJmsConnectorMBean(connector);
400            }
401            return connector;
402        }
403    
404        public JmsConnector removeJmsConnector(JmsConnector connector) {
405            if (jmsConnectors.remove(connector)) {
406                return connector;
407            }
408            return null;
409        }
410    
411        /**
412         * @return Returns the masterConnectorURI.
413         */
414        public String getMasterConnectorURI() {
415            return masterConnectorURI;
416        }
417    
418        /**
419         * @param masterConnectorURI
420         *            The masterConnectorURI to set.
421         */
422        public void setMasterConnectorURI(String masterConnectorURI) {
423            this.masterConnectorURI = masterConnectorURI;
424        }
425    
426        /**
427         * @return true if this Broker is a slave to a Master
428         */
429        public boolean isSlave() {
430            return (masterConnector != null && masterConnector.isSlave()) ||
431                (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
432                (masterConnector == null && slave);
433        }
434    
435        public void masterFailed() {
436            if (shutdownOnMasterFailure) {
437                LOG.error("The Master has failed ... shutting down");
438                try {
439                    stop();
440                } catch (Exception e) {
441                    LOG.error("Failed to stop for master failure", e);
442                }
443            } else {
444                LOG.warn("Master Failed - starting all connectors");
445                try {
446                    startAllConnectors();
447                    broker.nowMasterBroker();
448                } catch (Exception e) {
449                    LOG.error("Failed to startAllConnectors", e);
450                }
451            }
452        }
453    
454        public boolean isStarted() {
455            return started.get();
456        }
457    
458        /**
459         * Forces a start of the broker.
460         * By default a BrokerService instance that was
461         * previously stopped using BrokerService.stop() cannot be restarted
462         * using BrokerService.start().
463         * This method enforces a restart.
464         * It is not recommended to force a restart of the broker and will not work
465         * for most but some very trivial broker configurations.
466         * For restarting a broker instance we recommend to first call stop() on
467         * the old instance and then recreate a new BrokerService instance.
468         *
469         * @param force - if true enforces a restart.
470         * @throws Exception
471         */
472        public void start(boolean force) throws Exception {
473            forceStart = force;
474            stopped.set(false);
475            started.set(false);
476            start();
477        }
478    
479        // Service interface
480        // -------------------------------------------------------------------------
481    
482        protected boolean shouldAutostart() {
483            return true;
484        }
485    
486        /**
487         *
488         * @throws Exception
489         * @org. apache.xbean.InitMethod
490         */
491        @PostConstruct
492        public void autoStart() throws Exception {
493            if(shouldAutostart()) {
494                start();
495            }
496        }
497    
498        public void start() throws Exception {
499            if (stopped.get() || !started.compareAndSet(false, true)) {
500                // lets just ignore redundant start() calls
501                // as its way too easy to not be completely sure if start() has been
502                // called or not with the gazillion of different configuration
503                // mechanisms
504                // throw new IllegalStateException("Allready started.");
505                return;
506            }
507    
508            MDC.put("activemq.broker", brokerName);
509    
510            try {
511                if (systemExitOnShutdown && useShutdownHook) {
512                    throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
513                }
514                processHelperProperties();
515                if (isUseJmx()) {
516                    startManagementContext();
517                }
518    
519                getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
520                getPersistenceAdapter().setBrokerName(getBrokerName());
521                LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
522                if (deleteAllMessagesOnStartup) {
523                    deleteAllMessages();
524                }
525                getPersistenceAdapter().start();
526                slave = false;
527                startDestinations();
528                addShutdownHook();
529                getBroker().start();
530                if (isUseJmx()) {
531                    if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
532                        // try to restart management context
533                        // typical for slaves that use the same ports as master
534                        managementContext.stop();
535                        startManagementContext();
536                    }
537                    ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
538                    managedBroker.setContextBroker(broker);
539                    adminView.setBroker(managedBroker);
540                }
541                BrokerRegistry.getInstance().bind(getBrokerName(), this);
542                // see if there is a MasterBroker service and if so, configure
543                // it and start it.
544                for (Service service : services) {
545                    if (service instanceof MasterConnector) {
546                        configureService(service);
547                        service.start();
548                    }
549                }
550                if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
551                    startAllConnectors();
552                }
553                if (!stopped.get()) {
554                    if (isUseJmx() && masterConnector != null) {
555                        registerFTConnectorMBean(masterConnector);
556                    }
557                }
558                if (brokerId == null) {
559                    brokerId = broker.getBrokerId();
560                }
561                if (ioExceptionHandler == null) {
562                    setIoExceptionHandler(new DefaultIOExceptionHandler());
563                }
564                LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
565                getBroker().brokerServiceStarted();
566                checkSystemUsageLimits();
567                startedLatch.countDown();
568            } catch (Exception e) {
569                LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
570                try {
571                    if (!stopped.get()) {
572                        stop();
573                    }
574                } catch (Exception ex) {
575                    LOG.warn("Failed to stop broker after failure in start ", ex);
576                }
577                throw e;
578            } finally {
579                MDC.remove("activemq.broker");
580            }
581        }
582    
583        /**
584         *
585         * @throws Exception
586         * @org.apache .xbean.DestroyMethod
587         */
588        @PreDestroy
589        public void stop() throws Exception {
590            if (!started.get()) {
591                return;
592            }
593    
594            MDC.put("activemq.broker", brokerName);
595    
596            if (systemExitOnShutdown) {
597                new Thread() {
598                    @Override
599                    public void run() {
600                        System.exit(systemExitOnShutdownExitCode);
601                    }
602                }.start();
603            }
604    
605            LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
606            removeShutdownHook();
607            if (this.scheduler != null) {
608                this.scheduler.stop();
609                this.scheduler = null;
610            }
611            ServiceStopper stopper = new ServiceStopper();
612            if (services != null) {
613                for (Service service : services) {
614                    stopper.stop(service);
615                }
616            }
617            stopAllConnectors(stopper);
618            // remove any VMTransports connected
619            // this has to be done after services are stopped,
620            // to avoid timimg issue with discovery (spinning up a new instance)
621            BrokerRegistry.getInstance().unbind(getBrokerName());
622            VMTransportFactory.stopped(getBrokerName());
623            if (broker != null) {
624                stopper.stop(broker);
625                broker = null;
626            }
627    
628            if (tempDataStore != null) {
629                tempDataStore.stop();
630                tempDataStore = null;
631            }
632            try {
633                stopper.stop(persistenceAdapter);
634                persistenceAdapter = null;
635                slave = true;
636                if (isUseJmx()) {
637                    stopper.stop(getManagementContext());
638                    managementContext = null;
639                }
640                // Clear SelectorParser cache to free memory
641                SelectorParser.clearCache();
642            } finally {
643                stopped.set(true);
644                stoppedLatch.countDown();
645            }
646            if (masterConnectorURI == null) {
647                // master start has not finished yet
648                if (slaveStartSignal.getCount() == 1) {
649                    started.set(false);
650                    slaveStartSignal.countDown();
651                }
652            } else {
653                for (Service service : services) {
654                    if (service instanceof MasterConnector) {
655                        MasterConnector mConnector = (MasterConnector) service;
656                        if (!mConnector.isSlave()) {
657                            // means should be slave but not connected to master yet
658                            started.set(false);
659                            mConnector.stopBeforeConnected();
660                        }
661                    }
662                }
663            }
664            if (this.taskRunnerFactory != null) {
665                this.taskRunnerFactory.shutdown();
666                this.taskRunnerFactory = null;
667            }
668            if (this.executor != null) {
669                this.executor.shutdownNow();
670                this.executor = null;
671            }
672    
673            this.destinationInterceptors = null;
674            this.destinationFactory = null;
675    
676            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
677            synchronized (shutdownHooks) {
678                for (Runnable hook : shutdownHooks) {
679                    try {
680                        hook.run();
681                    } catch (Throwable e) {
682                        stopper.onException(hook, e);
683                    }
684                }
685            }
686    
687            MDC.remove("activemq.broker");
688    
689            stopper.throwFirstException();
690        }
691    
692        public boolean checkQueueSize(String queueName) {
693            long count = 0;
694            long queueSize = 0;
695            Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
696            for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
697                if (entry.getKey().isQueue()) {
698                    if (entry.getValue().getName().matches(queueName)) {
699                        queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
700                        count += queueSize;
701                        if (queueSize > 0) {
702                            LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
703                                    + queueSize);
704                        }
705                    }
706                }
707            }
708            return count == 0;
709        }
710    
711        /**
712         * This method (both connectorName and queueName are using regex to match)
713         * 1. stop the connector (supposed the user input the connector which the
714         * clients connect to) 2. to check whether there is any pending message on
715         * the queues defined by queueName 3. supposedly, after stop the connector,
716         * client should failover to other broker and pending messages should be
717         * forwarded. if no pending messages, the method finally call stop to stop
718         * the broker.
719         *
720         * @param connectorName
721         * @param queueName
722         * @param timeout
723         * @param pollInterval
724         * @throws Exception
725         */
726        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
727                throws Exception {
728            if (isUseJmx()) {
729                if (connectorName == null || queueName == null || timeout <= 0) {
730                    throw new Exception(
731                            "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
732                }
733                if (pollInterval <= 0) {
734                    pollInterval = 30;
735                }
736                LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
737                        + timeout + " pollInterval:" + pollInterval);
738                TransportConnector connector;
739                for (int i = 0; i < transportConnectors.size(); i++) {
740                    connector = transportConnectors.get(i);
741                    if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
742                        connector.stop();
743                    }
744                }
745                long start = System.currentTimeMillis();
746                while (System.currentTimeMillis() - start < timeout * 1000) {
747                    // check quesize until it gets zero
748                    if (checkQueueSize(queueName)) {
749                        stop();
750                        break;
751                    } else {
752                        Thread.sleep(pollInterval * 1000);
753                    }
754                }
755                if (stopped.get()) {
756                    LOG.info("Successfully stop the broker.");
757                } else {
758                    LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
759                }
760            }
761        }
762    
763        /**
764         * A helper method to block the caller thread until the broker has been
765         * stopped
766         */
767        public void waitUntilStopped() {
768            while (isStarted() && !stopped.get()) {
769                try {
770                    stoppedLatch.await();
771                } catch (InterruptedException e) {
772                    // ignore
773                }
774            }
775        }
776    
777        /**
778         * A helper method to block the caller thread until the broker has fully started
779         * @return boolean true if wait succeeded false if broker was not started or was stopped
780         */
781        public boolean waitUntilStarted() {
782            boolean waitSucceeded = false;
783            while (isStarted() && !stopped.get() && !waitSucceeded) {
784                try {
785                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
786                } catch (InterruptedException ignore) {
787                }
788            }
789            return waitSucceeded;
790        }
791    
792        // Properties
793        // -------------------------------------------------------------------------
794        /**
795         * Returns the message broker
796         */
797        public Broker getBroker() throws Exception {
798            if (broker == null) {
799                LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
800                        + getBrokerName() + ") is starting");
801                LOG.info("For help or more information please see: http://activemq.apache.org/");
802                broker = createBroker();
803            }
804            return broker;
805        }
806    
807        /**
808         * Returns the administration view of the broker; used to create and destroy
809         * resources such as queues and topics. Note this method returns null if JMX
810         * is disabled.
811         */
812        public BrokerView getAdminView() throws Exception {
813            if (adminView == null) {
814                // force lazy creation
815                getBroker();
816            }
817            return adminView;
818        }
819    
820        public void setAdminView(BrokerView adminView) {
821            this.adminView = adminView;
822        }
823    
824        public String getBrokerName() {
825            return brokerName;
826        }
827    
828        /**
829         * Sets the name of this broker; which must be unique in the network
830         *
831         * @param brokerName
832         */
833        public void setBrokerName(String brokerName) {
834            if (brokerName == null) {
835                throw new NullPointerException("The broker name cannot be null");
836            }
837            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
838            if (!str.equals(brokerName)) {
839                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
840            }
841            this.brokerName = str.trim();
842        }
843    
844        public PersistenceAdapterFactory getPersistenceFactory() {
845            return persistenceFactory;
846        }
847    
848        public File getDataDirectoryFile() {
849            if (dataDirectoryFile == null) {
850                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
851            }
852            return dataDirectoryFile;
853        }
854    
855        public File getBrokerDataDirectory() {
856            String brokerDir = getBrokerName();
857            return new File(getDataDirectoryFile(), brokerDir);
858        }
859    
860        /**
861         * Sets the directory in which the data files will be stored by default for
862         * the JDBC and Journal persistence adaptors.
863         *
864         * @param dataDirectory
865         *            the directory to store data files
866         */
867        public void setDataDirectory(String dataDirectory) {
868            setDataDirectoryFile(new File(dataDirectory));
869        }
870    
871        /**
872         * Sets the directory in which the data files will be stored by default for
873         * the JDBC and Journal persistence adaptors.
874         *
875         * @param dataDirectoryFile
876         *            the directory to store data files
877         */
878        public void setDataDirectoryFile(File dataDirectoryFile) {
879            this.dataDirectoryFile = dataDirectoryFile;
880        }
881    
882        /**
883         * @return the tmpDataDirectory
884         */
885        public File getTmpDataDirectory() {
886            if (tmpDataDirectory == null) {
887                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
888            }
889            return tmpDataDirectory;
890        }
891    
892        /**
893         * @param tmpDataDirectory
894         *            the tmpDataDirectory to set
895         */
896        public void setTmpDataDirectory(File tmpDataDirectory) {
897            this.tmpDataDirectory = tmpDataDirectory;
898        }
899    
900        public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
901            this.persistenceFactory = persistenceFactory;
902        }
903    
904        public void setDestinationFactory(DestinationFactory destinationFactory) {
905            this.destinationFactory = destinationFactory;
906        }
907    
908        public boolean isPersistent() {
909            return persistent;
910        }
911    
912        /**
913         * Sets whether or not persistence is enabled or disabled.
914         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
915         */
916        public void setPersistent(boolean persistent) {
917            this.persistent = persistent;
918        }
919    
920        public boolean isPopulateJMSXUserID() {
921            return populateJMSXUserID;
922        }
923    
924        /**
925         * Sets whether or not the broker should populate the JMSXUserID header.
926         */
927        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
928            this.populateJMSXUserID = populateJMSXUserID;
929        }
930    
931        public SystemUsage getSystemUsage() {
932            try {
933                if (systemUsage == null) {
934                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
935                    systemUsage.setExecutor(getExecutor());
936                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
937                                                                             // 64
938                                                                             // Meg
939                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1000 * 50); // 50
940                                                                                    // Gb
941                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1000 * 100); // 100
942                                                                                     // GB
943                    addService(this.systemUsage);
944                }
945                return systemUsage;
946            } catch (IOException e) {
947                LOG.error("Cannot create SystemUsage", e);
948                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
949            }
950        }
951    
952        public void setSystemUsage(SystemUsage memoryManager) {
953            if (this.systemUsage != null) {
954                removeService(this.systemUsage);
955            }
956            this.systemUsage = memoryManager;
957            if (this.systemUsage.getExecutor()==null) {
958                this.systemUsage.setExecutor(getExecutor());
959            }
960            addService(this.systemUsage);
961        }
962    
963        /**
964         * @return the consumerUsageManager
965         * @throws IOException
966         */
967        public SystemUsage getConsumerSystemUsage() throws IOException {
968            if (this.consumerSystemUsaage == null) {
969                if (splitSystemUsageForProducersConsumers) {
970                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
971                    float portion = consumerSystemUsagePortion / 100f;
972                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
973                    addService(this.consumerSystemUsaage);
974                } else {
975                    consumerSystemUsaage = getSystemUsage();
976                }
977            }
978            return this.consumerSystemUsaage;
979        }
980    
981        /**
982         * @param consumerSystemUsaage
983         *            the storeSystemUsage to set
984         */
985        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
986            if (this.consumerSystemUsaage != null) {
987                removeService(this.consumerSystemUsaage);
988            }
989            this.consumerSystemUsaage = consumerSystemUsaage;
990            addService(this.consumerSystemUsaage);
991        }
992    
993        /**
994         * @return the producerUsageManager
995         * @throws IOException
996         */
997        public SystemUsage getProducerSystemUsage() throws IOException {
998            if (producerSystemUsage == null) {
999                if (splitSystemUsageForProducersConsumers) {
1000                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1001                    float portion = producerSystemUsagePortion / 100f;
1002                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1003                    addService(producerSystemUsage);
1004                } else {
1005                    producerSystemUsage = getSystemUsage();
1006                }
1007            }
1008            return producerSystemUsage;
1009        }
1010    
1011        /**
1012         * @param producerUsageManager
1013         *            the producerUsageManager to set
1014         */
1015        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1016            if (this.producerSystemUsage != null) {
1017                removeService(this.producerSystemUsage);
1018            }
1019            this.producerSystemUsage = producerUsageManager;
1020            addService(this.producerSystemUsage);
1021        }
1022    
1023        public PersistenceAdapter getPersistenceAdapter() throws IOException {
1024            if (persistenceAdapter == null) {
1025                persistenceAdapter = createPersistenceAdapter();
1026                configureService(persistenceAdapter);
1027                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1028            }
1029            return persistenceAdapter;
1030        }
1031    
1032        /**
1033         * Sets the persistence adaptor implementation to use for this broker
1034         *
1035         * @throws IOException
1036         */
1037        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1038            this.persistenceAdapter = persistenceAdapter;
1039            configureService(this.persistenceAdapter);
1040            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1041        }
1042    
1043        public TaskRunnerFactory getTaskRunnerFactory() {
1044            if (this.taskRunnerFactory == null) {
1045                this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1046                        isDedicatedTaskRunner());
1047            }
1048            return this.taskRunnerFactory;
1049        }
1050    
1051        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1052            this.taskRunnerFactory = taskRunnerFactory;
1053        }
1054    
1055        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1056            if (taskRunnerFactory == null) {
1057                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1058                        true, 1000, isDedicatedTaskRunner());
1059            }
1060            return persistenceTaskRunnerFactory;
1061        }
1062    
1063        public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1064            this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1065        }
1066    
1067        public boolean isUseJmx() {
1068            return useJmx;
1069        }
1070    
1071        public boolean isEnableStatistics() {
1072            return enableStatistics;
1073        }
1074    
1075        /**
1076         * Sets whether or not the Broker's services enable statistics or not.
1077         */
1078        public void setEnableStatistics(boolean enableStatistics) {
1079            this.enableStatistics = enableStatistics;
1080        }
1081    
1082        /**
1083         * Sets whether or not the Broker's services should be exposed into JMX or
1084         * not.
1085         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1086         */
1087        public void setUseJmx(boolean useJmx) {
1088            this.useJmx = useJmx;
1089        }
1090    
1091        public ObjectName getBrokerObjectName() throws IOException {
1092            if (brokerObjectName == null) {
1093                brokerObjectName = createBrokerObjectName();
1094            }
1095            return brokerObjectName;
1096        }
1097    
1098        /**
1099         * Sets the JMX ObjectName for this broker
1100         */
1101        public void setBrokerObjectName(ObjectName brokerObjectName) {
1102            this.brokerObjectName = brokerObjectName;
1103        }
1104    
1105        public ManagementContext getManagementContext() {
1106            if (managementContext == null) {
1107                managementContext = new ManagementContext();
1108            }
1109            return managementContext;
1110        }
1111    
1112        public void setManagementContext(ManagementContext managementContext) {
1113            this.managementContext = managementContext;
1114        }
1115    
1116        public NetworkConnector getNetworkConnectorByName(String connectorName) {
1117            for (NetworkConnector connector : networkConnectors) {
1118                if (connector.getName().equals(connectorName)) {
1119                    return connector;
1120                }
1121            }
1122            return null;
1123        }
1124    
1125        public String[] getNetworkConnectorURIs() {
1126            return networkConnectorURIs;
1127        }
1128    
1129        public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1130            this.networkConnectorURIs = networkConnectorURIs;
1131        }
1132    
1133        public TransportConnector getConnectorByName(String connectorName) {
1134            for (TransportConnector connector : transportConnectors) {
1135                if (connector.getName().equals(connectorName)) {
1136                    return connector;
1137                }
1138            }
1139            return null;
1140        }
1141    
1142        public Map<String, String> getTransportConnectorURIsAsMap() {
1143            Map<String, String> answer = new HashMap<String, String>();
1144            for (TransportConnector connector : transportConnectors) {
1145                try {
1146                    URI uri = connector.getConnectUri();
1147                    if (uri != null) {
1148                        String scheme = uri.getScheme();
1149                        if (scheme != null) {
1150                            answer.put(scheme.toLowerCase(), uri.toString());
1151                        }
1152                    }
1153                } catch (Exception e) {
1154                    LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1155                }
1156            }
1157            return answer;
1158        }
1159    
1160        public String[] getTransportConnectorURIs() {
1161            return transportConnectorURIs;
1162        }
1163    
1164        public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1165            this.transportConnectorURIs = transportConnectorURIs;
1166        }
1167    
1168        /**
1169         * @return Returns the jmsBridgeConnectors.
1170         */
1171        public JmsConnector[] getJmsBridgeConnectors() {
1172            return jmsBridgeConnectors;
1173        }
1174    
1175        /**
1176         * @param jmsConnectors
1177         *            The jmsBridgeConnectors to set.
1178         */
1179        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1180            this.jmsBridgeConnectors = jmsConnectors;
1181        }
1182    
1183        public Service[] getServices() {
1184            return services.toArray(new Service[0]);
1185        }
1186    
1187        /**
1188         * Sets the services associated with this broker such as a
1189         * {@link MasterConnector}
1190         */
1191        public void setServices(Service[] services) {
1192            this.services.clear();
1193            if (services != null) {
1194                for (int i = 0; i < services.length; i++) {
1195                    this.services.add(services[i]);
1196                }
1197            }
1198        }
1199    
1200        /**
1201         * Adds a new service so that it will be started as part of the broker
1202         * lifecycle
1203         */
1204        public void addService(Service service) {
1205            services.add(service);
1206        }
1207    
1208        public void removeService(Service service) {
1209            services.remove(service);
1210        }
1211    
1212        public boolean isUseLoggingForShutdownErrors() {
1213            return useLoggingForShutdownErrors;
1214        }
1215    
1216        /**
1217         * Sets whether or not we should use commons-logging when reporting errors
1218         * when shutting down the broker
1219         */
1220        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1221            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1222        }
1223    
1224        public boolean isUseShutdownHook() {
1225            return useShutdownHook;
1226        }
1227    
1228        /**
1229         * Sets whether or not we should use a shutdown handler to close down the
1230         * broker cleanly if the JVM is terminated. It is recommended you leave this
1231         * enabled.
1232         */
1233        public void setUseShutdownHook(boolean useShutdownHook) {
1234            this.useShutdownHook = useShutdownHook;
1235        }
1236    
1237        public boolean isAdvisorySupport() {
1238            return advisorySupport;
1239        }
1240    
1241        /**
1242         * Allows the support of advisory messages to be disabled for performance
1243         * reasons.
1244         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1245         */
1246        public void setAdvisorySupport(boolean advisorySupport) {
1247            this.advisorySupport = advisorySupport;
1248        }
1249    
1250        public List<TransportConnector> getTransportConnectors() {
1251            return new ArrayList<TransportConnector>(transportConnectors);
1252        }
1253    
1254        /**
1255         * Sets the transport connectors which this broker will listen on for new
1256         * clients
1257         *
1258         * @org.apache.xbean.Property
1259         *                            nestedType="org.apache.activemq.broker.TransportConnector"
1260         */
1261        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1262            for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1263                TransportConnector connector = iter.next();
1264                addConnector(connector);
1265            }
1266        }
1267    
1268        public TransportConnector getTransportConnectorByName(String name){
1269            for (TransportConnector transportConnector:transportConnectors){
1270               if (name.equals(transportConnector.getName())){
1271                   return transportConnector;
1272               }
1273            }
1274            return null;
1275        }
1276    
1277        public TransportConnector getTransportConnectorByScheme(String scheme){
1278            for (TransportConnector transportConnector:transportConnectors){
1279                if (scheme.equals(transportConnector.getUri().getScheme())){
1280                    return transportConnector;
1281                }
1282            }
1283            return null;
1284        }
1285    
1286        public List<NetworkConnector> getNetworkConnectors() {
1287            return new ArrayList<NetworkConnector>(networkConnectors);
1288        }
1289    
1290        public List<ProxyConnector> getProxyConnectors() {
1291            return new ArrayList<ProxyConnector>(proxyConnectors);
1292        }
1293    
1294        /**
1295         * Sets the network connectors which this broker will use to connect to
1296         * other brokers in a federated network
1297         *
1298         * @org.apache.xbean.Property
1299         *                            nestedType="org.apache.activemq.network.NetworkConnector"
1300         */
1301        public void setNetworkConnectors(List networkConnectors) throws Exception {
1302            for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1303                NetworkConnector connector = (NetworkConnector) iter.next();
1304                addNetworkConnector(connector);
1305            }
1306        }
1307    
1308        /**
1309         * Sets the network connectors which this broker will use to connect to
1310         * other brokers in a federated network
1311         */
1312        public void setProxyConnectors(List proxyConnectors) throws Exception {
1313            for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1314                ProxyConnector connector = (ProxyConnector) iter.next();
1315                addProxyConnector(connector);
1316            }
1317        }
1318    
1319        public PolicyMap getDestinationPolicy() {
1320            return destinationPolicy;
1321        }
1322    
1323        /**
1324         * Sets the destination specific policies available either for exact
1325         * destinations or for wildcard areas of destinations.
1326         */
1327        public void setDestinationPolicy(PolicyMap policyMap) {
1328            this.destinationPolicy = policyMap;
1329        }
1330    
1331        public BrokerPlugin[] getPlugins() {
1332            return plugins;
1333        }
1334    
1335        /**
1336         * Sets a number of broker plugins to install such as for security
1337         * authentication or authorization
1338         */
1339        public void setPlugins(BrokerPlugin[] plugins) {
1340            this.plugins = plugins;
1341        }
1342    
1343        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1344            return messageAuthorizationPolicy;
1345        }
1346    
1347        /**
1348         * Sets the policy used to decide if the current connection is authorized to
1349         * consume a given message
1350         */
1351        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1352            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1353        }
1354    
1355        /**
1356         * Delete all messages from the persistent store
1357         *
1358         * @throws IOException
1359         */
1360        public void deleteAllMessages() throws IOException {
1361            getPersistenceAdapter().deleteAllMessages();
1362        }
1363    
1364        public boolean isDeleteAllMessagesOnStartup() {
1365            return deleteAllMessagesOnStartup;
1366        }
1367    
1368        /**
1369         * Sets whether or not all messages are deleted on startup - mostly only
1370         * useful for testing.
1371         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1372         */
1373        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1374            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1375        }
1376    
1377        public URI getVmConnectorURI() {
1378            if (vmConnectorURI == null) {
1379                try {
1380                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1381                } catch (URISyntaxException e) {
1382                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1383                }
1384            }
1385            return vmConnectorURI;
1386        }
1387    
1388        public void setVmConnectorURI(URI vmConnectorURI) {
1389            this.vmConnectorURI = vmConnectorURI;
1390        }
1391    
1392        public String getDefaultSocketURIString() {
1393    
1394                if (started.get()) {
1395                    if (this.defaultSocketURIString == null) {
1396                        for (TransportConnector tc:this.transportConnectors) {
1397                            String result = null;
1398                            try {
1399                                result = tc.getPublishableConnectString();
1400                            } catch (Exception e) {
1401                              LOG.warn("Failed to get the ConnectURI for "+tc,e);
1402                            }
1403                            if (result != null) {
1404                                // find first publishable uri
1405                                if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1406                                    this.defaultSocketURIString = result;
1407                                    break;
1408                                } else {
1409                                // or use the first defined
1410                                    if (this.defaultSocketURIString == null) {
1411                                        this.defaultSocketURIString = result;
1412                                    }
1413                                }
1414                            }
1415                        }
1416    
1417                    }
1418                    return this.defaultSocketURIString;
1419                }
1420           return null;
1421        }
1422    
1423        /**
1424         * @return Returns the shutdownOnMasterFailure.
1425         */
1426        public boolean isShutdownOnMasterFailure() {
1427            return shutdownOnMasterFailure;
1428        }
1429    
1430        /**
1431         * @param shutdownOnMasterFailure
1432         *            The shutdownOnMasterFailure to set.
1433         */
1434        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1435            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1436        }
1437    
1438        public boolean isKeepDurableSubsActive() {
1439            return keepDurableSubsActive;
1440        }
1441    
1442        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1443            this.keepDurableSubsActive = keepDurableSubsActive;
1444        }
1445    
1446        public boolean isUseVirtualTopics() {
1447            return useVirtualTopics;
1448        }
1449    
1450        /**
1451         * Sets whether or not <a
1452         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1453         * Topics</a> should be supported by default if they have not been
1454         * explicitly configured.
1455         */
1456        public void setUseVirtualTopics(boolean useVirtualTopics) {
1457            this.useVirtualTopics = useVirtualTopics;
1458        }
1459    
1460        public DestinationInterceptor[] getDestinationInterceptors() {
1461            return destinationInterceptors;
1462        }
1463    
1464        public boolean isUseMirroredQueues() {
1465            return useMirroredQueues;
1466        }
1467    
1468        /**
1469         * Sets whether or not <a
1470         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1471         * Queues</a> should be supported by default if they have not been
1472         * explicitly configured.
1473         */
1474        public void setUseMirroredQueues(boolean useMirroredQueues) {
1475            this.useMirroredQueues = useMirroredQueues;
1476        }
1477    
1478        /**
1479         * Sets the destination interceptors to use
1480         */
1481        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1482            this.destinationInterceptors = destinationInterceptors;
1483        }
1484    
1485        public ActiveMQDestination[] getDestinations() {
1486            return destinations;
1487        }
1488    
1489        /**
1490         * Sets the destinations which should be loaded/created on startup
1491         */
1492        public void setDestinations(ActiveMQDestination[] destinations) {
1493            this.destinations = destinations;
1494        }
1495    
1496        /**
1497         * @return the tempDataStore
1498         */
1499        public synchronized PListStore getTempDataStore() {
1500            if (tempDataStore == null) {
1501                if (!isPersistent()) {
1502                    return null;
1503                }
1504                boolean result = true;
1505                boolean empty = true;
1506                try {
1507                    File directory = getTmpDataDirectory();
1508                    if (directory.exists() && directory.isDirectory()) {
1509                        File[] files = directory.listFiles();
1510                        if (files != null && files.length > 0) {
1511                            empty = false;
1512                            for (int i = 0; i < files.length; i++) {
1513                                File file = files[i];
1514                                if (!file.isDirectory()) {
1515                                    result &= file.delete();
1516                                }
1517                            }
1518                        }
1519                    }
1520                    if (!empty) {
1521                        String str = result ? "Successfully deleted" : "Failed to delete";
1522                        LOG.info(str + " temporary storage");
1523                    }
1524                    this.tempDataStore = new PListStore();
1525                    this.tempDataStore.setDirectory(getTmpDataDirectory());
1526                    configureService(tempDataStore);
1527                    this.tempDataStore.start();
1528                } catch (Exception e) {
1529                    throw new RuntimeException(e);
1530                }
1531            }
1532            return tempDataStore;
1533        }
1534    
1535        /**
1536         * @param tempDataStore
1537         *            the tempDataStore to set
1538         */
1539        public void setTempDataStore(PListStore tempDataStore) {
1540            this.tempDataStore = tempDataStore;
1541            configureService(tempDataStore);
1542            try {
1543                tempDataStore.start();
1544            } catch (Exception e) {
1545                RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1546                LOG.error(exception.getLocalizedMessage(), e);
1547                throw exception;
1548            }
1549        }
1550    
1551        public int getPersistenceThreadPriority() {
1552            return persistenceThreadPriority;
1553        }
1554    
1555        public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1556            this.persistenceThreadPriority = persistenceThreadPriority;
1557        }
1558    
1559        /**
1560         * @return the useLocalHostBrokerName
1561         */
1562        public boolean isUseLocalHostBrokerName() {
1563            return this.useLocalHostBrokerName;
1564        }
1565    
1566        /**
1567         * @param useLocalHostBrokerName
1568         *            the useLocalHostBrokerName to set
1569         */
1570        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1571            this.useLocalHostBrokerName = useLocalHostBrokerName;
1572            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1573                brokerName = LOCAL_HOST_NAME;
1574            }
1575        }
1576    
1577        /**
1578         * @return the supportFailOver
1579         */
1580        public boolean isSupportFailOver() {
1581            return this.supportFailOver;
1582        }
1583    
1584        /**
1585         * @param supportFailOver
1586         *            the supportFailOver to set
1587         */
1588        public void setSupportFailOver(boolean supportFailOver) {
1589            this.supportFailOver = supportFailOver;
1590        }
1591    
1592        /**
1593         * Looks up and lazily creates if necessary the destination for the given
1594         * JMS name
1595         */
1596        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1597            return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1598        }
1599    
1600        public void removeDestination(ActiveMQDestination destination) throws Exception {
1601            getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1602        }
1603    
1604        public int getProducerSystemUsagePortion() {
1605            return producerSystemUsagePortion;
1606        }
1607    
1608        public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1609            this.producerSystemUsagePortion = producerSystemUsagePortion;
1610        }
1611    
1612        public int getConsumerSystemUsagePortion() {
1613            return consumerSystemUsagePortion;
1614        }
1615    
1616        public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1617            this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1618        }
1619    
1620        public boolean isSplitSystemUsageForProducersConsumers() {
1621            return splitSystemUsageForProducersConsumers;
1622        }
1623    
1624        public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1625            this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1626        }
1627    
1628        public boolean isMonitorConnectionSplits() {
1629            return monitorConnectionSplits;
1630        }
1631    
1632        public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1633            this.monitorConnectionSplits = monitorConnectionSplits;
1634        }
1635    
1636        public int getTaskRunnerPriority() {
1637            return taskRunnerPriority;
1638        }
1639    
1640        public void setTaskRunnerPriority(int taskRunnerPriority) {
1641            this.taskRunnerPriority = taskRunnerPriority;
1642        }
1643    
1644        public boolean isDedicatedTaskRunner() {
1645            return dedicatedTaskRunner;
1646        }
1647    
1648        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1649            this.dedicatedTaskRunner = dedicatedTaskRunner;
1650        }
1651    
1652        public boolean isCacheTempDestinations() {
1653            return cacheTempDestinations;
1654        }
1655    
1656        public void setCacheTempDestinations(boolean cacheTempDestinations) {
1657            this.cacheTempDestinations = cacheTempDestinations;
1658        }
1659    
1660        public int getTimeBeforePurgeTempDestinations() {
1661            return timeBeforePurgeTempDestinations;
1662        }
1663    
1664        public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1665            this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1666        }
1667    
1668        public boolean isUseTempMirroredQueues() {
1669            return useTempMirroredQueues;
1670        }
1671    
1672        public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1673            this.useTempMirroredQueues = useTempMirroredQueues;
1674        }
1675    
1676        //
1677        // Implementation methods
1678        // -------------------------------------------------------------------------
1679        /**
1680         * Handles any lazy-creation helper properties which are added to make
1681         * things easier to configure inside environments such as Spring
1682         *
1683         * @throws Exception
1684         */
1685        protected void processHelperProperties() throws Exception {
1686            boolean masterServiceExists = false;
1687            if (transportConnectorURIs != null) {
1688                for (int i = 0; i < transportConnectorURIs.length; i++) {
1689                    String uri = transportConnectorURIs[i];
1690                    addConnector(uri);
1691                }
1692            }
1693            if (networkConnectorURIs != null) {
1694                for (int i = 0; i < networkConnectorURIs.length; i++) {
1695                    String uri = networkConnectorURIs[i];
1696                    addNetworkConnector(uri);
1697                }
1698            }
1699            if (jmsBridgeConnectors != null) {
1700                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1701                    addJmsConnector(jmsBridgeConnectors[i]);
1702                }
1703            }
1704            for (Service service : services) {
1705                if (service instanceof MasterConnector) {
1706                    masterServiceExists = true;
1707                    break;
1708                }
1709            }
1710            if (masterConnectorURI != null) {
1711                if (masterServiceExists) {
1712                    throw new IllegalStateException(
1713                            "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1714                } else {
1715                    addService(new MasterConnector(masterConnectorURI));
1716                }
1717            }
1718        }
1719    
1720        protected void checkSystemUsageLimits() throws IOException {
1721            SystemUsage usage = getSystemUsage();
1722            long memLimit = usage.getMemoryUsage().getLimit();
1723            long jvmLimit = Runtime.getRuntime().maxMemory();
1724    
1725            if (memLimit > jvmLimit) {
1726                LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1727                          " mb) is more than the maximum available for the JVM: " +
1728                          jvmLimit / (1024 * 1024) + " mb");
1729            }
1730    
1731            if (getPersistenceAdapter() != null) {
1732                PersistenceAdapter adapter = getPersistenceAdapter();
1733                File dir = adapter.getDirectory();
1734    
1735                if (dir != null) {
1736                    String dirPath = dir.getAbsolutePath();
1737                    if (!dir.isAbsolute()) {
1738                        dir = new File(dirPath);
1739                    }
1740    
1741                    while (dir != null && dir.isDirectory() == false) {
1742                        dir = dir.getParentFile();
1743                    }
1744                    long storeLimit = usage.getStoreUsage().getLimit();
1745                    long dirFreeSpace = dir.getUsableSpace();
1746                    if (storeLimit > dirFreeSpace) {
1747                        LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1748                                 " mb, whilst the data directory: " + dir.getAbsolutePath() +
1749                                 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1750                    }
1751                }
1752    
1753                long maxJournalFileSize = 0;
1754                long storeLimit = usage.getStoreUsage().getLimit();
1755    
1756                if (adapter instanceof KahaDBPersistenceAdapter) {
1757                    KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
1758                    maxJournalFileSize = kahaDB.getJournalMaxFileLength();
1759                } else if (adapter instanceof AMQPersistenceAdapter) {
1760                    AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
1761                    maxJournalFileSize = amqAdapter.getMaxFileLength();
1762                }
1763    
1764                if (storeLimit < maxJournalFileSize) {
1765                    LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1766                              " mb, whilst the max journal file size for the store is: " +
1767                              maxJournalFileSize / (1024 * 1024) + " mb, " +
1768                              "the store will not accept any data when used.");
1769                }
1770            }
1771    
1772            File tmpDir = getTmpDataDirectory();
1773            if (tmpDir != null) {
1774    
1775                String tmpDirPath = tmpDir.getAbsolutePath();
1776                if (!tmpDir.isAbsolute()) {
1777                    tmpDir = new File(tmpDirPath);
1778                }
1779    
1780                long storeLimit = usage.getTempUsage().getLimit();
1781                while (tmpDir != null && tmpDir.isDirectory() == false) {
1782                    tmpDir = tmpDir.getParentFile();
1783                }
1784                long dirFreeSpace = tmpDir.getUsableSpace();
1785                if (storeLimit > dirFreeSpace) {
1786                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1787                              " mb, whilst the temporary data directory: " + tmpDirPath +
1788                              " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1789                }
1790    
1791                long maxJournalFileSize;
1792    
1793                if (usage.getTempUsage().getStore() != null) {
1794                    maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
1795                } else {
1796                    maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
1797                }
1798    
1799                if (storeLimit < maxJournalFileSize) {
1800                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1801                              " mb, whilst the max journal file size for the temporary store is: " +
1802                              maxJournalFileSize / (1024 * 1024) + " mb, " +
1803                              "the temp store will not accept any data when used.");
1804                }
1805            }
1806        }
1807    
1808        public void stopAllConnectors(ServiceStopper stopper) {
1809            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1810                NetworkConnector connector = iter.next();
1811                unregisterNetworkConnectorMBean(connector);
1812                stopper.stop(connector);
1813            }
1814            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1815                ProxyConnector connector = iter.next();
1816                stopper.stop(connector);
1817            }
1818            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1819                JmsConnector connector = iter.next();
1820                stopper.stop(connector);
1821            }
1822            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1823                TransportConnector connector = iter.next();
1824                stopper.stop(connector);
1825            }
1826        }
1827    
1828        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1829            try {
1830                ObjectName objectName = createConnectorObjectName(connector);
1831                connector = connector.asManagedConnector(getManagementContext(), objectName);
1832                ConnectorViewMBean view = new ConnectorView(connector);
1833                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1834                return connector;
1835            } catch (Throwable e) {
1836                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1837            }
1838        }
1839    
1840        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1841            if (isUseJmx()) {
1842                try {
1843                    ObjectName objectName = createConnectorObjectName(connector);
1844                    getManagementContext().unregisterMBean(objectName);
1845                } catch (Throwable e) {
1846                    throw IOExceptionSupport.create(
1847                            "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1848                }
1849            }
1850        }
1851    
1852        protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1853            return adaptor;
1854        }
1855    
1856        protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1857            if (isUseJmx()) {
1858            }
1859        }
1860    
1861        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1862            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1863                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1864                    + JMXSupport.encodeObjectNamePart(connector.getName()));
1865        }
1866    
1867        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1868            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1869            try {
1870                ObjectName objectName = createNetworkConnectorObjectName(connector);
1871                connector.setObjectName(objectName);
1872                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1873            } catch (Throwable e) {
1874                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1875            }
1876        }
1877    
1878        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1879                throws MalformedObjectNameException {
1880            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1881                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1882                    + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1883        }
1884    
1885    
1886        public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1887                throws MalformedObjectNameException {
1888            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1889                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1890                    + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1891        }
1892    
1893        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1894            if (isUseJmx()) {
1895                try {
1896                    ObjectName objectName = createNetworkConnectorObjectName(connector);
1897                    getManagementContext().unregisterMBean(objectName);
1898                } catch (Exception e) {
1899                    LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1900                }
1901            }
1902        }
1903    
1904        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1905            ProxyConnectorView view = new ProxyConnectorView(connector);
1906            try {
1907                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1908                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1909                        + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1910                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1911            } catch (Throwable e) {
1912                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1913            }
1914        }
1915    
1916        protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1917            FTConnectorView view = new FTConnectorView(connector);
1918            try {
1919                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1920                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1921                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1922            } catch (Throwable e) {
1923                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1924            }
1925        }
1926    
1927        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1928            JmsConnectorView view = new JmsConnectorView(connector);
1929            try {
1930                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1931                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1932                        + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1933                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1934            } catch (Throwable e) {
1935                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1936            }
1937        }
1938    
1939        /**
1940         * Factory method to create a new broker
1941         *
1942         * @throws Exception
1943         * @throws
1944         * @throws
1945         */
1946        protected Broker createBroker() throws Exception {
1947            regionBroker = createRegionBroker();
1948            Broker broker = addInterceptors(regionBroker);
1949            // Add a filter that will stop access to the broker once stopped
1950            broker = new MutableBrokerFilter(broker) {
1951                Broker old;
1952    
1953                @Override
1954                public void stop() throws Exception {
1955                    old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1956                        // Just ignore additional stop actions.
1957                        @Override
1958                        public void stop() throws Exception {
1959                        }
1960                    });
1961                    old.stop();
1962                }
1963    
1964                @Override
1965                public void start() throws Exception {
1966                    if (forceStart && old != null) {
1967                        this.next.set(old);
1968                    }
1969                    getNext().start();
1970                }
1971            };
1972            return broker;
1973        }
1974    
1975        /**
1976         * Factory method to create the core region broker onto which interceptors
1977         * are added
1978         *
1979         * @throws Exception
1980         */
1981        protected Broker createRegionBroker() throws Exception {
1982            if (destinationInterceptors == null) {
1983                destinationInterceptors = createDefaultDestinationInterceptor();
1984            }
1985            configureServices(destinationInterceptors);
1986            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1987            if (destinationFactory == null) {
1988                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1989            }
1990            return createRegionBroker(destinationInterceptor);
1991        }
1992    
1993        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1994            RegionBroker regionBroker;
1995            if (isUseJmx()) {
1996                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1997                        getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1998            } else {
1999                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2000                        destinationInterceptor,getScheduler(),getExecutor());
2001            }
2002            destinationFactory.setRegionBroker(regionBroker);
2003            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2004            regionBroker.setBrokerName(getBrokerName());
2005            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2006            regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2007            if (brokerId != null) {
2008                regionBroker.setBrokerId(brokerId);
2009            }
2010            return regionBroker;
2011        }
2012    
2013        /**
2014         * Create the default destination interceptor
2015         */
2016        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2017            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2018            if (isUseVirtualTopics()) {
2019                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2020                VirtualTopic virtualTopic = new VirtualTopic();
2021                virtualTopic.setName("VirtualTopic.>");
2022                VirtualDestination[] virtualDestinations = { virtualTopic };
2023                interceptor.setVirtualDestinations(virtualDestinations);
2024                answer.add(interceptor);
2025            }
2026            if (isUseMirroredQueues()) {
2027                MirroredQueue interceptor = new MirroredQueue();
2028                answer.add(interceptor);
2029            }
2030            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2031            answer.toArray(array);
2032            return array;
2033        }
2034    
2035        /**
2036         * Strategy method to add interceptors to the broker
2037         *
2038         * @throws IOException
2039         */
2040        protected Broker addInterceptors(Broker broker) throws Exception {
2041            if (isSchedulerSupport()) {
2042                SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
2043                if (isUseJmx()) {
2044                    JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2045                    try {
2046                        ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
2047                                + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
2048                                + "Type=jobScheduler," + "jobSchedulerName=JMS");
2049    
2050                        AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2051                        this.adminView.setJMSJobScheduler(objectName);
2052                    } catch (Throwable e) {
2053                        throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2054                                + e.getMessage(), e);
2055                    }
2056    
2057                }
2058                broker = sb;
2059            }
2060            if (isAdvisorySupport()) {
2061                broker = new AdvisoryBroker(broker);
2062            }
2063            broker = new CompositeDestinationBroker(broker);
2064            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2065            if (isPopulateJMSXUserID()) {
2066                UserIDBroker userIDBroker = new UserIDBroker(broker);
2067                userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2068                broker = userIDBroker;
2069            }
2070            if (isMonitorConnectionSplits()) {
2071                broker = new ConnectionSplitBroker(broker);
2072            }
2073            if (plugins != null) {
2074                for (int i = 0; i < plugins.length; i++) {
2075                    BrokerPlugin plugin = plugins[i];
2076                    broker = plugin.installPlugin(broker);
2077                }
2078            }
2079            return broker;
2080        }
2081    
2082        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2083            if (isPersistent()) {
2084                PersistenceAdapterFactory fac = getPersistenceFactory();
2085                if (fac != null) {
2086                    return fac.createPersistenceAdapter();
2087                }else {
2088                    KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
2089                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2090                    adaptor.setDirectory(dir);
2091                    return adaptor;
2092                }
2093            } else {
2094                return new MemoryPersistenceAdapter();
2095            }
2096        }
2097    
2098        protected ObjectName createBrokerObjectName() throws IOException {
2099            try {
2100                return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2101                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
2102            } catch (Throwable e) {
2103                throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
2104            }
2105        }
2106    
2107        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2108            TransportServer transport = TransportFactory.bind(this, brokerURI);
2109            return new TransportConnector(transport);
2110        }
2111    
2112        /**
2113         * Extracts the port from the options
2114         */
2115        protected Object getPort(Map options) {
2116            Object port = options.get("port");
2117            if (port == null) {
2118                port = DEFAULT_PORT;
2119                LOG.warn("No port specified so defaulting to: " + port);
2120            }
2121            return port;
2122        }
2123    
2124        protected void addShutdownHook() {
2125            if (useShutdownHook) {
2126                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2127                    @Override
2128                    public void run() {
2129                        containerShutdown();
2130                    }
2131                };
2132                Runtime.getRuntime().addShutdownHook(shutdownHook);
2133            }
2134        }
2135    
2136        protected void removeShutdownHook() {
2137            if (shutdownHook != null) {
2138                try {
2139                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
2140                } catch (Exception e) {
2141                    LOG.debug("Caught exception, must be shutting down: " + e);
2142                }
2143            }
2144        }
2145    
2146        /**
2147         * Sets hooks to be executed when broker shut down
2148         *
2149         * @org.apache.xbean.Property
2150         */
2151        public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2152            for (Runnable hook : hooks) {
2153                addShutdownHook(hook);
2154            }
2155        }
2156    
2157        /**
2158         * Causes a clean shutdown of the container when the VM is being shut down
2159         */
2160        protected void containerShutdown() {
2161            try {
2162                stop();
2163            } catch (IOException e) {
2164                Throwable linkedException = e.getCause();
2165                if (linkedException != null) {
2166                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2167                } else {
2168                    logError("Failed to shut down: " + e, e);
2169                }
2170                if (!useLoggingForShutdownErrors) {
2171                    e.printStackTrace(System.err);
2172                }
2173            } catch (Exception e) {
2174                logError("Failed to shut down: " + e, e);
2175            }
2176        }
2177    
2178        protected void logError(String message, Throwable e) {
2179            if (useLoggingForShutdownErrors) {
2180                LOG.error("Failed to shut down: " + e);
2181            } else {
2182                System.err.println("Failed to shut down: " + e);
2183            }
2184        }
2185    
2186        /**
2187         * Starts any configured destinations on startup
2188         */
2189        protected void startDestinations() throws Exception {
2190            if (destinations != null) {
2191                ConnectionContext adminConnectionContext = getAdminConnectionContext();
2192                for (int i = 0; i < destinations.length; i++) {
2193                    ActiveMQDestination destination = destinations[i];
2194                    getBroker().addDestination(adminConnectionContext, destination,true);
2195                }
2196            }
2197            if (isUseVirtualTopics()) {
2198                startVirtualConsumerDestinations();
2199            }
2200        }
2201    
2202        /**
2203         * Returns the broker's administration connection context used for
2204         * configuring the broker at startup
2205         */
2206        public ConnectionContext getAdminConnectionContext() throws Exception {
2207            return BrokerSupport.getConnectionContext(getBroker());
2208        }
2209    
2210        protected void waitForSlave() {
2211            try {
2212                if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2213                    throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2214                }
2215            } catch (InterruptedException e) {
2216                LOG.error("Exception waiting for slave:" + e);
2217            }
2218        }
2219    
2220        protected void slaveConnectionEstablished() {
2221            slaveStartSignal.countDown();
2222        }
2223    
2224        protected void startManagementContext() throws Exception {
2225            getManagementContext().start();
2226            adminView = new BrokerView(this, null);
2227            ObjectName objectName = getBrokerObjectName();
2228            AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2229        }
2230    
2231        /**
2232         * Start all transport and network connections, proxies and bridges
2233         *
2234         * @throws Exception
2235         */
2236        public void startAllConnectors() throws Exception {
2237            if (!isSlave()) {
2238                Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2239                List<TransportConnector> al = new ArrayList<TransportConnector>();
2240                for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2241                    TransportConnector connector = iter.next();
2242                    connector.setBrokerService(this);
2243                    al.add(startTransportConnector(connector));
2244                }
2245                if (al.size() > 0) {
2246                    // let's clear the transportConnectors list and replace it with
2247                    // the started transportConnector instances
2248                    this.transportConnectors.clear();
2249                    setTransportConnectors(al);
2250                }
2251                URI uri = getVmConnectorURI();
2252                Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2253                map.put("network", "true");
2254                map.put("async", "false");
2255                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2256                if (isWaitForSlave()) {
2257                    waitForSlave();
2258                }
2259                if (!stopped.get()) {
2260                    ThreadPoolExecutor networkConnectorStartExecutor = null;
2261                    if (isNetworkConnectorStartAsync()) {
2262                        // spin up as many threads as needed
2263                        networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2264                                10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2265                                new ThreadFactory() {
2266                                    int count=0;
2267                                    public Thread newThread(Runnable runnable) {
2268                                        Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2269                                        thread.setDaemon(true);
2270                                        return thread;
2271                                    }
2272                                });
2273                    }
2274    
2275                    for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2276                        final NetworkConnector connector = iter.next();
2277                        connector.setLocalUri(uri);
2278                        connector.setBrokerName(getBrokerName());
2279                        connector.setDurableDestinations(durableDestinations);
2280                        if (getDefaultSocketURIString() != null) {
2281                            connector.setBrokerURL(getDefaultSocketURIString());
2282                        }
2283                        if (networkConnectorStartExecutor != null) {
2284                            networkConnectorStartExecutor.execute(new Runnable() {
2285                                public void run() {
2286                                    try {
2287                                        LOG.info("Async start of " + connector);
2288                                        connector.start();
2289                                    } catch(Exception e) {
2290                                        LOG.error("Async start of network connector: " + connector + " failed", e);
2291                                    }
2292                                }
2293                            });
2294                        } else {
2295                            connector.start();
2296                        }
2297                    }
2298                    if (networkConnectorStartExecutor != null) {
2299                        // executor done when enqueued tasks are complete
2300                        networkConnectorStartExecutor.shutdown();
2301                        networkConnectorStartExecutor = null;
2302                    }
2303    
2304                    for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2305                        ProxyConnector connector = iter.next();
2306                        connector.start();
2307                    }
2308                    for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2309                        JmsConnector connector = iter.next();
2310                        connector.start();
2311                    }
2312                    for (Service service : services) {
2313                        configureService(service);
2314                        service.start();
2315                    }
2316                }
2317            }
2318        }
2319    
2320        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2321            connector.setTaskRunnerFactory(getTaskRunnerFactory());
2322            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2323            if (policy != null) {
2324                connector.setMessageAuthorizationPolicy(policy);
2325            }
2326            if (isUseJmx()) {
2327                connector = registerConnectorMBean(connector);
2328            }
2329            connector.getStatistics().setEnabled(enableStatistics);
2330            connector.start();
2331            return connector;
2332        }
2333    
2334        /**
2335         * Perform any custom dependency injection
2336         */
2337        protected void configureServices(Object[] services) {
2338            for (Object service : services) {
2339                configureService(service);
2340            }
2341        }
2342    
2343        /**
2344         * Perform any custom dependency injection
2345         */
2346        protected void configureService(Object service) {
2347            if (service instanceof BrokerServiceAware) {
2348                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2349                serviceAware.setBrokerService(this);
2350            }
2351            if (masterConnector == null) {
2352                if (service instanceof MasterConnector) {
2353                    masterConnector = (MasterConnector) service;
2354                    supportFailOver = true;
2355                }
2356            }
2357        }
2358    
2359        public void handleIOException(IOException exception) {
2360            if (ioExceptionHandler != null) {
2361                ioExceptionHandler.handle(exception);
2362             } else {
2363                LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2364             }
2365        }
2366    
2367        protected void startVirtualConsumerDestinations() throws Exception {
2368            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2369            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2370            DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2371            if (!destinations.isEmpty()) {
2372                for (ActiveMQDestination destination : destinations) {
2373                    if (filter.matches(destination) == true) {
2374                        broker.addDestination(adminConnectionContext, destination, false);
2375                    }
2376                }
2377            }
2378        }
2379    
2380        private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2381            // created at startup, so no sync needed
2382            if (virtualConsumerDestinationFilter == null) {
2383                Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2384                for (DestinationInterceptor interceptor : destinationInterceptors) {
2385                    if (interceptor instanceof VirtualDestinationInterceptor) {
2386                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2387                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2388                            if (virtualDestination instanceof VirtualTopic) {
2389                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2390                            }
2391                        }
2392                    }
2393                }
2394                ActiveMQQueue filter = new ActiveMQQueue();
2395                filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2396                virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2397            }
2398            return virtualConsumerDestinationFilter;
2399        }
2400    
2401        protected synchronized ThreadPoolExecutor getExecutor() {
2402            if (this.executor == null) {
2403                this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2404    
2405                    private long i = 0;
2406    
2407                    @Override
2408                    public Thread newThread(Runnable runnable) {
2409                        this.i++;
2410                        Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
2411                        thread.setDaemon(true);
2412                        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2413                            @Override
2414                            public void uncaughtException(final Thread t, final Throwable e) {
2415                                LOG.error("Error in thread '{}'", t.getName(), e);
2416                            }
2417                        });
2418                        return thread;
2419                    }
2420                }, new RejectedExecutionHandler() {
2421                    @Override
2422                    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2423                        try {
2424                            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2425                        } catch (InterruptedException e) {
2426                            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2427                        }
2428    
2429                        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2430                    }
2431                });
2432            }
2433            return this.executor;
2434        }
2435    
2436        public synchronized Scheduler getScheduler() {
2437            if (this.scheduler==null) {
2438                this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2439                try {
2440                    this.scheduler.start();
2441                } catch (Exception e) {
2442                   LOG.error("Failed to start Scheduler ",e);
2443                }
2444            }
2445            return this.scheduler;
2446        }
2447    
2448        public Broker getRegionBroker() {
2449            return regionBroker;
2450        }
2451    
2452        public void setRegionBroker(Broker regionBroker) {
2453            this.regionBroker = regionBroker;
2454        }
2455    
2456        public void addShutdownHook(Runnable hook) {
2457            synchronized (shutdownHooks) {
2458                shutdownHooks.add(hook);
2459            }
2460        }
2461    
2462        public void removeShutdownHook(Runnable hook) {
2463            synchronized (shutdownHooks) {
2464                shutdownHooks.remove(hook);
2465            }
2466        }
2467    
2468        public boolean isSystemExitOnShutdown() {
2469            return systemExitOnShutdown;
2470        }
2471    
2472        /**
2473         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2474         */
2475        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2476            this.systemExitOnShutdown = systemExitOnShutdown;
2477        }
2478    
2479        public int getSystemExitOnShutdownExitCode() {
2480            return systemExitOnShutdownExitCode;
2481        }
2482    
2483        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2484            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2485        }
2486    
2487        public SslContext getSslContext() {
2488            return sslContext;
2489        }
2490    
2491        public void setSslContext(SslContext sslContext) {
2492            this.sslContext = sslContext;
2493        }
2494    
2495        public boolean isShutdownOnSlaveFailure() {
2496            return shutdownOnSlaveFailure;
2497        }
2498    
2499        /**
2500         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2501         */
2502        public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2503            this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2504        }
2505    
2506        public boolean isWaitForSlave() {
2507            return waitForSlave;
2508        }
2509    
2510        /**
2511         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2512         */
2513        public void setWaitForSlave(boolean waitForSlave) {
2514            this.waitForSlave = waitForSlave;
2515        }
2516    
2517        public long getWaitForSlaveTimeout() {
2518            return this.waitForSlaveTimeout;
2519        }
2520    
2521        public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2522            this.waitForSlaveTimeout = waitForSlaveTimeout;
2523        }
2524    
2525        public CountDownLatch getSlaveStartSignal() {
2526            return slaveStartSignal;
2527        }
2528    
2529        /**
2530         * Get the passiveSlave
2531         * @return the passiveSlave
2532         */
2533        public boolean isPassiveSlave() {
2534            return this.passiveSlave;
2535        }
2536    
2537        /**
2538         * Set the passiveSlave
2539         * @param passiveSlave the passiveSlave to set
2540         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2541         */
2542        public void setPassiveSlave(boolean passiveSlave) {
2543            this.passiveSlave = passiveSlave;
2544        }
2545    
2546        /**
2547         * override the Default IOException handler, called when persistence adapter
2548         * has experiences File or JDBC I/O Exceptions
2549         *
2550         * @param ioExceptionHandler
2551         */
2552        public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2553            configureService(ioExceptionHandler);
2554            this.ioExceptionHandler = ioExceptionHandler;
2555        }
2556    
2557        public IOExceptionHandler getIoExceptionHandler() {
2558            return ioExceptionHandler;
2559        }
2560    
2561        /**
2562         * @return the schedulerSupport
2563         */
2564        public boolean isSchedulerSupport() {
2565            return this.schedulerSupport;
2566        }
2567    
2568        /**
2569         * @param schedulerSupport the schedulerSupport to set
2570         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2571         */
2572        public void setSchedulerSupport(boolean schedulerSupport) {
2573            this.schedulerSupport = schedulerSupport;
2574        }
2575    
2576        /**
2577         * @return the schedulerDirectory
2578         */
2579        public File getSchedulerDirectoryFile() {
2580            if (this.schedulerDirectoryFile == null) {
2581                this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2582            }
2583            return schedulerDirectoryFile;
2584        }
2585    
2586        /**
2587         * @param schedulerDirectory the schedulerDirectory to set
2588         */
2589        public void setSchedulerDirectoryFile(File schedulerDirectory) {
2590            this.schedulerDirectoryFile = schedulerDirectory;
2591        }
2592    
2593        public void setSchedulerDirectory(String schedulerDirectory) {
2594            setSchedulerDirectoryFile(new File(schedulerDirectory));
2595        }
2596    
2597        public int getSchedulePeriodForDestinationPurge() {
2598            return this.schedulePeriodForDestinationPurge;
2599        }
2600    
2601        public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2602            this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2603        }
2604    
2605        public int getMaxPurgedDestinationsPerSweep() {
2606            return this.maxPurgedDestinationsPerSweep;
2607        }
2608    
2609        public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2610            this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2611        }
2612    
2613        public BrokerContext getBrokerContext() {
2614            return brokerContext;
2615        }
2616    
2617        public void setBrokerContext(BrokerContext brokerContext) {
2618            this.brokerContext = brokerContext;
2619        }
2620    
2621        public void setBrokerId(String brokerId) {
2622            this.brokerId = new BrokerId(brokerId);
2623        }
2624    
2625        public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2626            return useAuthenticatedPrincipalForJMSXUserID;
2627        }
2628    
2629        public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2630            this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2631        }
2632    
2633        /**
2634         * Should MBeans that support showing the Authenticated User Name information have this
2635         * value filled in or not.
2636         *
2637         * @return true if user names should be exposed in MBeans
2638         */
2639        public boolean isPopulateUserNameInMBeans() {
2640            return this.populateUserNameInMBeans;
2641        }
2642    
2643        /**
2644         * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2645         * @param true if MBeans should expose user name information.
2646         */
2647        public void setPopulateUserNameInMBeans(boolean value) {
2648            this.populateUserNameInMBeans = value;
2649        }
2650    
2651        public boolean isNetworkConnectorStartAsync() {
2652            return networkConnectorStartAsync;
2653        }
2654    
2655        public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2656            this.networkConnectorStartAsync = networkConnectorStartAsync;
2657        }
2658    
2659        public boolean isAllowTempAutoCreationOnSend() {
2660            return allowTempAutoCreationOnSend;
2661        }
2662    
2663        /**
2664         * enable if temp destinations need to be propagated through a network when
2665         * advisorySupport==false. This is used in conjunction with the policy
2666         * gcInactiveDestinations for matching temps so they can get removed
2667         * when inactive
2668         *
2669         * @param allowTempAutoCreationOnSend
2670         */
2671        public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2672            this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2673        }
2674    
2675        public int getOfflineDurableSubscriberTimeout() {
2676            return offlineDurableSubscriberTimeout;
2677        }
2678    
2679        public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
2680            this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
2681        }
2682    
2683        public int getOfflineDurableSubscriberTaskSchedule() {
2684            return offlineDurableSubscriberTaskSchedule;
2685        }
2686    
2687        public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
2688            this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
2689        }
2690    
2691        public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2692            return isUseVirtualTopics() && destination.isQueue() &&
2693                    getVirtualTopicConsumerDestinationFilter().matches(destination);
2694        }
2695    }