001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.pool; 019 020 import java.io.IOException; 021 import java.util.Iterator; 022 import java.util.concurrent.ConcurrentHashMap; 023 import java.util.concurrent.ConcurrentLinkedQueue; 024 import java.util.concurrent.atomic.AtomicBoolean; 025 026 import javax.jms.JMSException; 027 import javax.jms.Session; 028 029 import org.apache.activemq.ActiveMQConnection; 030 import org.apache.activemq.transport.TransportListener; 031 import org.apache.commons.pool.ObjectPoolFactory; 032 033 /** 034 * Holds a real JMS connection along with the session pools associated with it. 035 * 036 * 037 */ 038 public class ConnectionPool { 039 040 private ActiveMQConnection connection; 041 private ConcurrentHashMap<SessionKey, SessionPool> cache; 042 private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>(); 043 private AtomicBoolean started = new AtomicBoolean(false); 044 private int referenceCount; 045 private ObjectPoolFactory poolFactory; 046 private long lastUsed = System.currentTimeMillis(); 047 private long firstUsed = lastUsed; 048 private boolean hasFailed; 049 private boolean hasExpired; 050 private int idleTimeout = 30 * 1000; 051 private long expiryTimeout = 0l; 052 053 public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { 054 this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory); 055 // Add a transport Listener so that we can notice if this connection 056 // should be expired due to a connection failure. 057 connection.addTransportListener(new TransportListener() { 058 public void onCommand(Object command) { 059 } 060 061 public void onException(IOException error) { 062 synchronized (ConnectionPool.this) { 063 hasFailed = true; 064 } 065 } 066 067 public void transportInterupted() { 068 } 069 070 public void transportResumed() { 071 } 072 }); 073 074 // make sure that we set the hasFailed flag, in case the transport already failed 075 // prior to the addition of our new TransportListener 076 if(connection.isTransportFailed()) { 077 hasFailed = true; 078 } 079 } 080 081 public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) { 082 this.connection = connection; 083 this.cache = cache; 084 this.poolFactory = poolFactory; 085 } 086 087 public void start() throws JMSException { 088 if (started.compareAndSet(false, true)) { 089 try { 090 connection.start(); 091 } catch (JMSException e) { 092 started.set(false); 093 throw(e); 094 } 095 } 096 } 097 098 public synchronized ActiveMQConnection getConnection() { 099 return connection; 100 } 101 102 public Session createSession(boolean transacted, int ackMode) throws JMSException { 103 SessionKey key = new SessionKey(transacted, ackMode); 104 SessionPool pool = null; 105 pool = cache.get(key); 106 if (pool == null) { 107 SessionPool newPool = createSessionPool(key); 108 SessionPool prevPool = cache.putIfAbsent(key, newPool); 109 if (prevPool != null && prevPool != newPool) { 110 // newPool was not the first one to be associated with this 111 // key... close created session pool 112 try { 113 newPool.close(); 114 } catch (Exception e) { 115 throw new JMSException(e.getMessage()); 116 } 117 } 118 pool = cache.get(key); // this will return a non-null value... 119 } 120 PooledSession session = pool.borrowSession(); 121 this.loanedSessions.add(session); 122 return session; 123 } 124 125 public synchronized void close() { 126 if (connection != null) { 127 try { 128 Iterator<SessionPool> i = cache.values().iterator(); 129 while (i.hasNext()) { 130 SessionPool pool = i.next(); 131 i.remove(); 132 try { 133 pool.close(); 134 } catch (Exception e) { 135 } 136 } 137 } finally { 138 try { 139 connection.close(); 140 } catch (Exception e) { 141 } finally { 142 connection = null; 143 } 144 } 145 } 146 } 147 148 public synchronized void incrementReferenceCount() { 149 referenceCount++; 150 lastUsed = System.currentTimeMillis(); 151 } 152 153 public synchronized void decrementReferenceCount() { 154 referenceCount--; 155 lastUsed = System.currentTimeMillis(); 156 if (referenceCount == 0) { 157 expiredCheck(); 158 159 for (PooledSession session : this.loanedSessions) { 160 try { 161 session.close(); 162 } catch (Exception e) { 163 } 164 } 165 this.loanedSessions.clear(); 166 167 // only clean up temp destinations when all users 168 // of this connection have called close 169 if (getConnection() != null) { 170 getConnection().cleanUpTempDestinations(); 171 } 172 } 173 } 174 175 /** 176 * @return true if this connection has expired. 177 */ 178 public synchronized boolean expiredCheck() { 179 if (connection == null) { 180 return true; 181 } 182 if (hasExpired) { 183 if (referenceCount == 0) { 184 close(); 185 } 186 return true; 187 } 188 if (hasFailed 189 || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) 190 || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { 191 hasExpired = true; 192 if (referenceCount == 0) { 193 close(); 194 } 195 return true; 196 } 197 return false; 198 } 199 200 public int getIdleTimeout() { 201 return idleTimeout; 202 } 203 204 public void setIdleTimeout(int idleTimeout) { 205 this.idleTimeout = idleTimeout; 206 } 207 208 protected SessionPool createSessionPool(SessionKey key) { 209 return new SessionPool(this, key, poolFactory.createPool()); 210 } 211 212 public void setExpiryTimeout(long expiryTimeout) { 213 this.expiryTimeout = expiryTimeout; 214 } 215 216 public long getExpiryTimeout() { 217 return expiryTimeout; 218 } 219 220 void onSessionReturned(PooledSession session) { 221 this.loanedSessions.remove(session); 222 } 223 224 void onSessionInvalidated(PooledSession session) { 225 this.loanedSessions.remove(session); 226 } 227 }