Source code for morse.middleware.hla_datastream

import logging; logger = logging.getLogger("morse." + __name__)
import hla.rti as rti
import sys

from morse.core.datastream import DatastreamManager
from morse.core import blenderapi

[docs]class MorseBaseAmbassador(rti.FederateAmbassador): def __init__(self, rtia, federation, time_sync, timestep): self._rtia = rtia self.federation = federation self._time_sync = time_sync self.synchronisation_points = {} self.registred_objects = {} # name -> obj_handle self.registred_class_ref = {} # obj_handle -> int self._object_handles = {} # string -> obj_handle self._attributes_handles = {} # (obj_handle, string) -> attr_handle self._attributes_subscribed = {} # obj_name -> [attr_handle] self._attributes_values = {} # obj_name -> { attr_handle -> value } self.timestep = timestep
[docs] def initialize_time_regulation(self): self.logical_time = self._rtia.queryFederateTime() logger.debug("federation %s time %f timestep %f" % (self.federation, self.logical_time, self.timestep)) self.constraint_enabled = False self.regulator_enabled = False self.granted = False self.lookahead = self.timestep self._rtia.enableTimeConstrained() self._rtia.enableTimeRegulation(self.logical_time, self.timestep) while not (self.constraint_enabled and self.regulator_enabled): self._rtia.tick(0, self.lookahead)
[docs] def advance_time(self): if self._time_sync: self.granted = False self._rtia.timeAdvanceRequest(self.logical_time + self.timestep) while not self.granted: self._rtia.tick(0, self.lookahead) else: self._rtia.tick()
[docs] def register_sync_point(self, label): self._rtia.registerFederationSynchronizationPoint(label, "Waiting for other simulators")
[docs] def wait_until_sync(self, label): # Make sure that we receive the announce sync point while not label in self.synchronisation_points: self._rtia.tick() self._rtia.synchronizationPointAchieved(label) while not self.synchronisation_points[label]: self._rtia.tick()
[docs] def register_object(self, handle, name): obj = self._rtia.registerObjectInstance(handle, name) self.registred_objects[name] = obj return obj
[docs] def delete_object(self, name): self._rtia.deleteObjectInstance( self.registred_objects[name], name) del self.registred_objects[name]
[docs] def object_handle(self, name): handle = self._object_handles.get(name, None) if not handle: handle = self._rtia.getObjectClassHandle(name) self._object_handles[name] = handle return handle
[docs] def attribute_handle(self, name, obj_handle): handle = self._attributes_handles.get((obj_handle, name), None) if not handle: handle = self._rtia.getAttributeHandle(name, obj_handle) self._attributes_handles[(obj_handle, name)] = handle return handle
[docs] def suscribe_attributes(self, name, obj_handle, attr_handles): logger.debug("suscribe_attributes %s %s => %s" % (name, obj_handle, attr_handles)) curr_tracked_attr = set(self._attributes_subscribed.get(name, [])) res = list(curr_tracked_attr.union(attr_handles)) self._attributes_subscribed[name] = res self._rtia.subscribeObjectClassAttributes(obj_handle, res) ref_cnt = self.registred_class_ref.get(obj_handle, 0) self.registred_class_ref[obj_handle] = ref_cnt + 1 logger.debug("registred_class_ref %s => %d" % (obj_handle, ref_cnt + 1))
[docs] def unsuscribe_attributes(self, obj_handle): logger.debug("unsuscribe_attributes %s" % (obj_handle)) if not obj_handle in self.registred_class_ref: return self.registred_class_ref[obj_handle] -= 1 logger.debug("registred_class_ref %s => %d" % (obj_handle, self.registred_class_ref[obj_handle])) if self.registred_class_ref[obj_handle] == 0: self._rtia.unsubscribeObjectClass(obj_handle) del self.registred_class_ref[obj_handle]
[docs] def get_attributes(self, obj_name): return self._attributes_values.get(obj_name, None)
[docs] def update_attribute(self, obj_handle, value): if self._time_sync: self._rtia.updateAttributeValues(obj_handle, value, "morse_update", self.logical_time + self.timestep) else: self._rtia.updateAttributeValues(obj_handle, value, "morse_update") # Callbacks for FedereteAmbassadors
[docs] def discoverObjectInstance(self, obj, objectclass, name): logger.debug("DISCOVER %s %s %s" % (name, obj, objectclass)) subscribed_attributes = self._attributes_subscribed.get(name, None) if subscribed_attributes: self._rtia.requestObjectAttributeValueUpdate(obj, subscribed_attributes) default_value = {} for attr in subscribed_attributes: default_value[attr] = None self._attributes_values[name] = default_value
[docs] def reflectAttributeValues(self, obj, attributes, tag, order, transport, time=None, retraction=None): obj_name = self._rtia.getObjectInstanceName(obj) logger.debug("reflectAttributeValues for %s %s" % (obj_name, attributes)) attr_entry = self._attributes_values.get(obj_name, None) if not attr_entry: return for key in attr_entry.keys(): if key in attributes: attr_entry[key] = attributes[key]
[docs] def timeConstrainedEnabled(self, time): logger.debug("Constrained at time %f" % time) self.logical_time = time self.constraint_enabled = True
[docs] def timeRegulationEnabled(self, time): logger.debug("Regulator at time %f" % time) self.logical_time = time self.regulator_enabled = True
[docs] def timeAdvanceGrant(self, time): logger.debug("time Advance granted %f" % time) self.logical_time = time self.granted = True
[docs] def announceSynchronizationPoint(self, label, tag): self.synchronisation_points[label] = False
[docs] def federationSynchronized(self, label): self.synchronisation_points[label] = True
[docs]class HLABaseNode: def __init__(self, klass, fom, node_name, federation, sync_point, sync_register, time_sync, timestep): """ Initializes HLA (connection to RTIg, FOM file, publish robots...) """ logger.info("Initializing HLA node.") self._federation = federation try: logger.debug("Creating RTIA...") self.rtia = rti.RTIAmbassador() logger.debug("RTIA created!") try: self.rtia.createFederationExecution(federation, fom) logger.info("%s federation created", federation) except rti.FederationExecutionAlreadyExists: logger.debug("%s federation already exists", federation) except rti.CouldNotOpenFED: logger.error("FED file not found! " + \ "Please check that the '.fed' file is in the CERTI " + \ "search path of RTIg.") raise except rti.ErrorReadingFED: logger.error("Error when reading FED file! " + \ "Please check the '.fed' file syntax.") raise logger.debug("Creating MorseAmbassador...") self.morse_ambassador = klass(self.rtia, federation, time_sync, timestep) try: self.rtia.joinFederationExecution(node_name, federation, self.morse_ambassador) except rti.FederateAlreadyExecutionMember: logger.error("A Federate with name %s has already registered."+\ " Change the name of your federate or " + \ "check your federation architecture.", self.node_name) raise except rti.CouldNotOpenFED: logger.error("FED file not found! Please check that the " + \ "'.fed' file is in the CERTI search path.") raise except rti.ErrorReadingFED: logger.error("Error when reading FED file! "+ \ "Please check the '.fed' file syntax.") raise logger.info("HLA middleware initialized.") except Exception as error: logger.error("Error when connecting to the RTIg: %s." + \ "Please check your HLA network configuration.", error) raise if sync_point: if sync_register: self.morse_ambassador.register_sync_point(sync_point) print("Press ENTER when all simulators are ready") sys.stdin.read(1) self.morse_ambassador.wait_until_sync(sync_point) if time_sync: self.morse_ambassador.initialize_time_regulation()
[docs] def finalize(self): """ Close all open HLA connections. """ logger.info("Resigning from the HLA federation") if self.morse_ambassador: del self.morse_ambassador self.rtia.resignFederationExecution( rti.ResignAction.DeleteObjectsAndReleaseAttributes) try: self.rtia.destroyFederationExecution(self._federation) except: pass del self.rtia
[docs]class HLADatastreamManager(DatastreamManager): """ External communication using sockets. """ def __init__(self, args, kwargs): """ Initialize the socket connections """ # Call the constructor of the parent class DatastreamManager.__init__(self, args, kwargs) try: fom = kwargs["fom"] node_name = kwargs["name"] federation = kwargs["federation"] sync_point = kwargs.get("sync_point", None) sync_register = kwargs.get("sync_register", False) time_sync = kwargs.get("time_sync", False) timestep = kwargs.get("timestep", 1.0 / blenderapi.getfrequency()) self.node = HLABaseNode(MorseBaseAmbassador, fom, node_name, federation, sync_point, sync_register, time_sync, timestep) except KeyError as error: logger.error("One of [fom, name, federation] attribute is not configured: " "Cannot create HLADatastreamManager") raise
[docs] def finalize(self): DatastreamManager.finalize(self) self.node.finalize()
[docs] def register_component(self, component_name, component_instance, mw_data): """ Open the port used to communicate by the specified component. """ mw_data[2]['__hla_node'] = self.node DatastreamManager.register_component(self, component_name, component_instance, mw_data)
[docs] def action(self): self.node.morse_ambassador.advance_time()