Groovy Documentation
java.lang.Object
    groovyx.gpars.actor.impl.SequentialProcessingActor
        groovyx.gpars.actor.AbstractPooledActor
@SuppressWarnings({"ThrowCaughtLocally", "UnqualifiedStaticUsage"}) class AbstractPooledActor extends SequentialProcessingActor
AbstractPooledActor provides the default Actor implementation. It represents a standalone active object (actor), which reacts asynchronously to messages sent to it from outside through the send() method. Each Actor has its own message queue and shares a thread pool with other Actors through the PGroup instance they have in common. The PGroup instance is responsible for creating, managing and shutting down the pool.

All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react() method, the actor is detached from its thread, making the thread available to other actors. Because threads are attached to and detached from actors dynamically, the number of Actors can scale far beyond the number of threads the underlying platform can run concurrently.

The receive() method can be used to read a message from the queue without giving up the thread. If no message is available, the call to receive() blocks until a message arrives or the supplied timeout expires.

The loop() method invokes a closure repeatedly. The iterations run sequentially, but each may run on a different thread from the thread pool. To support continuations correctly, the react() and loop() methods never return.
    import static groovyx.gpars.actor.Actors.actor

    def actor = actor {
        loop {
            react {message ->
                println message
            }
            //this line will never be reached
        }
        //this line will never be reached
    }.start()

    actor.send 'Hi!'

Since react() and loop() never return, the code must be structured accordingly.
    def adder = actor {
        loop {
            react {a ->
                react {b ->
                    println a+b
                    replyIfExists a+b  //sends reply, if b was sent by a PooledActor
                }
            }
            //this line will never be reached
        }
        //this line will never be reached
    }.start()

The react() method can accept multiple messages in the passed-in closure:
    react {Integer a, String b ->
        ...
    }

The closures passed to the react() method can call reply() or replyIfExists(), which send a message back to the originator of the currently processed message. Unlike reply(), the replyIfExists() method does not fail if the original message was not sent by an actor or if the sending actor is no longer running. The reply() and replyIfExists() methods are also dynamically added to the processed messages.
    react {a ->
        react {b ->
            reply 'message'  //sent to senders of a as well as b
            a.reply 'private message'  //sent to the sender of a only
        }
    }

The react() method accepts a timeout specified using the TimeCategory DSL.
    react(10.MINUTES) {
        println 'Received message: ' + it
    }

If no message arrives within the given timeout, the onTimeout() lifecycle handler is invoked, if it exists, and the actor terminates.

At any point in time, each Actor has at most one active ActorAction instance associated with it, which abstracts the chunk of the actor's work currently being performed. Once a thread is assigned to the ActorAction, it moves the actor forward until loop() or react() is called. These methods schedule another ActorAction for processing and throw a dedicated exception to terminate the current ActorAction.

Each Actor can define lifecycle-observing methods, which are called by the Actor's background thread whenever a certain lifecycle event occurs.
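The chunked execution described above can be illustrated with plain java.util.concurrent classes. The following is only a rough sketch of the general idea (one message handled per pool task, at most one task in flight per actor), not the GPars implementation: MiniActor and all of its members are hypothetical names, and the sketch ends each chunk by returning normally rather than by throwing a dedicated exception.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical illustration only; MiniActor is not part of GPars.
class MiniActor implements Runnable {
    private final ExecutorService pool
    private final Queue<Object> mailbox = new ConcurrentLinkedQueue<Object>()
    private final AtomicBoolean scheduled = new AtomicBoolean(false)
    private final Closure handler

    MiniActor(ExecutorService pool, Closure handler) {
        this.pool = pool
        this.handler = handler
    }

    void send(Object message) {
        mailbox.add(message)
        schedule()
    }

    // Submit at most one task at a time, so messages are handled sequentially.
    private void schedule() {
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this)
        }
    }

    // One chunk of work: handle a single message, then release the thread.
    void run() {
        try {
            def message = mailbox.poll()
            if (message != null) handler.call(message)
        } finally {
            scheduled.set(false)
            schedule()  // submit the next chunk if more messages are waiting
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
def done = new CountDownLatch(3)
def seen = Collections.synchronizedList([])

def printer = new MiniActor(pool, { message ->
    seen << message
    done.countDown()
})

['a', 'b', 'c'].each { printer.send it }
done.await(5, TimeUnit.SECONDS)
pool.shutdown()
println seen
```

Between chunks the sketch holds no thread at all, which is why many such actors can share a small pool.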
Constructor Summary |
---|
AbstractPooledActor() |

Method Summary | |
---|---|
protected void | act(): This method represents the body of the actor. |
protected void | doOnException(Throwable exception) |
protected void | doOnInterrupt(InterruptedException exception) |
protected void | doOnStart() |
protected void | doOnTermination() |
protected void | doOnTimeout() |
void | onStop(Closure onStop): Set on stop handler for this actor |
protected void | receive(Closure handler): Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(long timeout, TimeUnit timeUnit, Closure handler): Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(Duration duration, Closure handler): Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected Object | receiveImpl(): Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected Object | receiveImpl(long timeout, TimeUnit units): Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
List | sweepQueue(): Clears the message queue returning all the messages it held. |

Constructor Detail |
---|
AbstractPooledActor()
Method Detail |
---|
protected void act()
@SuppressWarnings({"UseOfSystemOutOrSystemErr"}) @Override protected void doOnException(Throwable exception)
@SuppressWarnings({"UseOfSystemOutOrSystemErr"}) @Override protected void doOnInterrupt(InterruptedException exception)
@Override protected void doOnStart()
@Override @SuppressWarnings({"FeatureEnvy"}) protected void doOnTermination()
@Override protected void doOnTimeout()
public final void onStop(Closure onStop)
@SuppressWarnings({"MethodOverloadsMethodOfSuperclass"}) protected final void receive(Closure handler)
protected final void receive(long timeout, TimeUnit timeUnit, Closure handler)
@SuppressWarnings({"MethodOverloadsMethodOfSuperclass", "TypeMayBeWeakened"}) protected final void receive(Duration duration, Closure handler)
@Override protected final Object receiveImpl()
@Override protected final Object receiveImpl(long timeout, TimeUnit units)
public final List sweepQueue()
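
As a usage illustration of the signatures listed above, the following minimal sketch subclasses AbstractPooledActor, overrides act(), and reads one message with receive(long timeout, TimeUnit timeUnit, Closure handler). It assumes GPars is on the classpath. The names Echo, received and done are hypothetical, and this page does not specify what the handler is passed if the timeout expires, so the sketch only covers the case where a message arrives in time.

```groovy
import groovyx.gpars.actor.AbstractPooledActor
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical subclass for illustration; Echo, received and done are not part of GPars.
class Echo extends AbstractPooledActor {
    final List received = Collections.synchronizedList([])
    final CountDownLatch done = new CountDownLatch(1)

    @Override
    protected void act() {
        // receive() keeps the current thread and blocks for at most 5 seconds.
        receive(5, TimeUnit.SECONDS) { message ->
            received << message
            done.countDown()
        }
    }
}

def echo = new Echo()
echo.start()
echo.send 'ping'

// Wait for the handler to run before inspecting the result.
echo.done.await(10, TimeUnit.SECONDS)
println echo.received
```

Unlike react(), the receive() call returns to its caller, so code placed after it inside act() would still be reached.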