pymorse

A Python interface to control the MORSE robotics simulator

The pymorse library exposes MORSE services and data streams through a friendly Python API.

Under the hood, it uses the MORSE socket API.

Usage

Creating a connection to the simulator

  • Import the pymorse module
  • Create a context with the with statement:
import pymorse

with pymorse.Morse() as simu:
    # ...
    pass

The context manager will take care of properly closing the connection to the simulator. You can also directly create an instance of the pymorse.Morse class, passing the host and/or port of the simulator (defaults to localhost:4000). In this case, you must call pymorse.Morse.close() before leaving.

  • Once created, the context generates proxies for every robot in the scene, and for every sensor and actuator of each robot.
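The explicit form (without the with statement) can be sketched as follows. The helper name list_robots and its morse_cls parameter are illustrative assumptions, not part of pymorse; the class is passed in so that the helper can be tried without a running simulator. It relies only on the constructor arguments, the robots attribute and the close() method that appear elsewhere on this page.

```python
def list_robots(morse_cls, host="localhost", port=4000):
    """Connect without a `with` block, read the robot list, always close.

    `morse_cls` is expected to be pymorse.Morse (hypothetical helper,
    not part of the pymorse API).
    """
    simu = morse_cls(host, port)
    try:
        return simu.robots
    finally:
        # Mandatory when no context manager is used
        simu.close()
```

With MORSE running, this would be called as list_robots(pymorse.Morse) after importing pymorse. The try/finally block plays the role of the context manager: the connection is closed even if reading the scene raises an exception.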

First example

Let us assume that MORSE has been started with the following simulation script:

from morse.builder import *

# Add a robot with a position sensor and a motion controller
r2d2 = ATRV()

pose = Pose()
pose.add_interface('socket')
r2d2.append(pose)

motion = Waypoint()
motion.add_interface('socket')
r2d2.append(motion)


# Environment
env = Environment('land-1/trees')

The following Python program sends a destination to the robot and prints its pose in the background:

import time
import pymorse

def print_pos(pose):
    print("I'm currently at %s" % pose)

with pymorse.Morse() as simu:
    
    # subscribes to updates from the Pose sensor by passing a callback
    simu.r2d2.pose.subscribe(print_pos)

    # sends a destination
    simu.r2d2.motion.publish({'x' : 10.0, 'y': 5.0, 'z': 0.0, 
                              'tolerance' : 0.5, 
                              'speed' : 1.0})

    # Give the simulator a few milliseconds to start the action
    time.sleep(0.1)

    # waits until we reach the target
    while simu.r2d2.motion.get_status() != "Arrived":
        time.sleep(0.5)

    print("Here we are!")

Data stream manipulation

Every component (sensor or actuator) exposes a datastream interface, either read-only (for sensors) or write-only (for actuators).

They are accessible by name, as defined in the simulation script (see the example above).

Streams are Python pymorse.Stream objects. They offer several methods to read data:

  • pymorse.Stream.get(): blocks until new data are available and returns them.
  • pymorse.Stream.last(): returns the last record received or, if an integer n is passed, the last n records; fewer records (or none) are returned if fewer have been received so far.
  • pymorse.Stream.subscribe() (and pymorse.Stream.unsubscribe()): registers (or removes) a callback that is triggered every time incoming data is received.

These methods are demonstrated in the example below:

import time
import pymorse

def printer(data):
    print("Incoming data! " + str(data))

with pymorse.Morse("localhost", 4000) as simu:

    try:
        # Get the 'Pose' sensor datastream
        pose = simu.r2d2.pose

        # Blocks until something is available
        print(pose.get())

        # Asynchronous read: the following line does not block.
        pose.subscribe(printer)

        # Read for 10 sec
        time.sleep(10)

    except pymorse.MorseServerError as mse:
        print('Oops! An error occurred!')
        print(mse)
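The last() method is not exercised by the example above. The stand-alone sketch below illustrates the semantics described for it, using a collections.deque as the buffer. It is a toy model, not pymorse's implementation: the RecentRecords class, its push() method, the None return value for an empty buffer and the oldest-first ordering of the result are all assumptions made for illustration.

```python
from collections import deque


class RecentRecords:
    """Toy stand-in for a stream buffer (NOT pymorse's implementation)."""

    def __init__(self, maxlen=100):
        # Newest records sit on the left; the oldest fall off the right
        # once maxlen is reached.
        self._buf = deque(maxlen=maxlen)

    def push(self, record):
        self._buf.appendleft(record)

    def last(self, n=None):
        """Return the newest record, or the n newest ones (oldest first)."""
        if n is None:
            return self._buf[0] if self._buf else None
        newest_first = list(self._buf)[:n]
        return list(reversed(newest_first))


records = RecentRecords()
for value in (1, 2, 3):
    records.push(value)

print(records.last())    # newest record
print(records.last(2))   # two newest records
print(records.last(10))  # fewer than requested: everything received so far
```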

Writing to an actuator’s datastream is achieved with the pymorse.Stream.publish() method, as illustrated in the first example above.

Data is always formatted as a JSON dictionary (which means that, currently, binary data such as images is not supported). The documentation page of each component specifies the exact content of the dictionary.
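The JSON constraint can be checked with the standard json module alone. The sketch below round-trips the waypoint dictionary from the first example and shows that raw bytes are rejected by the encoder. How pymorse serializes data on the wire is not shown on this page, so this only illustrates the limits of JSON itself; the 'image' key is a made-up example.

```python
import json

# The dictionary published to the Waypoint actuator in the first example
waypoint = {'x': 10.0, 'y': 5.0, 'z': 0.0, 'tolerance': 0.5, 'speed': 1.0}

encoded = json.dumps(waypoint)
decoded = json.loads(encoded)

# Raw bytes (for instance image data) cannot be encoded as JSON
try:
    json.dumps({'image': b'\x89PNG'})
    binary_ok = True
except TypeError:
    binary_ok = False

print(decoded == waypoint)
print(binary_ok)
```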

Services

Some components export RPC services. Please refer to the components’ documentation for details.

These services can be accessed from pymorse, and mostly look like regular methods:

import pymorse

with pymorse.Morse() as simu:

    # Invokes the get_status service of the 'Waypoint' actuator
    print(simu.r2d2.motion.get_status())

However, these calls are asynchronous: a call to simu.r2d2.motion.get_status() does not block, and returns a future instead. See the concurrent.futures API to learn more about futures.
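For readers unfamiliar with futures, the sketch below shows the same pattern with the standard library only, so it runs without a simulator. The slow_service function is a hypothetical stand-in for a long-lasting MORSE service, and a ThreadPoolExecutor replaces the connection to the simulator.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_service():
    """Stand-in for a long-lasting simulator service (hypothetical)."""
    time.sleep(0.2)
    return "Arrived"


notifications = []

with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns a Future immediately, without waiting for the result
    future = pool.submit(slow_service)

    # The callback receives the finished future as its only argument
    future.add_done_callback(lambda f: notifications.append(f.result()))

    # result() blocks until the value is available
    status = future.result()

print(status)
```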

Non-blocking calls are useful for long-lasting services, as in the example below:

import time
import pymorse

def done(evt):
    print("We have reached our destination!")

with pymorse.Morse() as simu:

    # Start the motion. It may take several seconds before finishing
    # The line below is however non-blocking
    goto_action = simu.r2d2.motion.goto(2.5, 0, 0)

    # Register a callback to know when the action is done
    goto_action.add_done_callback(done)

    print("Am I currently moving? %s" % goto_action.running())

    while goto_action.running():
        time.sleep(0.5)

Use the cancel method on the future returned by the RPC call to abort the service.
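A possible use of cancel is to give up on a motion that takes too long. The helper below is a sketch under assumptions: goto_or_cancel, its timeout and poll parameters are hypothetical names, and the motion argument is expected to behave like simu.r2d2.motion from the examples above. It relies only on goto(), running() and cancel() as they appear on this page.

```python
import time


def goto_or_cancel(motion, x, y, z, timeout=10.0, poll=0.5):
    """Start a goto action; cancel it if still running after `timeout` seconds.

    Returns True if the action stopped running by itself, False if it
    had to be cancelled. (Hypothetical helper, not part of pymorse.)
    """
    action = motion.goto(x, y, z)
    deadline = time.monotonic() + timeout

    while action.running():
        if time.monotonic() > deadline:
            action.cancel()
            return False
        time.sleep(poll)

    return True
```

Inside a with pymorse.Morse() as simu: block, this would be called as goto_or_cancel(simu.r2d2.motion, 2.5, 0, 0).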

To actually wait for a result, call the result method on the future:

import pymorse

with pymorse.Morse() as simu:

    status = simu.r2d2.motion.get_status().result()

    if status == "Arrived":
        print("Here we are")

However, for certain common cases (printing or comparing the return value), the result() method is automatically called. Thus, the previous code sample can be rewritten:

import pymorse

with pymorse.Morse() as simu:

    if simu.r2d2.motion.get_status() == "Arrived":
        print("Here we are")
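Since the automatic call to result() is only described for printing and comparing, it seems safer to call result() explicitly whenever the value is stored or reused. The helper below sketches this for a polling loop; wait_until_arrived and its poll and timeout parameters are hypothetical names, and the motion argument is assumed to behave like simu.r2d2.motion.

```python
import time


def wait_until_arrived(motion, poll=0.5, timeout=60.0):
    """Poll get_status() until it reports "Arrived" or `timeout` expires.

    (Hypothetical helper, not part of pymorse.)
    """
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        # Explicitly resolve the future into a plain value
        status = motion.get_status().result()
        if status == "Arrived":
            return True
        time.sleep(poll)

    return False
```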

Simulator control

Several services are available to control the overall behaviour of the simulator.

The whole list of such services is here: Supervision services.

For instance, you can stop the simulator (MORSE will quit) with pymorse.Morse.quit(), and reset it with pymorse.Morse.reset().

These methods are demonstrated in the example below:

import pymorse

with pymorse.Morse("localhost", 4000) as simu:

    try:
        print(simu.robots)
        simu.quit()

    except pymorse.MorseServerError as mse:
        print('Oops! An error occurred!')
        print(mse)

Logging

This library uses the standard Python logging mechanism. You can retrieve pymorse log messages through the “pymorse” logger.

The complete example below shows how to retrieve the logger and how to configure it to print debug messages on the console.

import logging
import pymorse

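logger = logging.getLogger("pymorse")
# The logger itself must also let DEBUG records through, not only the handler
logger.setLevel(logging.DEBUG)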

console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)-15s %(name)s: %(levelname)s - %(message)s')
console.setFormatter(formatter)

logger.addHandler(console)

with pymorse.Morse("localhost", 4000) as simu:

    try:
        print(simu.robots)

        simu.quit()

    except pymorse.MorseServerError as mse:
        print('Oops! An error occurred!')
        print(mse)
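
The handler setup can also be checked without a running simulator. The sketch below attaches a handler that writes to an in-memory buffer and emits a test record on the “pymorse” logger itself, as a stand-in for a message coming from the library. The logger level is set explicitly, on the assumption that it is not already at DEBUG.

```python
import io
import logging

buffer = io.StringIO()

handler = logging.StreamHandler(buffer)
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s - %(message)s'))

logger = logging.getLogger("pymorse")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# Stand-in for a debug message emitted by pymorse
logger.debug("test message")

captured = buffer.getvalue()
print(captured)
```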

class ComChannel(host)

Bases: threading.Thread

close()
connect(port, read_queue, write_queue, buf=None, cb=None)
Parameters:
  • port – the socket port to connect to
  • read_queue – the content read from the socket is pushed to this queue. Must be a queue.Queue.
  • write_queue – content to be sent to the simulator should be pushed to this queue. Must be a queue.Queue.
  • buf – (optional, default: None) if provided, content stored in the read_queue is also copied to this buffer. Must support ‘appendleft()’. Note: if ‘buf’ is provided, the connection is treated as a stream connection; otherwise, it is treated as an RPC connection (using the MORSE RPC protocol to parse the response).
  • cb – (optional, default: None) a callback, invoked when content is read on the socket.
id = 0
parse_response(raw)
process()

Makes sure the write requests are processed by explicitly waking up select.

run()

This method reads from and writes to the simulator. When a new request is pushed to _morse_requests_queue, it is sent to the server. When the server answers, we check whether it is a normal answer or an event, and dispatch the answer’s content accordingly.
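
The connect() parameters above describe a small contract between the socket reader and its consumers: content goes to read_queue, is optionally copied to buf with appendleft(), and is optionally passed to cb. The sketch below mimics that contract with standard containers. The deliver function is a hypothetical illustration, not the code of ComChannel, and the order of the three steps is an assumption.

```python
import queue
from collections import deque


def deliver(content, read_queue, buf=None, cb=None):
    """Hypothetical helper mirroring the documented connect() contract."""
    read_queue.put(content)
    if buf is not None:
        # Newest item ends up at index 0
        buf.appendleft(content)
    if cb is not None:
        cb(content)


read_queue = queue.Queue()
buf = deque(maxlen=10)
seen = []

deliver({'x': 1.0}, read_queue, buf=buf, cb=seen.append)
deliver({'x': 2.0}, read_queue, buf=buf, cb=seen.append)

print(buf[0])
print(seen)
```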

class Component(morse, name, fqn, stream=None, port=None, services=[])

Bases: builtins.object

close()

class Morse(host='localhost', service_port=4000)

Bases: builtins.object

cancel(id)

Sends a cancellation request for an existing (running) service.

If the service is not running or does not exist, the request is ignored.

close(cancel_async_services=False)
get_stream_port(stream)
id = 0
initapi()

This method asks MORSE for the scene structure, and dynamically creates corresponding objects in ‘self’.

quit()
reset()
rpc(component, service, *args)

Calls a service from the simulator.

The call will block until a response from the simulator is received.

Parameters:
  • component – the component that exposes the service (like a robot name)
  • service – the name of the service
  • args... – (variadic) each service parameter, as a separate argument
streams()

exception MorseServerError(value)

Bases: builtins.Exception

exception MorseServiceFailed(value)

Bases: builtins.Exception

exception MorseServicePreempted(value)

Bases: builtins.Exception

class NullHandler(level=0)

Bases: logging.Handler

Defines a NullHandler for logging, in case pymorse is used in an application that doesn’t use logging.

emit(record)

class Robot

Bases: builtins.dict
