001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import javax.jms.Connection;
023import javax.jms.ConnectionConsumer;
024import javax.jms.ConnectionMetaData;
025import javax.jms.Destination;
026import javax.jms.ExceptionListener;
027import javax.jms.IllegalStateException;
028import javax.jms.JMSException;
029import javax.jms.Queue;
030import javax.jms.QueueConnection;
031import javax.jms.QueueSession;
032import javax.jms.ServerSessionPool;
033import javax.jms.Session;
034import javax.jms.Topic;
035import javax.jms.TopicConnection;
036import javax.jms.TopicSession;
037import org.apache.activemq.ActiveMQQueueSession;
038import org.apache.activemq.ActiveMQSession;
039import org.apache.activemq.ActiveMQTopicSession;
040
041/**
042 * Acts as a pass through proxy for a JMS Connection object. It intercepts
043 * events that are of interest of the ActiveMQManagedConnection.
044 *
045 * 
046 */
047public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
048
049    private ActiveMQManagedConnection managedConnection;
050    private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
051    private ExceptionListener exceptionListener;
052
053    public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) {
054        this.managedConnection = managedConnection;
055    }
056
057    /**
058     * Used to let the ActiveMQManagedConnection that this connection handel is
059     * not needed by the app.
060     *
061     * @throws JMSException
062     */
063    public void close() throws JMSException {
064        if (managedConnection != null) {
065            managedConnection.proxyClosedEvent(this);
066        }
067    }
068
069    /**
070     * Called by the ActiveMQManagedConnection to invalidate this proxy.
071     */
072    public void cleanup() {
073        exceptionListener = null;
074        managedConnection = null;
075        synchronized (sessions) {
076            for (ManagedSessionProxy p : sessions) {
077                try {
078                    //TODO is this dangerous?  should we copy the list before iterating?
079                    p.cleanup();
080                } catch (JMSException ignore) {
081                }
082            }
083            sessions.clear();
084        }
085    }
086
087    /**
088     * @return "physical" underlying activemq connection, if proxy is associated with a managed connection
089     * @throws javax.jms.JMSException if managed connection is null
090     */
091    private Connection getConnection() throws JMSException {
092        if (managedConnection == null) {
093            throw new IllegalStateException("The Connection is closed");
094        }
095        return managedConnection.getPhysicalConnection();
096    }
097
098    /**
099     * @param transacted      Whether session is transacted
100     * @param acknowledgeMode session acknowledge mode
101     * @return session proxy
102     * @throws JMSException on error
103     */
104    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
105        return createSessionProxy(transacted, acknowledgeMode);
106    }
107
108    /**
109     * @param transacted      Whether session is transacted
110     * @param acknowledgeMode session acknowledge mode
111     * @return session proxy
112     * @throws JMSException on error
113     */
114    private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
115        if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
116            acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
117        }
118        ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
119        ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
120        session.setTransactionContext(txContext);
121        ManagedSessionProxy p = new ManagedSessionProxy(session, this);
122        p.setUseSharedTxContext(managedConnection.isInManagedTx());
123        synchronized (sessions) {
124            sessions.add(p);
125        }
126        return p;
127    }
128
129    protected void sessionClosed(ManagedSessionProxy session) {
130        synchronized (sessions) {
131            sessions.remove(session);
132        }
133    }
134
135    public void setUseSharedTxContext(boolean enable) throws JMSException {
136        synchronized (sessions) {
137            for (ManagedSessionProxy p : sessions) {
138                p.setUseSharedTxContext(enable);
139            }
140        }
141    }
142
143    /**
144     * @param transacted      Whether session is transacted
145     * @param acknowledgeMode session acknowledge mode
146     * @return session proxy
147     * @throws JMSException on error
148     */
149    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
150        return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
151    }
152
153    /**
154     * @param transacted      Whether session is transacted
155     * @param acknowledgeMode session acknowledge mode
156     * @return session proxy
157     * @throws JMSException on error
158     */
159    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
160        return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
161    }
162
163    /**
164     * @return client id from delegate
165     * @throws JMSException
166     */
167    public String getClientID() throws JMSException {
168        return getConnection().getClientID();
169    }
170
171    /**
172     * @return exception listener from delegate
173     * @throws JMSException
174     */
175    public ExceptionListener getExceptionListener() throws JMSException {
176        return getConnection().getExceptionListener();
177    }
178
179    /**
180     * @return connection metadata from delegate
181     * @throws JMSException
182     */
183    public ConnectionMetaData getMetaData() throws JMSException {
184        return getConnection().getMetaData();
185    }
186
187    /**
188     * Sets client id on delegate
189     * @param clientID new clientId
190     * @throws JMSException
191     */
192    public void setClientID(String clientID) throws JMSException {
193        getConnection().setClientID(clientID);
194    }
195
196    /**
197     * sets exception listener on delegate
198     * @param listener new listener
199     * @throws JMSException
200     */
201    public void setExceptionListener(ExceptionListener listener) throws JMSException {
202        getConnection();
203        exceptionListener = listener;
204    }
205
206    /**
207     * @throws JMSException
208     */
209    public void start() throws JMSException {
210        getConnection().start();
211    }
212
213    /**
214     * @throws JMSException
215     */
216    public void stop() throws JMSException {
217        getConnection().stop();
218    }
219
220    /**
221     * @param queue
222     * @param messageSelector
223     * @param sessionPool
224     * @param maxMessages
225     * @return
226     * @throws JMSException
227     */
228    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
229        throw new JMSException("Not Supported.");
230    }
231
232    /**
233     * @param topic
234     * @param messageSelector
235     * @param sessionPool
236     * @param maxMessages
237     * @return
238     * @throws JMSException
239     */
240    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
241        throw new JMSException("Not Supported.");
242    }
243
244    /**
245     * @param destination
246     * @param messageSelector
247     * @param sessionPool
248     * @param maxMessages
249     * @return
250     * @throws JMSException
251     */
252    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
253        throw new JMSException("Not Supported.");
254    }
255
256    /**
257     * @param topic
258     * @param subscriptionName
259     * @param messageSelector
260     * @param sessionPool
261     * @param maxMessages
262     * @return
263     * @throws JMSException
264     */
265    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
266        throw new JMSException("Not Supported.");
267    }
268
269    /**
270     * @return Returns the managedConnection.
271     */
272    public ActiveMQManagedConnection getManagedConnection() {
273        return managedConnection;
274    }
275
276    public void onException(JMSException e) {
277        if (exceptionListener != null && managedConnection != null) {
278            try {
279                exceptionListener.onException(e);
280            } catch (Throwable ignore) {
281                // We can never trust user code so ignore any exceptions.
282            }
283        }
284    }
285
286}