Skip to content

Commit

Permalink
[socket] Implement external time synchronisation for socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Arnaud Degroote committed Apr 21, 2015
1 parent b82fbd3 commit 0e816a9
Showing 1 changed file with 53 additions and 1 deletion.
54 changes: 53 additions & 1 deletion src/morse/middleware/socket_datastream.py
Expand Up @@ -7,7 +7,7 @@
from morse.helpers.transformation import Transformation3d
from morse.middleware import AbstractDatastream
from morse.core import services
from morse.core.exceptions import MorseRPCInvokationError
from morse.core.exceptions import MorseRPCInvokationError, MorseMiddlewareError

try:
import mathutils
Expand Down Expand Up @@ -166,6 +166,16 @@ def __init__(self, args, kwargs):
# Call the constructor of the parent class
DatastreamManager.__init__(self, args, kwargs)

self.time_sync = kwargs.get('time_sync', False)
self.sync_port = kwargs.get('sync_port', -1)

if self.time_sync:
if self.sync_port == -1:
logger.error("time_sync is required, but sync_port is not configured")
raise MorseMiddlewareError("sync_port is not configured")
else:
self._init_trigger()

# port -> MorseSocketServ
self._server_dict = {}

Expand All @@ -184,6 +194,44 @@ def __init__(self, args, kwargs):
services.do_service_registration(self.get_stream_port, 'simulation')
services.do_service_registration(self.get_all_stream_ports, 'simulation')

def __del__(self):
if self.time_sync:
self._end_trigger()

def _init_trigger(self):
self._sync_client = None
self._sync_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sync_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sync_server.bind(('', self.sync_port))
self._sync_server.listen(1)
logger.info("Creating clock synchronisation on port %d" % self.sync_port)

def _wait_trigger(self):
# If there is some client, just wait on it
if self._sync_client:
logger.debug("Waiting trigger")
msg = self._sync_client.recv(2048)
if not msg: #deconnection of client
self._sync_client = None
else:
# Otherwise, we just check if there is some client waiting
# If there is no client, we do not block for the moment to avoid
# weird interaction at the startup
logger.debug("Checking for some client on synchronisation port")
try:
inputready, _, _ = select.select([self._sync_server], [], [], 0)
except select.error:
pass
except socket.error:
pass

if self._sync_server in inputready:
self._sync_client, _ = self._sync_server.accept()

def _end_trigger(self):
self._sync_client.close()
self._sync_server.shutdown(socket.SHUT_RDWR)

def list_streams(self):
""" List all publish streams.
"""
Expand Down Expand Up @@ -236,3 +284,7 @@ def register_component(self, component_name, component_instance, mw_data):
self._component_nameservice[component_name] = mw_data[2]['port']
if must_inc_base_port:
self._base_port += 1

def action(self):
if self.time_sync:
self._wait_trigger()

0 comments on commit 0e816a9

Please sign in to comment.