diff --git a/testing/base/CMakeLists.txt b/testing/base/CMakeLists.txt index b8f9cf86e..ec77784a4 100644 --- a/testing/base/CMakeLists.txt +++ b/testing/base/CMakeLists.txt @@ -59,3 +59,5 @@ add_morse_test(imu_noise_testing) # Services add_morse_test(communication_service_testing) + +add_morse_test(socket_sync_testing) diff --git a/testing/base/socket_sync_testing.py b/testing/base/socket_sync_testing.py new file mode 100755 index 000000000..d65067958 --- /dev/null +++ b/testing/base/socket_sync_testing.py @@ -0,0 +1,75 @@ +#! /usr/bin/env python +""" +This script tests the 'data stream' oriented feature of the socket interface. +""" + +from morse.testing.testing import MorseTestCase + +try: + # Include this import to be able to use your test file as a regular + # builder script, ie, usable with: 'morse [run|exec] .py + from morse.builder import * +except ImportError: + pass + +import sys +import time +import socket +from pymorse import Morse + + +class SocketSyncTest(MorseTestCase): + + def setUpEnv(self): + bpymorse.set_speed(fps=10) + + robot = Morsy() + + clock = Clock() + clock.add_stream('socket') + robot.append(clock) + + env = Environment('empty') + env.configure_stream_manager('socket', time_sync = True, sync_port = 5000) + + def test_read_clock(self): + with Morse() as morse: + clock_stream = morse.robot.clock + + time.sleep(0.5) + + prev_clock = clock_stream.last() + time.sleep(0.2) + clock = clock_stream.last() + + self.assertTrue(clock['timestamp'] > prev_clock['timestamp']) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync: + sync.connect(('localhost', 5000)) + + # Now the clock is blocked until we triggered it. + # Checking it :) + time.sleep(0.2) + prev_clock = clock_stream.last() + time.sleep(0.2) + clock = clock_stream.last() + self.assertTrue(clock['timestamp'] == prev_clock['timestamp']) + + # triggering once + sync.send(bytes('foo', 'utf-8')) + clock = clock_stream.get() + self.assertAlmostEqual(clock['timestamp'] - prev_clock['timestamp'], 0.1, delta = 0.0001) + + # So cool, isn't it :) + # Close the socket, no more control + + prev_clock = clock_stream.last() + time.sleep(0.2) + clock = clock_stream.last() + + self.assertTrue(clock['timestamp'] > prev_clock['timestamp']) + +########################## Run these tests ########################## +if __name__ == "__main__": + from morse.testing.testing import main + main(SocketSyncTest, time_modes = [TimeStrategies.FixedSimulationStep])