001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.lang.reflect.Method;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023import javax.jms.MessageListener;
024import javax.jms.MessageProducer;
025import javax.jms.ServerSession;
026import javax.jms.Session;
027import javax.resource.spi.endpoint.MessageEndpoint;
028import javax.resource.spi.work.Work;
029import javax.resource.spi.work.WorkEvent;
030import javax.resource.spi.work.WorkException;
031import javax.resource.spi.work.WorkListener;
032import javax.resource.spi.work.WorkManager;
033
034import org.apache.activemq.ActiveMQSession;
035import org.apache.activemq.ActiveMQSession.DeliveryListener;
036import org.apache.activemq.TransactionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * 
042 */
043public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener {
044
045    public static final Method ON_MESSAGE_METHOD;
046    private static int nextLogId;
047
048    static {
049        try {
050            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
051                Message.class
052            });
053        } catch (Exception e) {
054            throw new ExceptionInInitializerError(e);
055        }
056    }
057
058
059    private int serverSessionId = getNextLogId();
060    private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId);
061
062    private ActiveMQSession session;
063    private WorkManager workManager;
064    private MessageEndpoint endpoint;
065    private MessageProducer messageProducer;
066    private final ServerSessionPoolImpl pool;
067
068    private Object runControlMutex = new Object();
069    private boolean runningFlag;
070    /**
071     * True if an error was detected that cause this session to be stale. When a
072     * session is stale, it should not be used again for proccessing.
073     */
074    private boolean stale;
075    /**
076     * Does the TX commit need to be managed by the RA?
077     */
078    private final boolean useRAManagedTx;
079    /**
080     * The maximum number of messages to batch
081     */
082    private final int batchSize;
083    /**
084     * The current number of messages in the batch
085     */
086    private int currentBatchSize;
087
088    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
089        this.pool = pool;
090        this.session = session;
091        this.workManager = workManager;
092        this.endpoint = endpoint;
093        this.useRAManagedTx = useRAManagedTx;
094        this.session.setMessageListener((MessageListener)endpoint);
095        this.session.setDeliveryListener(this);
096        this.batchSize = batchSize;
097    }
098
099    private static synchronized int getNextLogId() {
100        return nextLogId++;
101    }
102
103    public Session getSession() throws JMSException {
104        return session;
105    }
106
107    protected boolean isStale() {
108        return stale || !session.isRunning();
109    }
110
111    public MessageProducer getMessageProducer() throws JMSException {
112        if (messageProducer == null) {
113            messageProducer = getSession().createProducer(null);
114        }
115        return messageProducer;
116    }
117
118    /**
119     * @see javax.jms.ServerSession#start()
120     */
121    public void start() throws JMSException {
122
123        synchronized (runControlMutex) {
124            if (runningFlag) {
125                log.debug("Start request ignored, already running.");
126                return;
127            }
128            runningFlag = true;
129        }
130
131        // We get here because we need to start a async worker.
132        log.debug("Starting run.");
133        try {
134            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() {
135                // The work listener is useful only for debugging...
136                public void workAccepted(WorkEvent event) {
137                    log.debug("Work accepted: " + event);
138                }
139
140                public void workRejected(WorkEvent event) {
141                    log.debug("Work rejected: " + event);
142                }
143
144                public void workStarted(WorkEvent event) {
145                    log.debug("Work started: " + event);
146                }
147
148                public void workCompleted(WorkEvent event) {
149                    log.debug("Work completed: " + event);
150                }
151
152            });
153        } catch (WorkException e) {
154            throw (JMSException)new JMSException("Start failed: " + e).initCause(e);
155        }
156    }
157
158    /**
159     * @see java.lang.Runnable#run()
160     */
161    public void run() {
162        log.debug("Running");
163        currentBatchSize = 0;
164        while (true) {
165            log.debug("run loop start");
166            try {
167                InboundContextSupport.register(this);
168                if ( session.isRunning() ) {
169                session.run();
170                } else {
171                    log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
172                    stale = true;
173                }
174            } catch (Throwable e) {
175                stale = true;
176                if ( log.isDebugEnabled() ) {
177                    log.debug("Endpoint failed to process message.", e);
178                } else if ( log.isInfoEnabled() ) {
179                    log.info("Endpoint failed to process message. Reason: " + e.getMessage());                    
180                }
181            } finally {
182                InboundContextSupport.unregister(this);
183                log.debug("run loop end");
184                synchronized (runControlMutex) {
185                    // This endpoint may have gone stale due to error
186                    if (stale) {
187                        runningFlag = false;
188                        pool.removeFromPool(this);
189                        break;
190                    }
191                    if (!session.hasUncomsumedMessages()) {
192                        runningFlag = false;
193                        pool.returnToPool(this);
194                        break;
195                    }
196                }
197            }
198        }
199        log.debug("Run finished");
200    }
201
202    /**
203     * The ActiveMQSession's run method will call back to this method before
204     * dispactching a message to the MessageListener.
205     */
206    public void beforeDelivery(ActiveMQSession session, Message msg) {
207        if (currentBatchSize == 0) {
208            try {
209                endpoint.beforeDelivery(ON_MESSAGE_METHOD);
210            } catch (Throwable e) {
211                throw new RuntimeException("Endpoint before delivery notification failure", e);
212            }
213        }
214    }
215
216    /**
217     * The ActiveMQSession's run method will call back to this method after
218     * dispactching a message to the MessageListener.
219     */
220    public void afterDelivery(ActiveMQSession session, Message msg) {
221        if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
222            currentBatchSize = 0;
223            try {
224                endpoint.afterDelivery();
225            } catch (Throwable e) {
226                throw new RuntimeException("Endpoint after delivery notification failure", e);
227            } finally {
228                TransactionContext transactionContext = session.getTransactionContext();
229                if (transactionContext != null && transactionContext.isInLocalTransaction()) {
230                    if (!useRAManagedTx) {
231                        // Sanitiy Check: If the local transaction has not been
232                        // commited..
233                        // Commit it now.
234                        log.warn("Local transaction had not been commited. Commiting now.");
235                    }
236                    try {
237                        session.commit();
238                    } catch (JMSException e) {
239                        log.info("Commit failed:", e);
240                    }
241                }
242            }
243        }
244    }
245
246    /**
247     * @see javax.resource.spi.work.Work#release()
248     */
249    public void release() {
250        log.debug("release called");
251    }
252
253    /**
254     * @see java.lang.Object#toString()
255     */
256    @Override
257    public String toString() {
258        return "ServerSessionImpl:" + serverSessionId;
259    }
260
261    public void close() {
262        try {
263            endpoint.release();
264        } catch (Throwable e) {
265            log.debug("Endpoint did not release properly: " + e.getMessage(), e);
266        }
267        try {
268            session.close();
269        } catch (Throwable e) {
270            log.debug("Session did not close properly: " + e.getMessage(), e);
271        }
272    }
273
274}