001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.io.PrintWriter;
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import javax.jms.Connection;
023import javax.jms.ExceptionListener;
024import javax.jms.JMSException;
025import javax.resource.ResourceException;
026import javax.resource.spi.ConnectionEvent;
027import javax.resource.spi.ConnectionEventListener;
028import javax.resource.spi.ConnectionRequestInfo;
029import javax.resource.spi.LocalTransaction;
030import javax.resource.spi.ManagedConnection;
031import javax.resource.spi.ManagedConnectionMetaData;
032import javax.security.auth.Subject;
033import javax.transaction.xa.XAResource;
034import org.apache.activemq.ActiveMQConnection;
035import org.apache.activemq.LocalTransactionEventListener;
036import org.apache.activemq.TransactionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
/**
 * ActiveMQManagedConnection maps to a real physical connection to the server.
 * Since a ManagedConnection has to provide a transaction management interface
 * to the physical connection, and sessions are the objects that implement
 * transaction management interfaces in the JMS API, this object also maps to a
 * single physical JMS session.
 * <p>
 * The side effect is that the JMS connection the application gets will always
 * create the same session object. This is good if running in an app server,
 * since the sessions are enlisted in the context transaction. This is bad if
 * used outside of an app server, since the user may be trying to create two
 * different sessions to coordinate two different units of work.
 */
