Source code for morse.middleware.socket_datastream

import logging; logger = logging.getLogger("morse." + __name__)
import socket
import select
import json
import errno
from morse.core.datastream import DatastreamManager
from morse.helpers.transformation import Transformation3d
from morse.middleware import AbstractDatastream
from morse.core import services
from morse.core.exceptions import MorseRPCInvokationError, MorseMiddlewareError

try:
    import mathutils
except ImportError:
    # running outside Blender
    mathutils = None

[docs]class MorseEncoder(json.JSONEncoder):
[docs] def default(self, obj): if isinstance(obj, mathutils.Vector): return obj[:] if isinstance(obj, mathutils.Matrix): # obj[:][:] gives list(mathutils.Vector) return [list(vec) for vec in obj] if isinstance(obj, mathutils.Quaternion): return {'x' : obj.x, 'y': obj.y, 'z': obj.z, 'w': obj.w } if isinstance(obj, mathutils.Euler): return {'yaw': obj.z, 'pitch': obj.y, 'roll': obj.x } if isinstance(obj, Transformation3d): return {'x': obj.x, 'y': obj.y, 'z': obj.z, 'yaw': obj.yaw, 'pitch': obj.pitch, 'roll': obj.roll } return json.JSONEncoder.default(self, obj)
[docs]class SocketServ(AbstractDatastream):
[docs] def initialize(self): # List of socket clients self._client_sockets = [] self._message_size = 4096 self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._server.bind(('', self.kwargs['port'])) self._server.listen(1) logger.info("Socket Mw Server now listening on port " + str(self.kwargs['port']) + \ " for component " + str(self.component_name) + ".")
[docs] def finalize(self): """ Terminate the ports used to accept requests """ if self._client_sockets: logger.info("Closing client sockets...") for s in self._client_sockets: s.close() if self._server: try: logger.info("Shutting down connections to server...") self._server.shutdown(socket.SHUT_RDWR) except socket.error as err_info: # ignore exception raised on OSX for closed sockets if err_info.errno != errno.ENOTCONN: raise logger.info("Closing socket server...") self._server.close()
[docs] def close_socket(self, sock): try: self._client_sockets.remove(sock) sock.close() except socket.error as error_info: logger.warning("Socket error catched while closing: " + str(error_info)) del self._server except: pass
[docs]class SocketPublisher(SocketServ): _type_name = "straight JSON serialization"
[docs] def default(self, ci='unused'): sockets = self._client_sockets + [self._server] try: inputready, outputready, _ = select.select(sockets, sockets, [], 0) except select.error: pass except socket.error: pass if self._server in inputready: sock, _ = self._server.accept() self._client_sockets.append(sock) if outputready: message = self.encode() for o in outputready: try: o.send(message) except socket.error: self.close_socket(o)
[docs] def encode(self): js = json.dumps(self.component_instance.local_data, cls=MorseEncoder) return (js + '\n').encode()
[docs]class SocketReader(SocketServ): _type_name = "straight JSON deserialization"
[docs] def default(self, ci='unused'): sockets = self._client_sockets + [self._server] try: inputready, outputready, exceptready = select.select(sockets, [], [], 0) except select.error: pass except socket.error: pass got_new_information = False for i in inputready: if i == self._server: sock, addr = self._server.accept() logger.debug("New client connected to %s datastream" % self.component_name) if self._client_sockets: logger.warning("More than one client trying to write on %s datastream!!" % self.component_name) self._client_sockets.append(sock) else: try: buf = [] msg = "" full_msg = False while not full_msg: msg = i.recv(self._message_size).decode() logger.debug("received msg %s" % msg) if not msg: # client disconnected self.close_socket(i) else: buf.append(msg) full_msg = (len(msg) != self._message_size) if not msg.endswith('\n'): logger.error("Malformed message on socket datastream "+\ "(no linefeed at the end): <%s>" % msg) continue msg = ''.join(buf).rstrip("\n").split("\n") logger.debug("received msg %s" % msg) if len(msg)>1: logger.warning("Messages missed on socket datastream! <%s>" % msg[:-1]) self.component_instance.local_data = self.decode(msg[-1]) # keep only the last msg if we got several in row got_new_information = True except socket.error as detail: self.close_socket(i) return got_new_information
[docs] def decode(self, msg): return json.loads(msg)
[docs]class SocketDatastreamManager(DatastreamManager): """ External communication using sockets. """ def __init__(self, args, kwargs): """ Initialize the socket connections """ # Call the constructor of the parent class DatastreamManager.__init__(self, args, kwargs) self.time_sync = kwargs.get('time_sync', False) self.sync_port = kwargs.get('sync_port', -1) if self.time_sync: if self.sync_port == -1: logger.error("time_sync is required, but sync_port is not configured") raise MorseMiddlewareError("sync_port is not configured") else: self._init_trigger() # port -> MorseSocketServ self._server_dict = {} # component name (string) -> Port (int) self._component_nameservice = {} # Base port self._base_port = 60000 # Register two special services in the socket service manager: # TODO To use a new special component instead of 'simulation', # uncomment the line :-) # blenderapi.persistantstorage().morse_services.register_request_manager_mapping("streams", "SocketRequestManager") services.do_service_registration(self.list_streams, 'simulation') services.do_service_registration(self.get_stream_port, 'simulation') services.do_service_registration(self.get_all_stream_ports, 'simulation') def __del__(self): if self.time_sync: self._end_trigger() def _init_trigger(self): self._sync_client = None self._sync_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sync_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sync_server.bind(('', self.sync_port)) self._sync_server.listen(1) logger.info("Creating clock synchronisation on port %d" % self.sync_port) def _wait_trigger(self): # If there is some client, just wait on it if self._sync_client: logger.debug("Waiting trigger") msg = self._sync_client.recv(2048) if not msg: #deconnection of client self._sync_client = None else: # Otherwise, we just check if there is some client waiting # If there is no client, we do not block for the moment to avoid # weird interaction at the startup logger.debug("Checking for some client on synchronisation port") try: inputready, _, _ = select.select([self._sync_server], [], [], 0) except select.error: pass except socket.error: pass if self._sync_server in inputready: self._sync_client, _ = self._sync_server.accept() def _end_trigger(self): self._sync_client.close() self._sync_server.shutdown(socket.SHUT_RDWR)
[docs] def list_streams(self): """ List all publish streams. """ return list(self._component_nameservice.keys())
[docs] def get_stream_port(self, name): """ Get stream port for stream name. """ port = -1 try: port = self._component_nameservice[name] except KeyError: pass if port < 0: raise MorseRPCInvokationError("Stream unavailable for component %s" % name) return port
[docs] def get_all_stream_ports(self): """ Get stream ports for all streams. """ return self._component_nameservice
[docs] def register_component(self, component_name, component_instance, mw_data): """ Open the port used to communicate by the specified component. """ register_success = False must_inc_base_port = False if not 'port' in mw_data[2]: must_inc_base_port = True mw_data[2]['port'] = self._base_port while not register_success: try: # Create a socket server for this component serv = DatastreamManager.register_component(self, component_name, component_instance, mw_data) register_success = True except socket.error as error_info: if error_info.errno == errno.EADDRINUSE: mw_data[2]['port'] += 1 if must_inc_base_port: self._base_port += 1 else: raise self._server_dict[mw_data[2]['port']] = serv self._component_nameservice[component_name] = mw_data[2]['port'] if must_inc_base_port: self._base_port += 1
[docs] def action(self): if self.time_sync: self._wait_trigger()