A Python interface to control the MORSE robotics simulator
The pymorse library exposes MORSE services and data stream with a friendly Python API.
It uses underneath the MORSE socket API.
import pymorse
with pymorse.Morse() as simu:
# ...
pass
The context manager will take care of properly closing the connection to the simulator. You can also directly create an instance of the pymorse.Morse class, passing the host and/or port of the simulator (defaults to localhost:4000). In this case, you must call pymorse.Morse.close() before leaving.
Let consider MORSE has been started with the following simulation script:
from morse.builder import *
# Add a robot with a position sensor and a motion controller
r2d2 = ATRV()
pose = Pose()
pose.add_interface('socket')
r2d2.append(pose)
motion = Waypoint()
motion.add_interface('socket')
r2d2.append(motion)
# Environment
env = Environment('land-1/trees')
The following Python program sends a destination to the robot, and prints in background its pose:
import time
import pymorse
def print_pos(pose):
print("I'm currently at %s" % pose)
with pymorse.Morse() as simu:
# subscribes to updates from the Pose sensor by passing a callback
simu.r2d2.pose.subscribe(print_pos)
# sends a destination
simu.r2d2.motion.publish({'x' : 10.0, 'y': 5.0, 'z': 0.0,
'tolerance' : 0.5,
'speed' : 1.0})
# Leave a couple of millisec to the simulator to start the action
time.sleep(0.1)
# waits until we reach the target
while simu.r2d2.motion.get_status() != "Arrived":
time.sleep(0.5)
print("Here we are!")
Every component (sensor or actuator) exposes a datastream interface, either read-only (for sensors) or write-only (for actuators)
They are accessible by their names, as defined in the simulation script (cf example above).
Streams are Python pymorse.Stream objects. It offers several methods to read data:
These methods are demonstrated in the example below:
import time
import pymorse
def printer(data):
print("Incoming data! " + str(data))
with pymorse.Morse("localhost", 4000) as simu:
try:
# Get the 'Pose' sensor datastream
pose = simu.r2d2.pose
# Blocks until something is available
print(pose.get())
# Asynchronous read: the following line do not block.
pose.subscribe(printer)
# Read for 10 sec
time.sleep(10)
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
Writing on actuator’s datastreams is achieved with the pymorse.Stream.publish() method, as illustrated in the first example above.
The data format is always formatted as a JSON dictionary (which means that, currently, binary data like images are not supported). The documentation page of each component specify the exact content of the dictionary.
Some components export RPC services. Please refer to the components’ documentation for details.
These services can be accessed from pymorse, and mostly look like regular methods:
import pymorse
with pymorse.Morse() as simu:
# Invokes the get_status service from the 'Waypoints' actuator
print(simu.r2d2.motion.get_status())
However, these call are asynchronous: a call to simu.r2d2.motion.get_status() does not block, and returns instead a future. See the concurrent.futures API to learn more about futures.
Non-blocking call are useful for long lasting services, like in the example below:
import time
import pymorse
def done(evt):
print("We have reached our destination!")
with pymorse.Morse() as simu:
# Start the motion. It may take several seconds before finishing
# The line below is however non-blocking
goto_action = simu.r2d2.motion.goto(2.5, 0, 0)
# Register a callback to know when the action is done
goto_action.add_done_callback(done)
print("Am I currently moving? %s" % goto_action.running())
while goto_action.running():
time.sleep(0.5)
Use the cancel method on the future returned by the RPC call to abort the service.
To actually wait for a result, call the result method on the future:
import pymorse
with pymorse.Morse() as simu:
status = simu.r2d2.motion.get_status().result()
if status == "Arrived":
print("Here we are")
However, for certain common cases (printing or comparing the return value), the result() method is automatically called. Thus, the previous code sample can be rewritten:
import pymorse
with pymorse.Morse() as simu:
if simu.r2d2.motion.get_status() == "Arrived":
print("Here we are")
Several services are available to control the overall behaviour of the simulator.
The whole list of such services is here: Supervision services.
For instance, you can stop the simulator (MORSE will quit) with pymorse.quit(), and reset it with pymorse.Morse.reset().
These methods are demonstrated in the example below:
import pymorse
with pymorse.Morse("localhost", 4000) as simu:
try:
print(simu.robots)
simu.quit()
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
This library use the standard Python logging mechanism. You can retrieve pymorse log messages through the “pymorse” logger.
The complete example below shows how to retrieve the logger and how to configure it to print debug messages on the console.
import logging
import pymorse
logger = logging.getLogger("pymorse")
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)-15s %(name)s: %(levelname)s - %(message)s')
console.setFormatter(formatter)
logger.addHandler(console)
with pymorse.Morse("localhost", 4000) as simu:
try:
print(simu.robots)
simu.quit()
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
Bases: threading.Thread
Parameters: |
|
---|
Bases: builtins.object
Bases: builtins.object
Send a cancelation request for an existing (running) service.
If the service is not running or does not exist, the request is ignored.
This method asks MORSE for the scene structure, and dynamically creates corresponding objects in ‘self’.
Calls a service from the simulator.
The call will block until a response from the simulator is received.
Parameters: |
|
---|