Groovy Documentation

groovyx.gpars.actor
Class AbstractPooledActor

java.lang.Object
  groovyx.gpars.actor.impl.SequentialProcessingActor
      groovyx.gpars.actor.AbstractPooledActor

@SuppressWarnings({"ThrowCaughtLocally", "UnqualifiedStaticUsage"})
class AbstractPooledActor
extends SequentialProcessingActor

AbstractPooledActor provides the default Actor implementation. It represents a standalone active object (actor) that reacts asynchronously to messages sent to it from outside through the send() method. Each Actor has its own message queue and a thread pool shared with other Actors through a common PGroup instance. The PGroup instance is responsible for creating, managing and shutting down the pool.

All work performed by an Actor is divided into chunks, which are submitted sequentially as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react() method, the actor is detached from its thread, making the thread available to other actors. Because threads are attached to and detached from actors dynamically, the number of Actors can scale far beyond the number of threads the underlying platform can run concurrently.

The receive() method reads a message from the queue without giving up the thread. If no message is available, the call to receive() blocks until a message arrives or the supplied timeout expires. The loop() method repeatedly invokes a closure, performing the iterations sequentially, each potentially on a different thread from the thread pool. To support continuations correctly, the react() and loop() methods never return.

 import static groovyx.gpars.actor.Actors.actor

 def actor = actor {
     loop {
         react {message ->
             println message
         }
         //this line will never be reached
     }
     //this line will never be reached
 }.start()

 actor.send 'Hi!'

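Because code placed after a call to react() or loop() is never executed, the actor's logic has to be structured accordingly, with follow-up work nested inside the closures.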

 def adder = actor {
     loop {
         react {a ->
             react {b ->
                 println a+b
                 replyIfExists a+b  //sends reply, if b was sent by a PooledActor
             }
         }
         //this line will never be reached
     }
     //this line will never be reached
 }.start()
 
The react() method can accept multiple messages in the passed-in closure:
 react {Integer a, String b ->
     ...
 }
 
The closures passed to the react() method can call reply() or replyIfExists(), which send a message back to the originator of the currently processed message. Unlike reply(), the replyIfExists() method does not fail if the original message wasn't sent by an actor or if the original sender actor is no longer running. The reply() and replyIfExists() methods are also dynamically added to the processed messages.
 react {a ->
     react {b ->
         reply 'message'  //sent to senders of a as well as b
         a.reply 'private message'  //sent to the sender of a only
     }
 }
 

The react() method accepts a timeout specified using the TimeCategory DSL.

 react(10.MINUTES) {
     println 'Received message: ' + it
 }
 
If no message arrives within the given timeout, the onTimeout() lifecycle handler is invoked, if it exists, and the actor terminates.

At any point in time, each Actor has at most one active ActorAction instance associated with it, which represents the current chunk of the actor's work. Once a thread is assigned to the ActorAction, it moves the actor forward until loop() or react() is called. These methods schedule another ActorAction for processing and throw a dedicated exception to terminate the current ActorAction.

Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.

author:
Vaclav Pech, Alex Tkachman, Dierk Koenig
Date: Feb 7, 2009


Constructor Summary
AbstractPooledActor()

 
Method Summary
protected void act()

This method represents the body of the actor.

protected void doOnException(Throwable exception)

protected void doOnInterrupt(InterruptedException exception)

protected void doOnStart()

protected void doOnTermination()

protected void doOnTimeout()

void onStop(Closure onStop)

Sets the on-stop handler for this actor.

protected void receive(Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected void receive(long timeout, TimeUnit timeUnit, Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected void receive(Duration duration, Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected Object receiveImpl()

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected Object receiveImpl(long timeout, TimeUnit units)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

List sweepQueue()

Clears the message queue returning all the messages it held.

 

Constructor Detail

AbstractPooledActor

AbstractPooledActor()


 
Method Detail

act

protected void act()
This method represents the body of the actor. It is called when the actor starts and exits either normally, by returning, or because the actor is stopped through the stop() method, which cancels the current actor action. It is the extension point where subclasses supply their own message-handling code.
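
As an illustration of the extension point described above, the following is a minimal sketch of a subclass that overrides act(). The Greeter class, its greetings list and its done latch are hypothetical names introduced only for this example, and the import assumes the class lives in the groovyx.gpars.actor package.

```groovy
import groovyx.gpars.actor.AbstractPooledActor
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical subclass, used only to illustrate overriding act()
class Greeter extends AbstractPooledActor {
    // Thread-safe list, since handlers run on pool threads
    final List greetings = Collections.synchronizedList([])
    // Lets the calling thread wait until the first message was handled
    final CountDownLatch done = new CountDownLatch(1)

    @Override
    protected void act() {
        loop {
            react { name ->
                greetings << "Hello, $name".toString()
                done.countDown()
            }
            //this line will never be reached
        }
    }
}

def greeter = new Greeter()
greeter.start()
greeter.send 'World'

// Wait (with an upper bound) for the message to be processed, then stop the actor
greeter.done.await(5, TimeUnit.SECONDS)
greeter.stop()
```

The latch is there only so that the calling thread can observe the result; an actor used in production code would normally reply to the sender instead.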


doOnException

@SuppressWarnings({"UseOfSystemOutOrSystemErr"})
@Override
protected void doOnException(Throwable exception)


doOnInterrupt

@SuppressWarnings({"UseOfSystemOutOrSystemErr"})
@Override
protected void doOnInterrupt(InterruptedException exception)


doOnStart

@Override
protected void doOnStart()


doOnTermination

@Override
@SuppressWarnings({"FeatureEnvy"})
protected void doOnTermination()


doOnTimeout

@Override
protected void doOnTimeout()


onStop

public final void onStop(Closure onStop)
Sets the on-stop handler for this actor.
param:
onStop The code to invoke when stopping


receive

@SuppressWarnings({"MethodOverloadsMethodOfSuperclass"})
protected final void receive(Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter.
param:
handler A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.


receive

protected final void receive(long timeout, TimeUnit timeUnit, Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler if the timeout expires.
param:
timeout how long to wait before giving up, in units of timeUnit
param:
timeUnit a TimeUnit determining how to interpret the timeout parameter
param:
handler A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
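
The null-on-timeout behaviour described above could be exercised roughly as follows. This is a sketch rather than a definitive usage: the results list and the poller variable are hypothetical names, the 200 ms timeout is arbitrary, and the actor is started explicitly, as in the examples in the class description.

```groovy
import static groovyx.gpars.actor.Actors.actor
import java.util.concurrent.TimeUnit

// Hypothetical collector so the outcome can be inspected from outside the actor
def results = Collections.synchronizedList([])

def poller = actor {
    // Holds on to the current pool thread for up to 200 ms while waiting
    receive(200, TimeUnit.MILLISECONDS) { msg ->
        // msg is null when the timeout expired before a message arrived
        results << (msg == null ? 'timed out' : "got $msg".toString())
    }
}.start()

// No message is sent here, so the handler is expected to receive null
poller.join()
```

Unlike react(), the receive() call returns once the handler has finished, so code placed after it inside the actor body would still run.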


receive

@SuppressWarnings({"MethodOverloadsMethodOfSuperclass", "TypeMayBeWeakened"})
protected final void receive(Duration duration, Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler if the timeout expires.
param:
duration how long to wait before giving up
param:
handler A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.


receiveImpl

@Override
protected final Object receiveImpl()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
return:
The message retrieved from the queue.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.


receiveImpl

@Override
protected final Object receiveImpl(long timeout, TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
param:
timeout how long to wait before giving up, in units of units
param:
units a TimeUnit determining how to interpret the timeout parameter
return:
The message retrieved from the queue, or null, if the timeout expires.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.


sweepQueue

public final List sweepQueue()
Clears the message queue returning all the messages it held.
return:
The messages stored in the queue


 
