CI: Test when running with pytest-xdist, pytest -n 8 has duplicated event emitted. #262

Open
eddiebergman opened this issue Feb 22, 2024 · 0 comments
Labels: bug, ci

eddiebergman commented Feb 22, 2024

The tests run fine with a single worker or with -n 4, but when increasing this to 8 workers (pytest -n 8), the scheduler appears to emit an extra EMPTY event.

Looking at the async task that monitors for an empty queue, it is possible that a race condition causes this to trigger twice. This should be handled properly, although it is hard to create a reproducing example. In general, how can we deterministically create race conditions for testing?

    async def _monitor_queue_empty(self) -> None:
        """Monitor for the queue being empty and trigger an event when it is."""
        if not self.running():
            raise RuntimeError("The scheduler is not running!")

        while True:
            while self.queue:
                queue = list(self.queue)
                await asyncio.wait(queue, return_when=asyncio.ALL_COMPLETED)

            # Signal that the queue is now empty
            self._queue_has_items_event.clear()
            self.on_empty.emit()

            # Wait for an item to be in the queue
            await self._queue_has_items_event.wait()
            logger.debug("Queue has been filled again")
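
On the question of forcing a race deterministically: one common approach is to replace timing with explicit gates, so the test decides the interleaving rather than the OS scheduler. The sketch below is not the scheduler's code. `ToyMonitor`, its `before_clear` hook, and `force_interleaving` are hypothetical names made up for illustration, and the sketch assumes a competing submit can land between "queue observed empty" and "event cleared" (for example from another thread). The real loop has no await at that point, so this only illustrates the technique.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

Hook = Callable[[], Awaitable[None]]


class ToyMonitor:
    """Hypothetical stand-in for the monitor loop (not the real scheduler)."""

    def __init__(self, before_clear: Optional[Hook] = None) -> None:
        self.queue: List[object] = []
        self.has_items = asyncio.Event()
        self.empty_count = 0      # how many times EMPTY was "emitted"
        self.spurious_empty = 0   # EMPTY emitted while the queue had items
        self.before_clear = before_clear  # test-only injection point

    def submit(self, item: object) -> None:
        self.queue.append(item)
        self.has_items.set()

    async def monitor_once(self) -> None:
        # Step 1: the monitor has observed an empty queue.
        assert not self.queue
        # Test-only pause between "observed empty" and "clear the event".
        if self.before_clear is not None:
            await self.before_clear()
        # Step 2: clear the flag and emit EMPTY, as the real loop does.
        self.has_items.clear()
        self.empty_count += 1
        if self.queue:
            self.spurious_empty += 1


async def force_interleaving() -> ToyMonitor:
    reached = asyncio.Event()  # monitor -> test: "paused at the hook"
    release = asyncio.Event()  # test -> monitor: "carry on"

    async def gate() -> None:
        reached.set()
        await release.wait()

    toy = ToyMonitor(before_clear=gate)
    monitor = asyncio.create_task(toy.monitor_once())

    await reached.wait()     # the monitor is now paused inside the window
    toy.submit("late item")  # the competing action lands exactly there
    release.set()            # let the monitor continue
    await monitor
    return toy


if __name__ == "__main__":
    toy = asyncio.run(force_interleaving())
    print(toy.empty_count, toy.spurious_empty, toy.has_items.is_set())
```

Because both sides wait on events rather than sleeping, the same interleaving happens on every run regardless of how many xdist workers are competing for CPU. In this toy the forced ordering yields an EMPTY emission while an item is queued, and the wake-up flag set by `submit` is lost.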


Error, traceback of test

test_queue_empty_status[loky]

    def test_queue_empty_status(scheduler: Scheduler) -> None:
        task = scheduler.task(sleep_and_return)
    
        # Reload on the first empty
        @scheduler.on_empty(when=lambda: scheduler.event_counts[scheduler.EMPTY] == 1)
        def launch_first() -> None:
            task.submit(sleep_time=0.1)
    
        # Stop on the second empty
        @scheduler.on_empty(when=lambda: scheduler.event_counts[scheduler.EMPTY] == 2)
        def stop_scheduler() -> None:
            scheduler.stop()
    
        end_status = scheduler.run(timeout=3, end_on_empty=False)
    
        assert task.event_counts == Counter(
            {task.SUBMITTED: 1, task.DONE: 1, task.RESULT: 1},
        )
    
>       assert scheduler.event_counts == Counter(
            {
                scheduler.STARTED: 1,
                scheduler.FINISHING: 1,
                scheduler.FINISHED: 1,
                scheduler.EMPTY: 2,
                scheduler.STOP: 1,
                scheduler.FUTURE_SUBMITTED: 1,
                scheduler.FUTURE_DONE: 1,
                scheduler.FUTURE_RESULT: 1,
            },
        )
E       AssertionError: assert Counter({Event(name='on_empty'): 1, Event(name='on_start'): 1, Event(name='on_future_submitted'): 1, Event(name='on_timeout'): 1, Event(name='on_finishing'): 1, Event(name='on_finished'): 1, Event(name='on_future_done'): 1, Event(name='on_future_result'): 1}) == Counter({Event(name='on_empty'): 2, Event(name='on_start'): 1, Event(name='on_finishing'): 1, Event(name='on_finished'): 1, Event(name='on_stop'): 1, Event(name='on_future_submitted'): 1, Event(name='on_future_done'): 1, Event(name='on_future_result'): 1})
E         Common items:
E         {Event(name='on_start'): 1,
E          Event(name='on_finishing'): 1,
E          Event(name='on_finished'): 1,
E          Event(name='on_future_submitted'): 1,
E          Event(name='on_future_done'): 1,
E          Event(name='on_future_result'): 1}
E         Differing items:
E         {Event(name='on_empty'): 1} != {Event(name='on_empty'): 2}
E         Left contains 1 more item:
E         {Event(name='on_timeout'): 1}
E         Right contains 1 more item:
E         {Event(name='on_stop'): 1}
E         Full diff:
E         - Counter({Event(name='on_empty'): 2,
E         ?                                  ^
E         + Counter({Event(name='on_empty'): 1,
E         ?                                  ^
E                    Event(name='on_start'): 1,
E         +          Event(name='on_future_submitted'): 1,
E         +          Event(name='on_timeout'): 1,
E                    Event(name='on_finishing'): 1,
E                    Event(name='on_finished'): 1,
E         -          Event(name='on_stop'): 1,
E         -          Event(name='on_future_submitted'): 1,
E                    Event(name='on_future_done'): 1,
E                    Event(name='on_future_result'): 1},
E           )

tests/scheduling/test_scheduler.py:241: AssertionError
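
To make the diff above easier to read, the two counters can be compared directly with `Counter` subtraction, which keeps only positive differences. This is a small sketch using the counts from the traceback. Plain strings stand in for the `Event(name=...)` keys, which is a simplification for illustration.

```python
from collections import Counter

# Left-hand side of the failed assertion (what the run recorded).
actual = Counter({
    "on_empty": 1, "on_start": 1, "on_future_submitted": 1, "on_timeout": 1,
    "on_finishing": 1, "on_finished": 1, "on_future_done": 1,
    "on_future_result": 1,
})

# Right-hand side of the failed assertion (what the test expects).
expected = Counter({
    "on_empty": 2, "on_start": 1, "on_finishing": 1, "on_finished": 1,
    "on_stop": 1, "on_future_submitted": 1, "on_future_done": 1,
    "on_future_result": 1,
})

# Events the test expected more of than the run recorded.
missing = expected - actual
# Events the run recorded that the test did not expect.
unexpected = actual - expected

print("missing:", dict(missing))
print("unexpected:", dict(unexpected))
```

In this particular traceback the run recorded one `on_empty` and ended with `on_timeout` in place of `on_stop`.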
eddiebergman added the bug and ci labels on Feb 22, 2024