/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQSession.DeliveryListener;
import org.apache.activemq.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ServerSession} that runs an {@link ActiveMQSession} as a
 * {@link Work} item scheduled on a {@link WorkManager}.
 * <p>
 * The instance registers itself as the session's {@link DeliveryListener} so
 * that it can call {@link MessageEndpoint#beforeDelivery(Method)} and
 * {@link MessageEndpoint#afterDelivery()} around batches of up to
 * {@code batchSize} messages. When a run finishes, the instance hands itself
 * back to (or removes itself from) the owning {@link ServerSessionPoolImpl}.
 */
public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener {

    /** Reflective handle on {@code MessageListener.onMessage(Message)}, passed to the endpoint before delivery. */
    public static final Method ON_MESSAGE_METHOD;

    /** Source of per-instance ids; only read and incremented through {@link #getNextLogId()}. */
    private static int nextLogId;

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Id used in the logger name and in {@link #toString()}. */
    private final int serverSessionId = getNextLogId();

    /** Per-instance logger; its name is the class name suffixed with ":" and the session id. */
    private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId);

    private final ActiveMQSession session;
    private final WorkManager workManager;
    private final MessageEndpoint endpoint;
    /** Created lazily by {@link #getMessageProducer()}. */
    private MessageProducer messageProducer;
    private final ServerSessionPoolImpl pool;

    /** Guards {@link #runningFlag}; final so that every thread locks the same object. */
    private final Object runControlMutex = new Object();

    /** True from an accepted {@link #start()} until {@link #run()} leaves its loop. */
    private boolean runningFlag;

    /**
     * True if an error was detected that caused this session to become stale.
     * When a session is stale, it should not be used again for processing.
     */
    // NOTE(review): written in run() and read by isStale() without synchronization;
    // confirm whether isStale() can be called from a different thread.
    private boolean stale;

    /**
     * Does the TX commit need to be managed by the RA?
     */
    private final boolean useRAManagedTx;

    /**
     * The maximum number of messages to batch
     */
    private final int batchSize;

    /**
     * The current number of messages in the batch
     */
    private int currentBatchSize;

    /**
     * Creates the server session and wires it into the given JMS session: the
     * endpoint is installed as the session's message listener and this object
     * as its delivery listener.
     *
     * @param pool           pool this session is returned to, or removed from, when a run ends
     * @param session        the ActiveMQ session that performs the actual message dispatch
     * @param workManager    work manager used by {@link #start()} to schedule {@link #run()}
     * @param endpoint       endpoint that receives the messages; it is cast to {@link MessageListener}
     * @param useRAManagedTx whether the resource adapter manages the transaction commit
     * @param batchSize      maximum number of messages delivered between the endpoint's
     *                       before/after delivery notifications
     * @throws JMSException if the message listener cannot be set on the session
     */
    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
        this.pool = pool;
        this.session = session;
        this.workManager = workManager;
        this.endpoint = endpoint;
        this.useRAManagedTx = useRAManagedTx;
        this.session.setMessageListener((MessageListener)endpoint);
        this.session.setDeliveryListener(this);
        this.batchSize = batchSize;
    }

    /**
     * @return the next id in sequence; synchronized on the class so ids are not handed out twice
     */
    private static synchronized int getNextLogId() {
        return nextLogId++;
    }

    /**
     * @return the ActiveMQ session wrapped by this server session
     */
    public Session getSession() throws JMSException {
        return session;
    }

    /**
     * @return true if this session was marked stale or the JMS session reports that it is not running
     */
    protected boolean isStale() {
        return stale || !session.isRunning();
    }

    /**
     * Returns a producer with no fixed destination, creating it on first use.
     *
     * @return the lazily created message producer
     * @throws JMSException if the producer cannot be created
     */
    public MessageProducer getMessageProducer() throws JMSException {
        if (messageProducer == null) {
            messageProducer = getSession().createProducer(null);
        }
        return messageProducer;
    }

    /**
     * Schedules {@link #run()} on the work manager unless a run is already in
     * progress, in which case the request is ignored.
     *
     * @throws JMSException if the work manager fails to schedule the work
     * @see javax.jms.ServerSession#start()
     */
    public void start() throws JMSException {

        synchronized (runControlMutex) {
            if (runningFlag) {
                log.debug("Start request ignored, already running.");
                return;
            }
            runningFlag = true;
        }

        // We get here because we need to start a async worker.
        log.debug("Starting run.");
        try {
            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() {
                // The work listener is useful only for debugging...
                public void workAccepted(WorkEvent event) {
                    log.debug("Work accepted: {}", event);
                }

                public void workRejected(WorkEvent event) {
                    log.debug("Work rejected: {}", event);
                }

                public void workStarted(WorkEvent event) {
                    log.debug("Work started: {}", event);
                }

                public void workCompleted(WorkEvent event) {
                    log.debug("Work completed: {}", event);
                }

            });
        } catch (WorkException e) {
            // Scheduling failed, so run() is not expected to execute and clear
            // the flag. Reset it here; otherwise every later start() would be
            // ignored as "already running".
            synchronized (runControlMutex) {
                runningFlag = false;
            }
            throw (JMSException)new JMSException("Start failed: " + e).initCause(e);
        }
    }

    /**
     * Runs the JMS session repeatedly until it has no unconsumed messages or
     * this server session becomes stale. On exit the running flag is cleared
     * and the session is either returned to the pool or, if stale, removed
     * from it.
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        log.debug("Running");
        currentBatchSize = 0;
        while (true) {
            log.debug("run loop start");
            try {
                InboundContextSupport.register(this);
                if ( session.isRunning() ) {
                    session.run();
                } else {
                    log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
                    stale = true;
                }
            } catch (Throwable e) {
                // Any failure while dispatching makes this session unusable.
                stale = true;
                if ( log.isDebugEnabled() ) {
                    log.debug("Endpoint failed to process message.", e);
                } else if ( log.isInfoEnabled() ) {
                    log.info("Endpoint failed to process message. Reason: " + e.getMessage());
                }
            } finally {
                InboundContextSupport.unregister(this);
                log.debug("run loop end");
                // The flag is cleared and the pool notified under the same lock
                // start() uses to test and set the flag.
                synchronized (runControlMutex) {
                    // This endpoint may have gone stale due to error
                    if (stale) {
                        runningFlag = false;
                        pool.removeFromPool(this);
                        break;
                    }
                    if (!session.hasUncomsumedMessages()) {
                        runningFlag = false;
                        pool.returnToPool(this);
                        break;
                    }
                }
            }
        }
        log.debug("Run finished");
    }

    /**
     * The ActiveMQSession's run method will call back to this method before
     * dispatching a message to the MessageListener. The endpoint is notified
     * only for the first message of a batch.
     *
     * @param session the session about to dispatch the message
     * @param msg     the message about to be delivered
     * @throws RuntimeException wrapping any failure raised by the endpoint notification
     */
    public void beforeDelivery(ActiveMQSession session, Message msg) {
        if (currentBatchSize == 0) {
            try {
                endpoint.beforeDelivery(ON_MESSAGE_METHOD);
            } catch (Throwable e) {
                throw new RuntimeException("Endpoint before delivery notification failure", e);
            }
        }
    }

    /**
     * The ActiveMQSession's run method will call back to this method after
     * dispatching a message to the MessageListener. The batch is closed once
     * {@code batchSize} messages have been delivered or the session has no
     * more unconsumed messages: the counter is reset, the endpoint is
     * notified, and a local transaction that is still open is committed.
     *
     * @param session the session that dispatched the message
     * @param msg     the message that was delivered
     * @throws RuntimeException wrapping any failure raised by the endpoint notification
     */
    public void afterDelivery(ActiveMQSession session, Message msg) {
        if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
            currentBatchSize = 0;
            try {
                endpoint.afterDelivery();
            } catch (Throwable e) {
                throw new RuntimeException("Endpoint after delivery notification failure", e);
            } finally {
                // Runs even when the endpoint notification failed.
                TransactionContext transactionContext = session.getTransactionContext();
                if (transactionContext != null && transactionContext.isInLocalTransaction()) {
                    if (!useRAManagedTx) {
                        // Sanity Check: If the local transaction has not been
                        // committed..
                        // Commit it now.
                        log.warn("Local transaction had not been commited. Commiting now.");
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // A failed commit is logged and not propagated.
                        log.info("Commit failed:", e);
                    }
                }
            }
        }
    }

    /**
     * Only logs the request; nothing in this method stops a run in progress.
     *
     * @see javax.resource.spi.work.Work#release()
     */
    public void release() {
        log.debug("release called");
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "ServerSessionImpl:" + serverSessionId;
    }

    /**
     * Releases the endpoint and then closes the JMS session. Each step is
     * attempted independently; failures are logged at debug level and not
     * propagated.
     */
    public void close() {
        try {
            endpoint.release();
        } catch (Throwable e) {
            log.debug("Endpoint did not release properly: " + e.getMessage(), e);
        }
        try {
            session.close();
        } catch (Throwable e) {
            log.debug("Session did not close properly: " + e.getMessage(), e);
        }
    }

}