001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.HashMap;
022    import java.util.Map;
023    import java.util.Properties;
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.ThreadFactory;
027    
028    import javax.jms.Connection;
029    import javax.jms.ConnectionFactory;
030    import javax.jms.ExceptionListener;
031    import javax.jms.JMSException;
032    import javax.jms.QueueConnection;
033    import javax.jms.QueueConnectionFactory;
034    import javax.jms.TopicConnection;
035    import javax.jms.TopicConnectionFactory;
036    import javax.naming.Context;
037    
038    import org.apache.activemq.blob.BlobTransferPolicy;
039    import org.apache.activemq.jndi.JNDIBaseStorable;
040    import org.apache.activemq.management.JMSStatsImpl;
041    import org.apache.activemq.management.StatsCapable;
042    import org.apache.activemq.management.StatsImpl;
043    import org.apache.activemq.transport.Transport;
044    import org.apache.activemq.transport.TransportFactory;
045    import org.apache.activemq.transport.TransportListener;
046    import org.apache.activemq.util.IdGenerator;
047    import org.apache.activemq.util.IntrospectionSupport;
048    import org.apache.activemq.util.JMSExceptionSupport;
049    import org.apache.activemq.util.URISupport;
050    import org.apache.activemq.util.URISupport.CompositeData;
051    
052    /**
053     * A ConnectionFactory is an an Administered object, and is used for creating
054     * Connections. <p/> This class also implements QueueConnectionFactory and
055     * TopicConnectionFactory. You can use this connection to create both
056     * QueueConnections and TopicConnections.
057     *
058     *
059     * @see javax.jms.ConnectionFactory
060     */
061    public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062    
063        public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
064        public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
065        public static final String DEFAULT_USER = null;
066        public static final String DEFAULT_PASSWORD = null;
067        public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
068    
069        protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
070            public Thread newThread(Runnable run) {
071                Thread thread = new Thread(run);
072                thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
073                return thread;
074            }
075        });
076    
077        protected URI brokerURL;
078        protected String userName;
079        protected String password;
080        protected String clientID;
081        protected boolean dispatchAsync=true;
082        protected boolean alwaysSessionAsync=true;
083    
084        JMSStatsImpl factoryStats = new JMSStatsImpl();
085    
086        private IdGenerator clientIdGenerator;
087        private String clientIDPrefix;
088        private IdGenerator connectionIdGenerator;
089        private String connectionIDPrefix;
090    
091        // client policies
092        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
093        private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
094        private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
095        private MessageTransformer transformer;
096    
097        private boolean disableTimeStampsByDefault;
098        private boolean optimizedMessageDispatch = true;
099        private long optimizeAcknowledgeTimeOut = 300;
100        private boolean copyMessageOnSend = true;
101        private boolean useCompression;
102        private boolean objectMessageSerializationDefered;
103        private boolean useAsyncSend;
104        private boolean optimizeAcknowledge;
105        private int closeTimeout = 15000;
106        private boolean useRetroactiveConsumer;
107        private boolean exclusiveConsumer;
108        private boolean nestedMapAndListEnabled = true;
109        private boolean alwaysSyncSend;
110        private boolean watchTopicAdvisories = true;
111        private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
112        private long warnAboutUnstartedConnectionTimeout = 500L;
113        private int sendTimeout = 0;
114        private boolean sendAcksAsync=true;
115        private TransportListener transportListener;
116        private ExceptionListener exceptionListener;
117        private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
118        private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
119        private boolean useDedicatedTaskRunner;
120        private long consumerFailoverRedeliveryWaitPeriod = 0;
121        private boolean checkForDuplicates = true;
122        private ClientInternalExceptionListener clientInternalExceptionListener;
123        private boolean messagePrioritySupported = true;
124        private boolean transactedIndividualAck = false;
125        private boolean nonBlockingRedelivery = false;
126    
127        // /////////////////////////////////////////////
128        //
129        // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
130        //
131        // /////////////////////////////////////////////
132    
133        public ActiveMQConnectionFactory() {
134            this(DEFAULT_BROKER_URL);
135        }
136    
137        public ActiveMQConnectionFactory(String brokerURL) {
138            this(createURI(brokerURL));
139        }
140    
141        public ActiveMQConnectionFactory(URI brokerURL) {
142            setBrokerURL(brokerURL.toString());
143        }
144    
145        public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
146            setUserName(userName);
147            setPassword(password);
148            setBrokerURL(brokerURL.toString());
149        }
150    
151        public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
152            setUserName(userName);
153            setPassword(password);
154            setBrokerURL(brokerURL);
155        }
156    
157        /**
158         * Returns a copy of the given connection factory
159         */
160        public ActiveMQConnectionFactory copy() {
161            try {
162                return (ActiveMQConnectionFactory)super.clone();
163            } catch (CloneNotSupportedException e) {
164                throw new RuntimeException("This should never happen: " + e, e);
165            }
166        }
167    
168        /**
169         * @param brokerURL
170         * @return
171         * @throws URISyntaxException
172         */
173        private static URI createURI(String brokerURL) {
174            try {
175                return new URI(brokerURL);
176            } catch (URISyntaxException e) {
177                throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
178            }
179        }
180    
181        /**
182         * @return Returns the Connection.
183         */
184        public Connection createConnection() throws JMSException {
185            return createActiveMQConnection();
186        }
187    
188        /**
189         * @return Returns the Connection.
190         */
191        public Connection createConnection(String userName, String password) throws JMSException {
192            return createActiveMQConnection(userName, password);
193        }
194    
195        /**
196         * @return Returns the QueueConnection.
197         * @throws JMSException
198         */
199        public QueueConnection createQueueConnection() throws JMSException {
200            return createActiveMQConnection();
201        }
202    
203        /**
204         * @return Returns the QueueConnection.
205         */
206        public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
207            return createActiveMQConnection(userName, password);
208        }
209    
210        /**
211         * @return Returns the TopicConnection.
212         * @throws JMSException
213         */
214        public TopicConnection createTopicConnection() throws JMSException {
215            return createActiveMQConnection();
216        }
217    
218        /**
219         * @return Returns the TopicConnection.
220         */
221        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
222            return createActiveMQConnection(userName, password);
223        }
224    
225        /**
226         * @returns the StatsImpl associated with this ConnectionFactory.
227         */
228        public StatsImpl getStats() {
229            return this.factoryStats;
230        }
231    
232        // /////////////////////////////////////////////
233        //
234        // Implementation methods.
235        //
236        // /////////////////////////////////////////////
237    
238        protected ActiveMQConnection createActiveMQConnection() throws JMSException {
239            return createActiveMQConnection(userName, password);
240        }
241    
242        /**
243         * Creates a Transport based on this object's connection settings. Separated
244         * from createActiveMQConnection to allow for subclasses to override.
245         *
246         * @return The newly created Transport.
247         * @throws JMSException If unable to create trasnport.
248         * @author sepandm@gmail.com
249         */
250        protected Transport createTransport() throws JMSException {
251            try {
252                return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
253            } catch (Exception e) {
254                throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
255            }
256        }
257    
258        /**
259         * @return Returns the Connection.
260         */
261        protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
262            if (brokerURL == null) {
263                throw new ConfigurationException("brokerURL not set.");
264            }
265            ActiveMQConnection connection = null;
266            try {
267                Transport transport = createTransport();
268                connection = createActiveMQConnection(transport, factoryStats);
269    
270                connection.setUserName(userName);
271                connection.setPassword(password);
272    
273                configureConnection(connection);
274    
275                transport.start();
276    
277                if (clientID != null) {
278                    connection.setDefaultClientID(clientID);
279                }
280    
281                return connection;
282            } catch (JMSException e) {
283                // Clean up!
284                try {
285                    connection.close();
286                } catch (Throwable ignore) {
287                }
288                throw e;
289            } catch (Exception e) {
290                // Clean up!
291                try {
292                    connection.close();
293                } catch (Throwable ignore) {
294                }
295                throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
296            }
297        }
298    
299        protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
300            ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
301                    getConnectionIdGenerator(), stats);
302            return connection;
303        }
304    
305        protected void configureConnection(ActiveMQConnection connection) throws JMSException {
306            connection.setPrefetchPolicy(getPrefetchPolicy());
307            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
308            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
309            connection.setCopyMessageOnSend(isCopyMessageOnSend());
310            connection.setUseCompression(isUseCompression());
311            connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
312            connection.setDispatchAsync(isDispatchAsync());
313            connection.setUseAsyncSend(isUseAsyncSend());
314            connection.setAlwaysSyncSend(isAlwaysSyncSend());
315            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
316            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
317            connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
318            connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
319            connection.setExclusiveConsumer(isExclusiveConsumer());
320            connection.setRedeliveryPolicy(getRedeliveryPolicy());
321            connection.setTransformer(getTransformer());
322            connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
323            connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
324            connection.setProducerWindowSize(getProducerWindowSize());
325            connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
326            connection.setSendTimeout(getSendTimeout());
327            connection.setCloseTimeout(getCloseTimeout());
328            connection.setSendAcksAsync(isSendAcksAsync());
329            connection.setAuditDepth(getAuditDepth());
330            connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
331            connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
332            connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
333            connection.setCheckForDuplicates(isCheckForDuplicates());
334            connection.setMessagePrioritySupported(isMessagePrioritySupported());
335            connection.setTransactedIndividualAck(isTransactedIndividualAck());
336            connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
337            if (transportListener != null) {
338                connection.addTransportListener(transportListener);
339            }
340            if (exceptionListener != null) {
341                connection.setExceptionListener(exceptionListener);
342            }
343            if (clientInternalExceptionListener != null) {
344                connection.setClientInternalExceptionListener(clientInternalExceptionListener);
345            }
346        }
347    
348        // /////////////////////////////////////////////
349        //
350        // Property Accessors
351        //
352        // /////////////////////////////////////////////
353    
354        public String getBrokerURL() {
355            return brokerURL == null ? null : brokerURL.toString();
356        }
357    
358        /**
359         * Sets the <a
360         * href="http://activemq.apache.org/configuring-transports.html">connection
361         * URL</a> used to connect to the ActiveMQ broker.
362         */
363        public void setBrokerURL(String brokerURL) {
364            this.brokerURL = createURI(brokerURL);
365    
366            // Use all the properties prefixed with 'jms.' to set the connection
367            // factory
368            // options.
369            if (this.brokerURL.getQuery() != null) {
370                // It might be a standard URI or...
371                try {
372    
373                    Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
374                    Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
375                    if (buildFromMap(jmsOptionsMap)) {
376                        if (!jmsOptionsMap.isEmpty()) {
377                            String msg = "There are " + jmsOptionsMap.size()
378                                + " jms options that couldn't be set on the ConnectionFactory."
379                                + " Check the options are spelled correctly."
380                                + " Unknown parameters=[" + jmsOptionsMap + "]."
381                                + " This connection factory cannot be started.";
382                            throw new IllegalArgumentException(msg);
383                        }
384    
385                        this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
386                    }
387    
388                } catch (URISyntaxException e) {
389                }
390    
391            } else {
392    
393                // It might be a composite URI.
394                try {
395                    CompositeData data = URISupport.parseComposite(this.brokerURL);
396                    Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
397                    if (buildFromMap(jmsOptionsMap)) {
398                        if (!jmsOptionsMap.isEmpty()) {
399                            String msg = "There are " + jmsOptionsMap.size()
400                                + " jms options that couldn't be set on the ConnectionFactory."
401                                + " Check the options are spelled correctly."
402                                + " Unknown parameters=[" + jmsOptionsMap + "]."
403                                + " This connection factory cannot be started.";
404                            throw new IllegalArgumentException(msg);
405                        }
406    
407                        this.brokerURL = data.toURI();
408                    }
409                } catch (URISyntaxException e) {
410                }
411            }
412        }
413    
414        public String getClientID() {
415            return clientID;
416        }
417    
418        /**
419         * Sets the JMS clientID to use for the created connection. Note that this
420         * can only be used by one connection at once so generally its a better idea
421         * to set the clientID on a Connection
422         */
423        public void setClientID(String clientID) {
424            this.clientID = clientID;
425        }
426    
427        public boolean isCopyMessageOnSend() {
428            return copyMessageOnSend;
429        }
430    
431        /**
432         * Should a JMS message be copied to a new JMS Message object as part of the
433         * send() method in JMS. This is enabled by default to be compliant with the
434         * JMS specification. You can disable it if you do not mutate JMS messages
435         * after they are sent for a performance boost
436         */
437        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
438            this.copyMessageOnSend = copyMessageOnSend;
439        }
440    
441        public boolean isDisableTimeStampsByDefault() {
442            return disableTimeStampsByDefault;
443        }
444    
445        /**
446         * Sets whether or not timestamps on messages should be disabled or not. If
447         * you disable them it adds a small performance boost.
448         */
449        public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
450            this.disableTimeStampsByDefault = disableTimeStampsByDefault;
451        }
452    
453        public boolean isOptimizedMessageDispatch() {
454            return optimizedMessageDispatch;
455        }
456    
457        /**
458         * If this flag is set then an larger prefetch limit is used - only
459         * applicable for durable topic subscribers.
460         */
461        public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
462            this.optimizedMessageDispatch = optimizedMessageDispatch;
463        }
464    
465        public String getPassword() {
466            return password;
467        }
468    
469        /**
470         * Sets the JMS password used for connections created from this factory
471         */
472        public void setPassword(String password) {
473            this.password = password;
474        }
475    
476        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
477            return prefetchPolicy;
478        }
479    
480        /**
481         * Sets the <a
482         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
483         * policy</a> for consumers created by this connection.
484         */
485        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
486            this.prefetchPolicy = prefetchPolicy;
487        }
488    
489        public boolean isUseAsyncSend() {
490            return useAsyncSend;
491        }
492    
493        public BlobTransferPolicy getBlobTransferPolicy() {
494            return blobTransferPolicy;
495        }
496    
497        /**
498         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
499         * OBjects) are transferred from producers to brokers to consumers
500         */
501        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
502            this.blobTransferPolicy = blobTransferPolicy;
503        }
504    
505        /**
506         * Forces the use of <a
507         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
508         * adds a massive performance boost; but means that the send() method will
509         * return immediately whether the message has been sent or not which could
510         * lead to message loss.
511         */
512        public void setUseAsyncSend(boolean useAsyncSend) {
513            this.useAsyncSend = useAsyncSend;
514        }
515    
516        public synchronized boolean isWatchTopicAdvisories() {
517            return watchTopicAdvisories;
518        }
519    
520        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
521            this.watchTopicAdvisories = watchTopicAdvisories;
522        }
523    
524        /**
525         * @return true if always sync send messages
526         */
527        public boolean isAlwaysSyncSend() {
528            return this.alwaysSyncSend;
529        }
530    
531        /**
532         * Set true if always require messages to be sync sent
533         *
534         * @param alwaysSyncSend
535         */
536        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
537            this.alwaysSyncSend = alwaysSyncSend;
538        }
539    
540        public String getUserName() {
541            return userName;
542        }
543    
544        /**
545         * Sets the JMS userName used by connections created by this factory
546         */
547        public void setUserName(String userName) {
548            this.userName = userName;
549        }
550    
551        public boolean isUseRetroactiveConsumer() {
552            return useRetroactiveConsumer;
553        }
554    
555        /**
556         * Sets whether or not retroactive consumers are enabled. Retroactive
557         * consumers allow non-durable topic subscribers to receive old messages
558         * that were published before the non-durable subscriber started.
559         */
560        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
561            this.useRetroactiveConsumer = useRetroactiveConsumer;
562        }
563    
564        public boolean isExclusiveConsumer() {
565            return exclusiveConsumer;
566        }
567    
568        /**
569         * Enables or disables whether or not queue consumers should be exclusive or
570         * not for example to preserve ordering when not using <a
571         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
572         *
573         * @param exclusiveConsumer
574         */
575        public void setExclusiveConsumer(boolean exclusiveConsumer) {
576            this.exclusiveConsumer = exclusiveConsumer;
577        }
578    
579        public RedeliveryPolicy getRedeliveryPolicy() {
580            return redeliveryPolicy;
581        }
582    
583        /**
584         * Sets the global redelivery policy to be used when a message is delivered
585         * but the session is rolled back
586         */
587        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
588            this.redeliveryPolicy = redeliveryPolicy;
589        }
590    
591        public MessageTransformer getTransformer() {
592            return transformer;
593        }
594    
595        /**
596         * @return the sendTimeout
597         */
598        public int getSendTimeout() {
599            return sendTimeout;
600        }
601    
602        /**
603         * @param sendTimeout the sendTimeout to set
604         */
605        public void setSendTimeout(int sendTimeout) {
606            this.sendTimeout = sendTimeout;
607        }
608    
609        /**
610         * @return the sendAcksAsync
611         */
612        public boolean isSendAcksAsync() {
613            return sendAcksAsync;
614        }
615    
616        /**
617         * @param sendAcksAsync the sendAcksAsync to set
618         */
619        public void setSendAcksAsync(boolean sendAcksAsync) {
620            this.sendAcksAsync = sendAcksAsync;
621        }
622    
623        /**
624         * @return the messagePrioritySupported
625         */
626        public boolean isMessagePrioritySupported() {
627            return this.messagePrioritySupported;
628        }
629    
630        /**
631         * @param messagePrioritySupported the messagePrioritySupported to set
632         */
633        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
634            this.messagePrioritySupported = messagePrioritySupported;
635        }
636    
637    
638        /**
639         * Sets the transformer used to transform messages before they are sent on
640         * to the JMS bus or when they are received from the bus but before they are
641         * delivered to the JMS client
642         */
643        public void setTransformer(MessageTransformer transformer) {
644            this.transformer = transformer;
645        }
646    
647        @SuppressWarnings({ "unchecked", "rawtypes" })
648        @Override
649        public void buildFromProperties(Properties properties) {
650    
651            if (properties == null) {
652                properties = new Properties();
653            }
654    
655            String temp = properties.getProperty(Context.PROVIDER_URL);
656            if (temp == null || temp.length() == 0) {
657                temp = properties.getProperty("brokerURL");
658            }
659            if (temp != null && temp.length() > 0) {
660                setBrokerURL(temp);
661            }
662    
663            Map<String, Object> p = new HashMap(properties);
664            buildFromMap(p);
665        }
666    
667        public boolean buildFromMap(Map<String, Object> properties) {
668            boolean rc = false;
669    
670            ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
671            if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
672                setPrefetchPolicy(p);
673                rc = true;
674            }
675    
676            RedeliveryPolicy rp = new RedeliveryPolicy();
677            if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
678                setRedeliveryPolicy(rp);
679                rc = true;
680            }
681    
682            BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
683            if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
684                setBlobTransferPolicy(blobTransferPolicy);
685                rc = true;
686            }
687    
688            rc |= IntrospectionSupport.setProperties(this, properties);
689    
690            return rc;
691        }
692    
693        @Override
694        public void populateProperties(Properties props) {
695            props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
696    
697            if (getBrokerURL() != null) {
698                props.setProperty(Context.PROVIDER_URL, getBrokerURL());
699                props.setProperty("brokerURL", getBrokerURL());
700            }
701    
702            if (getClientID() != null) {
703                props.setProperty("clientID", getClientID());
704            }
705    
706            IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
707            IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
708            IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
709    
710            props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
711            props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
712            props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
713            props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
714    
715            if (getPassword() != null) {
716                props.setProperty("password", getPassword());
717            }
718    
719            props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
720            props.setProperty("useCompression", Boolean.toString(isUseCompression()));
721            props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
722            props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
723    
724            if (getUserName() != null) {
725                props.setProperty("userName", getUserName());
726            }
727    
728            props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
729            props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
730            props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
731            props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
732            props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
733            props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
734            props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
735            props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
736            props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
737            props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
738            props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
739            props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
740            props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
741            props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
742        }
743    
744        public boolean isUseCompression() {
745            return useCompression;
746        }
747    
748        /**
749         * Enables the use of compression of the message bodies
750         */
751        public void setUseCompression(boolean useCompression) {
752            this.useCompression = useCompression;
753        }
754    
755        public boolean isObjectMessageSerializationDefered() {
756            return objectMessageSerializationDefered;
757        }
758    
759        /**
760         * When an object is set on an ObjectMessage, the JMS spec requires the
761         * object to be serialized by that set method. Enabling this flag causes the
762         * object to not get serialized. The object may subsequently get serialized
763         * if the message needs to be sent over a socket or stored to disk.
764         */
765        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
766            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
767        }
768    
769        public boolean isDispatchAsync() {
770            return dispatchAsync;
771        }
772    
773        /**
774         * Enables or disables the default setting of whether or not consumers have
775         * their messages <a
776         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
777         * synchronously or asynchronously by the broker</a>. For non-durable
778         * topics for example we typically dispatch synchronously by default to
779         * minimize context switches which boost performance. However sometimes its
780         * better to go slower to ensure that a single blocked consumer socket does
781         * not block delivery to other consumers.
782         *
783         * @param asyncDispatch If true then consumers created on this connection
784         *                will default to having their messages dispatched
785         *                asynchronously. The default value is true.
786         */
787        public void setDispatchAsync(boolean asyncDispatch) {
788            this.dispatchAsync = asyncDispatch;
789        }
790    
791        /**
792         * @return Returns the closeTimeout.
793         */
794        public int getCloseTimeout() {
795            return closeTimeout;
796        }
797    
798        /**
799         * Sets the timeout before a close is considered complete. Normally a
800         * close() on a connection waits for confirmation from the broker; this
801         * allows that operation to timeout to save the client hanging if there is
802         * no broker
803         */
804        public void setCloseTimeout(int closeTimeout) {
805            this.closeTimeout = closeTimeout;
806        }
807    
808        /**
809         * @return Returns the alwaysSessionAsync.
810         */
811        public boolean isAlwaysSessionAsync() {
812            return alwaysSessionAsync;
813        }
814    
815        /**
816         * If this flag is set then a separate thread is not used for dispatching
817         * messages for each Session in the Connection. However, a separate thread
818         * is always used if there is more than one session, or the session isn't in
819         * auto acknowledge or duplicates ok mode
820         */
821        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
822            this.alwaysSessionAsync = alwaysSessionAsync;
823        }
824    
825        /**
826         * @return Returns the optimizeAcknowledge.
827         */
828        public boolean isOptimizeAcknowledge() {
829            return optimizeAcknowledge;
830        }
831    
832        /**
833         * @param optimizeAcknowledge The optimizeAcknowledge to set.
834         */
835        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
836            this.optimizeAcknowledge = optimizeAcknowledge;
837        }
838    
839        /**
840         * The max time in milliseconds between optimized ack batches
841         * @param optimizeAcknowledgeTimeOut
842         */
843        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
844            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
845        }
846    
847        public long getOptimizeAcknowledgeTimeOut() {
848            return optimizeAcknowledgeTimeOut;
849        }
850    
851        public boolean isNestedMapAndListEnabled() {
852            return nestedMapAndListEnabled;
853        }
854    
855        /**
856         * Enables/disables whether or not Message properties and MapMessage entries
857         * support <a
858         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
859         * Structures</a> of Map and List objects
860         */
861        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
862            this.nestedMapAndListEnabled = structuredMapsEnabled;
863        }
864    
865        public String getClientIDPrefix() {
866            return clientIDPrefix;
867        }
868    
869        /**
870         * Sets the prefix used by autogenerated JMS Client ID values which are used
871         * if the JMS client does not explicitly specify on.
872         *
873         * @param clientIDPrefix
874         */
875        public void setClientIDPrefix(String clientIDPrefix) {
876            this.clientIDPrefix = clientIDPrefix;
877        }
878    
879        protected synchronized IdGenerator getClientIdGenerator() {
880            if (clientIdGenerator == null) {
881                if (clientIDPrefix != null) {
882                    clientIdGenerator = new IdGenerator(clientIDPrefix);
883                } else {
884                    clientIdGenerator = new IdGenerator();
885                }
886            }
887            return clientIdGenerator;
888        }
889    
890        protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
891            this.clientIdGenerator = clientIdGenerator;
892        }
893    
894        /**
895         * Sets the prefix used by connection id generator
896         * @param connectionIDPrefix
897         */
898        public void setConnectionIDPrefix(String connectionIDPrefix) {
899            this.connectionIDPrefix = connectionIDPrefix;
900        }
901    
902        protected synchronized IdGenerator getConnectionIdGenerator() {
903            if (connectionIdGenerator == null) {
904                if (connectionIDPrefix != null) {
905                    connectionIdGenerator = new IdGenerator(connectionIDPrefix);
906                } else {
907                    connectionIdGenerator = new IdGenerator();
908                }
909            }
910            return connectionIdGenerator;
911        }
912    
913        protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
914            this.connectionIdGenerator = connectionIdGenerator;
915        }
916    
917        /**
918         * @return the statsEnabled
919         */
920        public boolean isStatsEnabled() {
921            return this.factoryStats.isEnabled();
922        }
923    
924        /**
925         * @param statsEnabled the statsEnabled to set
926         */
927        public void setStatsEnabled(boolean statsEnabled) {
928            this.factoryStats.setEnabled(statsEnabled);
929        }
930    
931        public synchronized int getProducerWindowSize() {
932            return producerWindowSize;
933        }
934    
935        public synchronized void setProducerWindowSize(int producerWindowSize) {
936            this.producerWindowSize = producerWindowSize;
937        }
938    
939        public long getWarnAboutUnstartedConnectionTimeout() {
940            return warnAboutUnstartedConnectionTimeout;
941        }
942    
943        /**
944         * Enables the timeout from a connection creation to when a warning is
945         * generated if the connection is not properly started via
946         * {@link Connection#start()} and a message is received by a consumer. It is
947         * a very common gotcha to forget to <a
948         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
949         * the connection</a> so this option makes the default case to create a
950         * warning if the user forgets. To disable the warning just set the value to <
951         * 0 (say -1).
952         */
953        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
954            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
955        }
956    
957        public TransportListener getTransportListener() {
958            return transportListener;
959        }
960    
961        /**
962         * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
963         * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
964         * a transport listener.
965         *
966         * @param transportListener sets the listener to be registered on all connections
967         * created by this factory
968         */
969        public void setTransportListener(TransportListener transportListener) {
970            this.transportListener = transportListener;
971        }
972    
973    
974        public ExceptionListener getExceptionListener() {
975            return exceptionListener;
976        }
977    
978        /**
979         * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
980         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
981         * an exception listener.
982         * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
983         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
984         * @param exceptionListener sets the exception listener to be registered on all connections
985         * created by this factory
986         */
987        public void setExceptionListener(ExceptionListener exceptionListener) {
988            this.exceptionListener = exceptionListener;
989        }
990    
991        public int getAuditDepth() {
992            return auditDepth;
993        }
994    
995        public void setAuditDepth(int auditDepth) {
996            this.auditDepth = auditDepth;
997        }
998    
999        public int getAuditMaximumProducerNumber() {
1000            return auditMaximumProducerNumber;
1001        }
1002    
1003        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1004            this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1005        }
1006    
1007        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009        }
1010    
1011        public boolean isUseDedicatedTaskRunner() {
1012            return useDedicatedTaskRunner;
1013        }
1014    
1015        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1016            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1017        }
1018    
1019        public long getConsumerFailoverRedeliveryWaitPeriod() {
1020            return consumerFailoverRedeliveryWaitPeriod;
1021        }
1022    
1023        public ClientInternalExceptionListener getClientInternalExceptionListener() {
1024            return clientInternalExceptionListener;
1025        }
1026    
1027        /**
1028         * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1029         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1030         * an exception listener.
1031         * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1032         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1033         * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1034         * created by this factory
1035         */
1036        public void setClientInternalExceptionListener(
1037                ClientInternalExceptionListener clientInternalExceptionListener) {
1038            this.clientInternalExceptionListener = clientInternalExceptionListener;
1039        }
1040    
1041        /**
1042         * @return the checkForDuplicates
1043         */
1044        public boolean isCheckForDuplicates() {
1045            return this.checkForDuplicates;
1046        }
1047    
1048        /**
1049         * @param checkForDuplicates the checkForDuplicates to set
1050         */
1051        public void setCheckForDuplicates(boolean checkForDuplicates) {
1052            this.checkForDuplicates = checkForDuplicates;
1053        }
1054    
1055        public boolean isTransactedIndividualAck() {
1056             return transactedIndividualAck;
1057         }
1058    
1059         /**
1060          * when true, submit individual transacted acks immediately rather than with transaction completion.
1061          * This allows the acks to represent delivery status which can be persisted on rollback
1062          * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1063          */
1064         public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1065             this.transactedIndividualAck = transactedIndividualAck;
1066         }
1067    
1068    
1069         public boolean isNonBlockingRedelivery() {
1070             return nonBlockingRedelivery;
1071         }
1072    
1073         /**
1074          * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1075          * from a rolled back transaction.  This implies that message order will not be preserved and
1076          * also will result in the TransactedIndividualAck option to be enabled.
1077          */
1078         public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1079             this.nonBlockingRedelivery = nonBlockingRedelivery;
1080         }
1081    
1082    }