001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.lang.reflect.Method;
020
021import java.util.concurrent.atomic.AtomicBoolean;
022import javax.jms.Connection;
023import javax.jms.ConnectionConsumer;
024import javax.jms.ExceptionListener;
025import javax.jms.JMSException;
026import javax.jms.Message;
027import javax.jms.MessageListener;
028import javax.jms.Session;
029import javax.jms.Topic;
030import javax.resource.ResourceException;
031import javax.resource.spi.endpoint.MessageEndpointFactory;
032import javax.resource.spi.work.Work;
033import javax.resource.spi.work.WorkException;
034import javax.resource.spi.work.WorkManager;
035
036import org.apache.activemq.ActiveMQConnection;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQQueue;
039import org.apache.activemq.command.ActiveMQTopic;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 *  $Date$
045 */
046public class ActiveMQEndpointWorker {
047
048    public static final Method ON_MESSAGE_METHOD;
049    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);
050
051    private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
052    private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
053    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();
054
055    static {
056        try {
057            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
058                Message.class
059            });
060        } catch (Exception e) {
061            throw new ExceptionInInitializerError(e);
062        }
063    }
064
065    protected final ActiveMQEndpointActivationKey endpointActivationKey;
066    protected final MessageEndpointFactory endpointFactory;
067    protected final WorkManager workManager;
068    protected final boolean transacted;
069
070    private final ActiveMQDestination dest;
071    private final Work connectWork;
072    private final AtomicBoolean connecting = new AtomicBoolean(false);    
073    private final Object shutdownMutex = new String("shutdownMutex");
074    
075    private ActiveMQConnection connection;
076    private ConnectionConsumer consumer;
077    private ServerSessionPoolImpl serverSessionPool;
078    private boolean running;
079
080    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
081        this.endpointActivationKey = key;
082        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
083        this.workManager = adapter.getBootstrapContext().getWorkManager();
084        try {
085            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
086        } catch (NoSuchMethodException e) {
087            throw new ResourceException("Endpoint does not implement the onMessage method.");
088        }
089
090        connectWork = new Work() {
091            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
092
093            public void release() {
094                //
095            }
096
097            public void run() {
098                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
099                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
100                if ( LOG.isInfoEnabled() ) {
101                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
102                }
103
104                while ( connecting.get() && running ) {
105                try {
106                    connection = adapter.makeConnection(activationSpec);
107                    connection.setExceptionListener(new ExceptionListener() {
108                        public void onException(JMSException error) {
109                            if (!serverSessionPool.isClosing()) {
110                                    // initiate reconnection only once, i.e. on initial exception
111                                    // and only if not already trying to connect
112                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
113                                    if ( connecting.compareAndSet(false, true) ) {
114                                        synchronized ( connectWork ) {
115                                            disconnect();
116                                            serverSessionPool.closeIdleSessions();
117                                            connect();
118                            }
119                                    } else {
120                                        // connection attempt has already been initiated
121                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
122                        }
123                                }
124                            }
125                    });
126                        connection.start();
127
128                        int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
129                    if (activationSpec.isDurableSubscription()) {
130                            consumer = connection.createDurableConnectionConsumer(
131                                    (Topic) dest,
132                                    activationSpec.getSubscriptionName(), 
133                                    emptyToNull(activationSpec.getMessageSelector()),
134                                    serverSessionPool, 
135                                    prefetchSize,
136                                    activationSpec.getNoLocalBooleanValue());
137                    } else {
138                            consumer = connection.createConnectionConsumer(
139                                    dest, 
140                                    emptyToNull(activationSpec.getMessageSelector()), 
141                                    serverSessionPool, 
142                                    prefetchSize,
143                                                                       activationSpec.getNoLocalBooleanValue());
144                    }
145
146
147                        if ( connecting.compareAndSet(true, false) ) {
148                            if ( LOG.isInfoEnabled() ) {
149                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
150                            }
151                        } else {
152                            LOG.error("Could not release connection lock");
153                        }
154                } catch (JMSException error) {
155                        if ( LOG.isDebugEnabled() ) {
156                            LOG.debug("Failed to connect: " + error.getMessage(), error);
157                }
158                        disconnect();
159                        pause(error);
160            }
161                }
162            }
163            
164            private void pause(JMSException error) {
165                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
166                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
167                            + error.getMessage(), error);
168                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
169                }
170                try {
171                    synchronized ( shutdownMutex ) {
172                        // shutdownMutex will be notified by stop() method in
173                        // order to accelerate shutdown of endpoint
174                        shutdownMutex.wait(currentReconnectDelay);
175                    }
176                } catch ( InterruptedException e ) {
177                    Thread.interrupted();
178                }
179                currentReconnectDelay *= 2;
180                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
181                    currentReconnectDelay = MAX_RECONNECT_DELAY;
182                }                
183            }
184        };
185
186        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
187        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
188            dest = new ActiveMQQueue(activationSpec.getDestination());
189        } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
190            dest = new ActiveMQTopic(activationSpec.getDestination());
191        } else {
192            throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
193        }
194
195    }
196
197    /**
198     * @param c
199     */
200    public static void safeClose(Connection c) {
201        try {
202            if (c != null) {
203                LOG.debug("Closing connection to broker");
204                c.close();
205            }
206        } catch (JMSException e) {
207            //
208        }
209    }
210
211    /**
212     * @param cc
213     */
214    public static void safeClose(ConnectionConsumer cc) {
215        try {
216            if (cc != null) {
217                LOG.debug("Closing ConnectionConsumer");
218                cc.close();
219            }
220        } catch (JMSException e) {
221            //
222        }
223    }
224
225    /**
226     * 
227     */
228    public void start() throws ResourceException {
229        synchronized (connectWork) {
230            if (running)
231            return;
232        running = true;
233
234            if ( connecting.compareAndSet(false, true) ) {
235                LOG.info("Starting");
236        serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
237        connect();
238            } else {
239                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
240    }
241        }
242    }
243
244    /**
245     * 
246     */
247    public void stop() throws InterruptedException {
248        synchronized (shutdownMutex) {
249            if (!running)
250                return;
251            running = false;
252            LOG.info("Stopping");
253            // wake up pausing reconnect attempt
254            shutdownMutex.notifyAll();
255            serverSessionPool.close();
256        }
257        disconnect();
258    }
259
260    private boolean isRunning() {
261        return running;
262    }
263
264    private void connect() {
265        synchronized ( connectWork ) {
266        if (!running) {
267            return;
268        }
269
270        try {
271            workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
272        } catch (WorkException e) {
273            running = false;
274            LOG.error("Work Manager did not accept work: ", e);
275        }
276    }
277    }
278
279    /**
280     * 
281     */
282    private void disconnect() {
283        synchronized ( connectWork ) {
284        safeClose(consumer);
285        consumer = null;
286        safeClose(connection);
287        connection = null;
288    }
289            }
290
291    protected void registerThreadSession(Session session) {
292        THREAD_LOCAL.set(session);
293    }
294
295    protected void unregisterThreadSession(Session session) {
296        THREAD_LOCAL.set(null);
297    }
298
299    protected ActiveMQConnection getConnection() {
300        // make sure we only return a working connection
301        // in particular make sure that we do not return null
302        // after the resource adapter got disconnected from
303        // the broker via the disconnect() method
304        synchronized ( connectWork ) {
305            return connection;
306        }
307    }
308
309    private String emptyToNull(String value) {
310        if (value == null || value.length() == 0) {
311            return null;
312        }
313        return value;
314    }
315
316}