001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.io.PrintWriter; 020import java.util.List; 021import java.util.concurrent.CopyOnWriteArrayList; 022import javax.jms.Connection; 023import javax.jms.ExceptionListener; 024import javax.jms.JMSException; 025import javax.resource.ResourceException; 026import javax.resource.spi.ConnectionEvent; 027import javax.resource.spi.ConnectionEventListener; 028import javax.resource.spi.ConnectionRequestInfo; 029import javax.resource.spi.LocalTransaction; 030import javax.resource.spi.ManagedConnection; 031import javax.resource.spi.ManagedConnectionMetaData; 032import javax.security.auth.Subject; 033import javax.transaction.xa.XAResource; 034import org.apache.activemq.ActiveMQConnection; 035import org.apache.activemq.LocalTransactionEventListener; 036import org.apache.activemq.TransactionContext; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * ActiveMQManagedConnection maps to real physical connection to the server. 042 * Since a ManagedConnection has to provide a transaction managment interface to 043 * the physical connection, and sessions are the objects implement transaction 044 * managment interfaces in the JMS API, this object also maps to a singe 045 * physical JMS session. <p/> The side-effect is that JMS connection the 046 * application gets will allways create the same session object. This is good if 047 * running in an app server since the sessions are elisted in the context 048 * transaction. This is bad if used outside of an app server since the user may 049 * be trying to create 2 different sessions to coordinate 2 different uow. 050 * 051 * 052 */ 053public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: 054 // , 055 // DissociatableManagedConnection 056 // { 057 058 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class); 059 060 private PrintWriter logWriter; 061 062 private final ActiveMQConnection physicalConnection; 063 private final TransactionContext transactionContext; 064 private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>(); 065 private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); 066 private final LocalAndXATransaction localAndXATransaction; 067 068 private Subject subject; 069 private ActiveMQConnectionRequestInfo info; 070 private boolean destroyed; 071 072 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException { 073 try { 074 this.subject = subject; 075 this.info = info; 076 this.physicalConnection = physicalConnection; 077 this.transactionContext = new TransactionContext(physicalConnection); 078 079 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) { 080 public void setInManagedTx(boolean inManagedTx) throws JMSException { 081 super.setInManagedTx(inManagedTx); 082 for (ManagedConnectionProxy proxy:proxyConnections) { 083 proxy.setUseSharedTxContext(inManagedTx); 084 } 085 } 086 }; 087 088 this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() { 089 public void beginEvent() { 090 fireBeginEvent(); 091 } 092 093 public void commitEvent() { 094 fireCommitEvent(); 095 } 096 097 public void rollbackEvent() { 098 fireRollbackEvent(); 099 } 100 }); 101 102 physicalConnection.setExceptionListener(this); 103 } catch (JMSException e) { 104 throw new ResourceException("Could not create a new connection: " + e.getMessage(), e); 105 } 106 } 107 108 public boolean isInManagedTx() { 109 return localAndXATransaction.isInManagedTx(); 110 } 111 112 public static boolean matches(Object x, Object y) { 113 if (x == null ^ y == null) { 114 return false; 115 } 116 if (x != null && !x.equals(y)) { 117 return false; 118 } 119 return true; 120 } 121 122 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException { 123 124 // Do we need to change the associated userid/password 125 if (!matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword())) { 126 physicalConnection.changeUserInfo(info.getUserName(), info.getPassword()); 127 } 128 129 // Do we need to set the clientId? 130 if (info.getClientid() != null && info.getClientid().length() > 0) { 131 physicalConnection.setClientID(info.getClientid()); 132 } 133 134 this.subject = subject; 135 this.info = info; 136 } 137 138 public Connection getPhysicalConnection() { 139 return physicalConnection; 140 } 141 142 private void fireBeginEvent() { 143 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED); 144 for(ConnectionEventListener l:listeners) { 145 l.localTransactionStarted(event); 146 } 147 } 148 149 private void fireCommitEvent() { 150 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 151 for(ConnectionEventListener l:listeners) { 152 l.localTransactionCommitted(event); 153 } 154 } 155 156 private void fireRollbackEvent() { 157 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 158 for(ConnectionEventListener l:listeners) { 159 l.localTransactionRolledback(event); 160 } 161 } 162 163 private void fireCloseEvent(ManagedConnectionProxy proxy) { 164 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_CLOSED); 165 event.setConnectionHandle(proxy); 166 167 for(ConnectionEventListener l:listeners) { 168 l.connectionClosed(event); 169 } 170 } 171 172 private void fireErrorOccurredEvent(Exception error) { 173 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error); 174 for(ConnectionEventListener l:listeners) { 175 l.connectionErrorOccurred(event); 176 } 177 } 178 179 /** 180 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject, 181 * javax.resource.spi.ConnectionRequestInfo) 182 */ 183 public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException { 184 ManagedConnectionProxy proxy = new ManagedConnectionProxy(this); 185 proxyConnections.add(proxy); 186 return proxy; 187 } 188 189 private boolean isDestroyed() { 190 return destroyed; 191 } 192 193 /** 194 * Close down the physical connection to the server. 195 * 196 * @see javax.resource.spi.ManagedConnection#destroy() 197 */ 198 public void destroy() throws ResourceException { 199 // Have we allready been destroyed?? 200 if (isDestroyed()) { 201 return; 202 } 203 204 cleanup(); 205 206 try { 207 physicalConnection.close(); 208 destroyed = true; 209 } catch (JMSException e) { 210 LOG.info("Error occured during close of a JMS connection.", e); 211 } 212 } 213 214 /** 215 * Cleans up all proxy handles attached to this physical connection so that 216 * they cannot be used anymore. 217 * 218 * @see javax.resource.spi.ManagedConnection#cleanup() 219 */ 220 public void cleanup() throws ResourceException { 221 222 // Have we allready been destroyed?? 223 if (isDestroyed()) { 224 return; 225 } 226 227 for (ManagedConnectionProxy proxy:proxyConnections) { 228 proxy.cleanup(); 229 } 230 proxyConnections.clear(); 231 232 try { 233 physicalConnection.cleanup(); 234 } catch (JMSException e) { 235 throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e); 236 } 237 // defer transaction cleanup till after close so that close is aware of the current tx 238 localAndXATransaction.cleanup(); 239 240 } 241 242 /** 243 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object) 244 */ 245 public void associateConnection(Object connection) throws ResourceException { 246 if (connection instanceof ManagedConnectionProxy) { 247 ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection; 248 proxyConnections.add(proxy); 249 } else { 250 throw new ResourceException("Not supported : associating connection instance of " + connection.getClass().getName()); 251 } 252 } 253 254 /** 255 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener) 256 */ 257 public void addConnectionEventListener(ConnectionEventListener listener) { 258 listeners.add(listener); 259 } 260 261 /** 262 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener) 263 */ 264 public void removeConnectionEventListener(ConnectionEventListener listener) { 265 listeners.remove(listener); 266 } 267 268 /** 269 * @see javax.resource.spi.ManagedConnection#getXAResource() 270 */ 271 public XAResource getXAResource() throws ResourceException { 272 return localAndXATransaction; 273 } 274 275 /** 276 * @see javax.resource.spi.ManagedConnection#getLocalTransaction() 277 */ 278 public LocalTransaction getLocalTransaction() throws ResourceException { 279 return localAndXATransaction; 280 } 281 282 /** 283 * @see javax.resource.spi.ManagedConnection#getMetaData() 284 */ 285 public ManagedConnectionMetaData getMetaData() throws ResourceException { 286 return new ManagedConnectionMetaData() { 287 288 public String getEISProductName() throws ResourceException { 289 if (physicalConnection == null) { 290 throw new ResourceException("Not connected."); 291 } 292 try { 293 return physicalConnection.getMetaData().getJMSProviderName(); 294 } catch (JMSException e) { 295 throw new ResourceException("Error accessing provider.", e); 296 } 297 } 298 299 public String getEISProductVersion() throws ResourceException { 300 if (physicalConnection == null) { 301 throw new ResourceException("Not connected."); 302 } 303 try { 304 return physicalConnection.getMetaData().getProviderVersion(); 305 } catch (JMSException e) { 306 throw new ResourceException("Error accessing provider.", e); 307 } 308 } 309 310 public int getMaxConnections() throws ResourceException { 311 if (physicalConnection == null) { 312 throw new ResourceException("Not connected."); 313 } 314 return Integer.MAX_VALUE; 315 } 316 317 public String getUserName() throws ResourceException { 318 if (physicalConnection == null) { 319 throw new ResourceException("Not connected."); 320 } 321 try { 322 return physicalConnection.getClientID(); 323 } catch (JMSException e) { 324 throw new ResourceException("Error accessing provider.", e); 325 } 326 } 327 }; 328 } 329 330 /** 331 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter) 332 */ 333 public void setLogWriter(PrintWriter logWriter) throws ResourceException { 334 this.logWriter = logWriter; 335 } 336 337 /** 338 * @see javax.resource.spi.ManagedConnection#getLogWriter() 339 */ 340 public PrintWriter getLogWriter() throws ResourceException { 341 return logWriter; 342 } 343 344 /** 345 * @param subject subject to match 346 * @param info cri to match 347 * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances 348 */ 349 public boolean matches(Subject subject, ConnectionRequestInfo info) { 350 // Check to see if it is our info class 351 if (info == null) { 352 return false; 353 } 354 if (info.getClass() != ActiveMQConnectionRequestInfo.class) { 355 return false; 356 } 357 358 // Do the subjects match? 359 if (subject == null ^ this.subject == null) { 360 return false; 361 } 362 if (subject != null && !subject.equals(this.subject)) { 363 return false; 364 } 365 366 // Does the info match? 367 return info.equals(this.info); 368 } 369 370 /** 371 * When a proxy is closed this cleans up the proxy and notifys the 372 * ConnectionEventListeners that a connection closed. 373 * 374 * @param proxy 375 */ 376 public void proxyClosedEvent(ManagedConnectionProxy proxy) { 377 proxyConnections.remove(proxy); 378 proxy.cleanup(); 379 fireCloseEvent(proxy); 380 } 381 382 public void onException(JMSException e) { 383 LOG.warn("Connection failed: " + e); 384 LOG.debug("Cause: ", e); 385 386 for (ManagedConnectionProxy proxy:proxyConnections) { 387 proxy.onException(e); 388 } 389 // Let the container know that the error occured. 390 fireErrorOccurredEvent(e); 391 } 392 393 /** 394 * @return Returns the transactionContext. 395 */ 396 public TransactionContext getTransactionContext() { 397 return transactionContext; 398 } 399 400}