org.apache.uima.adapter.jms.activemq
Class ConcurrentMessageListener
java.lang.Object
org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener
- All Implemented Interfaces:
- org.springframework.jms.listener.SessionAwareMessageListener
public class ConcurrentMessageListener
- extends java.lang.Object
- implements org.springframework.jms.listener.SessionAwareMessageListener
Message listener injected at runtime into Aggregate to handle a race condition when multiple
threads simultaneously process messages from a Cas Multiplier. It is only used to process
messages from a Cas Multiplier and only if the reply queue has more than one consumer thread
configured in a deployment descriptor. The listener creates a pool of threads equal to the number
of concurrent consumers defined in the DD for the listener on the reply queue. Once the message
is handled in onMessage(), it is than delegated for processing to one of the available threads
from the pool.
This listener guarantees processing order. It receives messages from Spring in a single thread
and if it finds a child CAS in the message, it increments the parent (input) CAS child count and
delegates processing to the InputChannel instance.
The race condition: The Cas Multiplier sends the last child and the parent almost at the same
time. Both are received by the aggregate and are processed in different threads, if a scaleout is
used on the reply queue. One thread may start processing the input CAS while the other thread
(with the last child) is not yet allowed to run. The first thread takes the input CAS all the way
to the final step and since at this time, the input CAS has no children ( the thread processing
the last child has not updated the child count yet), it will be prematurely released. When the
thread with the last child is allowed to run, it finds that the parent no longer exists in the
cache.
Constructor Summary |
ConcurrentMessageListener(int concurrentThreads,
java.lang.Object delegateListener,
java.lang.String destination,
java.lang.ThreadGroup threadGroup,
java.lang.String threadPrefix)
Creates a listener with a given number of process threads. |
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
ConcurrentMessageListener
public ConcurrentMessageListener(int concurrentThreads,
java.lang.Object delegateListener,
java.lang.String destination,
java.lang.ThreadGroup threadGroup,
java.lang.String threadPrefix)
throws java.io.InvalidClassException
- Creates a listener with a given number of process threads. This listener is injected between
Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on
reply queues that have scale out attribute in DD greater than 1. Its main job is to increment
number of child CASes for a given input CAS. It does so in a single thread, and once it
completes the update this listener submits the CAS for further processing up to the
JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to
process the CAS.
- Parameters:
concurrentThreads
- - number of threads to use to process CASesdelegateListener
- - JmsInputChannel instance to delegate CAS to
- Throws:
java.io.InvalidClassException
getTaskExecutor
public java.util.concurrent.ThreadPoolExecutor getTaskExecutor()
stop
public void stop()
setAnalysisEngineController
public void setAnalysisEngineController(AnalysisEngineController controller)
onMessage
public void onMessage(javax.jms.Message message,
javax.jms.Session session)
throws javax.jms.JMSException
- Intercept a message to increment a child count of the input CAS. This method is always called
in a single thread, guaranteeing order of processing. The child CAS will always come here
first. Once the count is updated, or this CAS is not an child, the message will be delegated to
one of the threads in the pool that will eventually call InputChannel object where the actual
processing of the message begins.
- Specified by:
onMessage
in interface org.springframework.jms.listener.SessionAwareMessageListener
- Throws:
javax.jms.JMSException
Copyright © 2012. All Rights Reserved.