053public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO:
054                                                                                            // ,
055                                                                                            // DissociatableManagedConnection
056                                                                                            // {
057
058    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class);
059
060    private PrintWriter logWriter;
061
062    private final ActiveMQConnection physicalConnection;
063    private final TransactionContext transactionContext;
064    private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>();
065    private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
066    private final LocalAndXATransaction localAndXATransaction;
067
068    private Subject subject;
069    private ActiveMQConnectionRequestInfo info;
070    private boolean destroyed;
071
072    public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
073        try {
074            this.subject = subject;
075            this.info = info;
076            this.physicalConnection = physicalConnection;
077            this.transactionContext = new TransactionContext(physicalConnection);
078
079            this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
080                public void setInManagedTx(boolean inManagedTx) throws JMSException {
081                    super.setInManagedTx(inManagedTx);
082                    for (ManagedConnectionProxy proxy:proxyConnections) {
083                        proxy.setUseSharedTxContext(inManagedTx);
084                    }
085                }
086            };
087
088            this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() {
089                public void beginEvent() {
090                    fireBeginEvent();
091                }
092
093                public void commitEvent() {
094                    fireCommitEvent();
095                }
096
097                public void rollbackEvent() {
098                    fireRollbackEvent();
099                }
100            });
101
102            physicalConnection.setExceptionListener(this);
103        } catch (JMSException e) {
104            throw new ResourceException("Could not create a new connection: " + e.getMessage(), e);
105        }
106    }
107
108    public boolean isInManagedTx() {
109        return localAndXATransaction.isInManagedTx();
110    }
111
112    public static boolean matches(Object x, Object y) {
113        if (x == null ^ y == null) {
114            return false;
115        }
116        if (x != null && !x.equals(y)) {
117            return false;
118        }
119        return true;
120    }
121
122    public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
123
124        // Do we need to change the associated userid/password
125        if (!matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword())) {
126            physicalConnection.changeUserInfo(info.getUserName(), info.getPassword());
127        }
128
129        // Do we need to set the clientId?
130        if (info.getClientid() != null && info.getClientid().length() > 0) {
131            physicalConnection.setClientID(info.getClientid());
132        }
133
134        this.subject = subject;
135        this.info = info;
136    }
137
138    public Connection getPhysicalConnection() {
139        return physicalConnection;
140    }
141
142    private void fireBeginEvent() {
143        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
144        for(ConnectionEventListener l:listeners) {
145            l.localTransactionStarted(event);
146        }
147    }
148
149    private void fireCommitEvent() {
150        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
151        for(ConnectionEventListener l:listeners) {
152            l.localTransactionCommitted(event);
153        }
154    }
155
156    private void fireRollbackEvent() {
157        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
158        for(ConnectionEventListener l:listeners) {
159            l.localTransactionRolledback(event);
160        }
161    }
162
163    private void fireCloseEvent(ManagedConnectionProxy proxy) {
164        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_CLOSED);
165        event.setConnectionHandle(proxy);
166
167        for(ConnectionEventListener l:listeners) {
168            l.connectionClosed(event);
169        }
170    }
171
172    private void fireErrorOccurredEvent(Exception error) {
173        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
174        for(ConnectionEventListener l:listeners) {
175            l.connectionErrorOccurred(event);
176        }
177    }
178
179    /**
180     * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
181     *      javax.resource.spi.ConnectionRequestInfo)
182     */
183    public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
184        ManagedConnectionProxy proxy = new ManagedConnectionProxy(this);
185        proxyConnections.add(proxy);
186        return proxy;
187    }
188
189    private boolean isDestroyed() {
190        return destroyed;
191    }
192
193    /**
194     * Close down the physical connection to the server.
195     * 
196     * @see javax.resource.spi.ManagedConnection#destroy()
197     */
198    public void destroy() throws ResourceException {
199        // Have we allready been destroyed??
200        if (isDestroyed()) {
201            return;
202        }
203
204        cleanup();
205
206        try {
207            physicalConnection.close();
208            destroyed = true;
209        } catch (JMSException e) {
210            LOG.info("Error occured during close of a JMS connection.", e);
211        }
212    }
213
214    /**
215     * Cleans up all proxy handles attached to this physical connection so that
216     * they cannot be used anymore.
217     * 
218     * @see javax.resource.spi.ManagedConnection#cleanup()
219     */
220    public void cleanup() throws ResourceException {
221
222        // Have we allready been destroyed??
223        if (isDestroyed()) {
224            return;
225        }
226
227        for (ManagedConnectionProxy proxy:proxyConnections) {
228            proxy.cleanup();
229        }
230        proxyConnections.clear();
231
232        try {
233            physicalConnection.cleanup();
234        } catch (JMSException e) {
235            throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e);
236        }
237        // defer transaction cleanup till after close so that close is aware of the current tx
238        localAndXATransaction.cleanup();
239
240    }
241
242    /**
243     * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
244     */
245    public void associateConnection(Object connection) throws ResourceException {
246        if (connection instanceof ManagedConnectionProxy) {
247            ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection;
248            proxyConnections.add(proxy);
249        } else {
250            throw new ResourceException("Not supported : associating connection instance of " + connection.getClass().getName());
251        }
252    }
253
254    /**
255     * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
256     */
257    public void addConnectionEventListener(ConnectionEventListener listener) {
258        listeners.add(listener);
259    }
260
261    /**
262     * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
263     */
264    public void removeConnectionEventListener(ConnectionEventListener listener) {
265        listeners.remove(listener);
266    }
267
268    /**
269     * @see javax.resource.spi.ManagedConnection#getXAResource()
270     */
271    public XAResource getXAResource() throws ResourceException {
272        return localAndXATransaction;
273    }
274
275    /**
276     * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
277     */
278    public LocalTransaction getLocalTransaction() throws ResourceException {
279        return localAndXATransaction;
280    }
281
282    /**
283     * @see javax.resource.spi.ManagedConnection#getMetaData()
284     */
285    public ManagedConnectionMetaData getMetaData() throws ResourceException {
286        return new ManagedConnectionMetaData() {
287
288            public String getEISProductName() throws ResourceException {
289                if (physicalConnection == null) {
290                    throw new ResourceException("Not connected.");
291                }
292                try {
293                    return physicalConnection.getMetaData().getJMSProviderName();
294                } catch (JMSException e) {
295                    throw new ResourceException("Error accessing provider.", e);
296                }
297            }
298
299            public String getEISProductVersion() throws ResourceException {
300                if (physicalConnection == null) {
301                    throw new ResourceException("Not connected.");
302                }
303                try {
304                    return physicalConnection.getMetaData().getProviderVersion();
305                } catch (JMSException e) {
306                    throw new ResourceException("Error accessing provider.", e);
307                }
308            }
309
310            public int getMaxConnections() throws ResourceException {
311                if (physicalConnection == null) {
312                    throw new ResourceException("Not connected.");
313                }
314                return Integer.MAX_VALUE;
315            }
316
317            public String getUserName() throws ResourceException {
318                if (physicalConnection == null) {
319                    throw new ResourceException("Not connected.");
320                }
321                try {
322                    return physicalConnection.getClientID();
323                } catch (JMSException e) {
324                    throw new ResourceException("Error accessing provider.", e);
325                }
326            }
327        };
328    }
329
330    /**
331     * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
332     */
333    public void setLogWriter(PrintWriter logWriter) throws ResourceException {
334        this.logWriter = logWriter;
335    }
336
337    /**
338     * @see javax.resource.spi.ManagedConnection#getLogWriter()
339     */
340    public PrintWriter getLogWriter() throws ResourceException {
341        return logWriter;
342    }
343
344    /**
345     * @param subject subject to match
346     * @param info cri to match
347     * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances
348     */
349    public boolean matches(Subject subject, ConnectionRequestInfo info) {
350        // Check to see if it is our info class
351        if (info == null) {
352            return false;
353        }
354        if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
355            return false;
356        }
357
358        // Do the subjects match?
359        if (subject == null ^ this.subject == null) {
360            return false;
361        }
362        if (subject != null && !subject.equals(this.subject)) {
363            return false;
364        }
365
366        // Does the info match?
367        return info.equals(this.info);
368    }
369
370    /**
371     * When a proxy is closed this cleans up the proxy and notifys the
372     * ConnectionEventListeners that a connection closed.
373     * 
374     * @param proxy
375     */
376    public void proxyClosedEvent(ManagedConnectionProxy proxy) {
377        proxyConnections.remove(proxy);
378        proxy.cleanup();
379        fireCloseEvent(proxy);
380    }
381
382    public void onException(JMSException e) {
383        LOG.warn("Connection failed: " + e);
384        LOG.debug("Cause: ", e);
385
386        for (ManagedConnectionProxy proxy:proxyConnections) {
387            proxy.onException(e);
388        }
389        // Let the container know that the error occured.
390        fireErrorOccurredEvent(e);
391    }
392
393    /**
394     * @return Returns the transactionContext.
395     */
396    public TransactionContext getTransactionContext() {
397        return transactionContext;
398    }
399
400}