001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.HashMap;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import javax.jms.Connection;
025    import javax.jms.ConnectionFactory;
026    import javax.jms.JMSException;
027    import org.apache.activemq.ActiveMQConnection;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.util.IOExceptionSupport;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    import org.apache.commons.pool.ObjectPoolFactory;
034    import org.apache.commons.pool.impl.GenericObjectPool;
035    import org.apache.commons.pool.impl.GenericObjectPoolFactory;
036    
037    /**
038     * A JMS provider which pools Connection, Session and MessageProducer instances
039     * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
040     * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
041     * Connections, sessions and producers are returned to a pool after use so that they can be reused later
042     * without having to undergo the cost of creating them again.
043     *
044     * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
045     * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
046     * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
047     * just created at startup and left active, handling incoming messages as they come. When a consumer is
048     * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
049     * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
050     * where they'll get held until the consumer is active again.
051     *
052     * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
053     * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
054     * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
055     * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
056     *
057     * @org.apache.xbean.XBean element="pooledConnectionFactory"
058     *
059     *
060     */
061    public class PooledConnectionFactory implements ConnectionFactory, Service {
062        private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
063        private ConnectionFactory connectionFactory;
064        private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
065        private ObjectPoolFactory poolFactory;
066        private int maximumActive = 500;
067        private int maxConnections = 1;
068        private int idleTimeout = 30 * 1000;
069        private boolean blockIfSessionPoolIsFull = true;
070        private AtomicBoolean stopped = new AtomicBoolean(false);
071        private long expiryTimeout = 0l;
072    
073        public PooledConnectionFactory() {
074            this(new ActiveMQConnectionFactory());
075        }
076    
077        public PooledConnectionFactory(String brokerURL) {
078            this(new ActiveMQConnectionFactory(brokerURL));
079        }
080    
081        public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
082            this.connectionFactory = connectionFactory;
083        }
084    
085        public ConnectionFactory getConnectionFactory() {
086            return connectionFactory;
087        }
088    
089        public void setConnectionFactory(ConnectionFactory connectionFactory) {
090            this.connectionFactory = connectionFactory;
091        }
092    
093        public Connection createConnection() throws JMSException {
094            return createConnection(null, null);
095        }
096    
097        public synchronized Connection createConnection(String userName, String password) throws JMSException {
098            if (stopped.get()) {
099                LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
100                return null;
101            }
102    
103            ConnectionKey key = new ConnectionKey(userName, password);
104            LinkedList<ConnectionPool> pools = cache.get(key);
105    
106            if (pools == null) {
107                pools = new LinkedList<ConnectionPool>();
108                cache.put(key, pools);
109            }
110    
111            ConnectionPool connection = null;
112            if (pools.size() == maxConnections) {
113                connection = pools.removeFirst();
114            }
115    
116            // Now.. we might get a connection, but it might be that we need to
117            // dump it..
118            if (connection != null && connection.expiredCheck()) {
119                connection = null;
120            }
121    
122            if (connection == null) {
123                ActiveMQConnection delegate = createConnection(key);
124                connection = createConnectionPool(delegate);
125            }
126            pools.add(connection);
127            return new PooledConnection(connection);
128        }
129    
130        protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
131            ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
132            result.setIdleTimeout(getIdleTimeout());
133            result.setExpiryTimeout(getExpiryTimeout());
134            return result;
135        }
136    
137        protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
138            if (key.getUserName() == null && key.getPassword() == null) {
139                return (ActiveMQConnection)connectionFactory.createConnection();
140            } else {
141                return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
142            }
143        }
144    
145        /**
146         * @see org.apache.activemq.service.Service#start()
147         */
148        public void start() {
149            try {
150                stopped.set(false);
151                createConnection();
152            } catch (JMSException e) {
153                LOG.warn("Create pooled connection during start failed.", e);
154                IOExceptionSupport.create(e);
155            }
156        }
157    
158        public void stop() {
159            LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
160            stopped.set(true);
161            for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
162                for (ConnectionPool connection : iter.next()) {
163                    try {
164                        connection.close();
165                    }catch(Exception e) {
166                        LOG.warn("Close connection failed",e);
167                    }
168                }
169            }
170            cache.clear();
171        }
172    
173        public ObjectPoolFactory getPoolFactory() {
174            if (poolFactory == null) {
175                poolFactory = createPoolFactory();
176            }
177            return poolFactory;
178        }
179    
180        /**
181         * Sets the object pool factory used to create individual session pools for
182         * each connection
183         */
184        public void setPoolFactory(ObjectPoolFactory poolFactory) {
185            this.poolFactory = poolFactory;
186        }
187    
188        public int getMaximumActive() {
189            return maximumActive;
190        }
191    
192        /**
193         * Sets the maximum number of active sessions per connection
194         */
195        public void setMaximumActive(int maximumActive) {
196            this.maximumActive = maximumActive;
197        }
198    
199        /**
200         * Controls the behavior of the internal session pool. By default the call to
201         * Connection.getSession() will block if the session pool is full.  If the
202         * argument false is given, it will change the default behavior and instead the
203         * call to getSession() will throw a JMSException.
204         *
205         * The size of the session pool is controlled by the @see #maximumActive
206         * property.
207         *
208         * @param block - if true, the call to getSession() blocks if the pool is full
209         * until a session object is available.  defaults to true.
210         */
211        public void setBlockIfSessionPoolIsFull(boolean block) {
212            this.blockIfSessionPoolIsFull = block;
213        }
214    
215        /**
216         * @return the maxConnections
217         */
218        public int getMaxConnections() {
219            return maxConnections;
220        }
221    
222        /**
223         * @param maxConnections the maxConnections to set
224         */
225        public void setMaxConnections(int maxConnections) {
226            this.maxConnections = maxConnections;
227        }
228    
229        /**
230         * Creates an ObjectPoolFactory. Its behavior is controlled by the two
231         * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
232         *
233         * @return the newly created but empty ObjectPoolFactory
234         */
235        protected ObjectPoolFactory createPoolFactory() {
236             if (blockIfSessionPoolIsFull) {
237                return new GenericObjectPoolFactory(null, maximumActive);
238            } else {
239                return new GenericObjectPoolFactory(null,
240                    maximumActive,
241                    GenericObjectPool.WHEN_EXHAUSTED_FAIL,
242                    GenericObjectPool.DEFAULT_MAX_WAIT);
243            }
244        }
245    
246        public int getIdleTimeout() {
247            return idleTimeout;
248        }
249    
250        public void setIdleTimeout(int idleTimeout) {
251            this.idleTimeout = idleTimeout;
252        }
253    
254        /**
255         * allow connections to expire, irrespective of load or idle time. This is useful with failover
256         * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
257         *
258         * @param expiryTimeout non zero in milliseconds
259         */
260        public void setExpiryTimeout(long expiryTimeout) {
261            this.expiryTimeout = expiryTimeout;
262        }
263    
264        public long getExpiryTimeout() {
265            return expiryTimeout;
266        }
267    }