import logging; logger = logging.getLogger("morse." + __name__)
import socket
import select
import json
import errno
from morse.core.datastream import Datastream
from morse.helpers.transformation import Transformation3d
from morse.middleware import AbstractDatastream
from morse.core import services
from morse.core.exceptions import MorseRPCInvokationError
try:
import mathutils
except ImportError:
# running outside Blender
mathutils = None
BASE_PORT = 60000
[docs]class MorseEncoder(json.JSONEncoder):
[docs] def default(self, obj):
if isinstance(obj, mathutils.Vector):
return obj[:]
if isinstance(obj, mathutils.Matrix):
return obj[:][:]
if isinstance(obj, mathutils.Quaternion):
return {'x' : obj.x, 'y': obj.y, 'z': obj.z, 'w': obj.w }
if isinstance(obj, mathutils.Euler):
return {'yaw': obj.z, 'pitch': obj.y, 'roll': obj.x }
if isinstance(obj, Transformation3d):
return {'x': obj.x, 'y': obj.y, 'z': obj.z,
'yaw': obj.yaw, 'pitch': obj.pitch, 'roll': obj.roll }
return json.JSONEncoder.default(self, obj)
[docs]class SocketServ(AbstractDatastream):
[docs] def initialize(self):
# List of socket clients
self._client_sockets = []
self._message_size = 4096
self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._server.bind(('', BASE_PORT))
self._server.listen(1)
logger.info("Socket Mw Server now listening on port " + str(BASE_PORT) + \
" for component " + str(self.component_name) + ".")
[docs] def finalize(self):
""" Terminate the ports used to accept requests """
if self._client_sockets:
logger.info("Closing client sockets...")
for s in self._client_sockets:
s.close()
if self._server:
try:
logger.info("Shutting down connections to server...")
self._server.shutdown(socket.SHUT_RDWR)
except socket.error as err_info:
# ignore exception raised on OSX for closed sockets
if err_info.errno != errno.ENOTCONN:
raise
logger.info("Closing socket server...")
self._server.close()
[docs] def close_socket(self, sock):
try:
self._client_sockets.remove(sock)
sock.close()
except socket.error as error_info:
logger.warning("Socket error catched while closing: " + str(error_info))
del self._server
except:
pass
[docs]class SocketPublisher(SocketServ):
_type_name = "straight JSON serialization"
[docs] def default(self, ci='unused'):
sockets = self._client_sockets + [self._server]
try:
inputready, outputready, _ = select.select(sockets, sockets, [], 0)
except select.error:
pass
except socket.error:
pass
if self._server in inputready:
sock, _ = self._server.accept()
self._client_sockets.append(sock)
if outputready != []:
message = self.encode()
for o in outputready:
try:
o.send(message)
except socket.error:
self.close_socket(o)
[docs] def encode(self):
js = json.dumps(self.component_instance.local_data, cls=MorseEncoder)
return (js + '\n').encode()
[docs]class SocketReader(SocketServ):
_type_name = "straight JSON deserialization"
[docs] def default(self, ci='unused'):
sockets = self._client_sockets + [self._server]
try:
inputready, outputready, exceptready = select.select(sockets, [], [], 0)
except select.error:
pass
except socket.error:
pass
for i in inputready:
if i == self._server:
sock, addr = self._server.accept()
logger.debug("New client connected to %s datastream" % self.component_name)
if self._client_sockets != []:
logger.warning("More than one client trying to write on %s datastream!!" % self.component_name)
self._client_sockets.append(sock)
else:
try:
buf = []
msg = ""
full_msg = False
while not full_msg:
msg = i.recv(self._message_size).decode()
logger.debug("received msg %s" % msg)
if not msg: # client disconnected
self.close_socket(i)
else:
buf.append(msg)
full_msg = (len(msg) != self._message_size)
if not msg.endswith('\n'):
logger.error("Malformed message on socket datastream "+\
"(no linefeed at the end): <%s>" % msg)
continue
msg = ''.join(buf).rstrip("\n").split("\n")
logger.debug("received msg %s" % msg)
if len(msg)>1:
logger.warning("Messages missed on socket datastream! <%s>" % msg[:-1])
self.component_instance.local_data = self.decode(msg[-1]) # keep only the last msg if we got several in row
except socket.error as detail:
self.close_socket(i)
[docs] def decode(self, msg):
return json.loads(msg)
[docs]class Socket(Datastream):
""" External communication using sockets. """
def __init__(self):
""" Initialize the socket connections """
# Call the constructor of the parent class
super(self.__class__, self).__init__()
# port -> MorseSocketServ
self._server_dict = {}
# component name (string) -> Port (int)
self._component_nameservice = {}
# Register two special services in the socket service manager:
# TODO To use a new special component instead of 'simulation',
# uncomment the line :-)
# blenderapi.persistantstorage().morse_services.register_request_manager_mapping("streams", "SocketRequestManager")
services.do_service_registration(self.list_streams, 'simulation')
services.do_service_registration(self.get_stream_port, 'simulation')
services.do_service_registration(self.get_all_stream_ports, 'simulation')
[docs] def list_streams(self):
""" List all publish streams.
"""
return list(self._component_nameservice.keys())
[docs] def get_stream_port(self, name):
""" Get stream port for stream name.
"""
port = -1
try:
port = self._component_nameservice[name]
except KeyError:
pass
if port < 0:
raise MorseRPCInvokationError("Stream unavailable for component %s" % name)
return port
[docs] def get_all_stream_ports(self):
""" Get stream ports for all streams.
"""
return self._component_nameservice
[docs] def register_component(self, component_name, component_instance, mw_data):
""" Open the port used to communicate by the specified component.
"""
global BASE_PORT
register_success = False
while (not register_success):
try:
# Create a socket server for this component
serv = Datastream.register_component(self, component_name,
component_instance, mw_data)
register_success = True
except socket.error as error_info:
if error_info.errno == errno.EADDRINUSE:
BASE_PORT = BASE_PORT + 1
else:
raise
self._server_dict[BASE_PORT] = serv
self._component_nameservice[component_name] = BASE_PORT
BASE_PORT = BASE_PORT + 1