Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to share data between nodes and controller #769

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/xdist/dsession.py
@@ -1,17 +1,17 @@
from queue import Empty, Queue

import py
import pytest

from xdist.workermanage import NodeManager
from xdist.remote import shared_key
from xdist.scheduler import (
EachScheduling,
LoadScheduling,
LoadScopeScheduling,
LoadFileScheduling,
LoadGroupScheduling,
)


from queue import Empty, Queue
from xdist.workermanage import NodeManager


class Interrupted(KeyboardInterrupt):
Expand Down Expand Up @@ -174,6 +174,13 @@ def worker_workerfinished(self, node):
self.shouldstop = "{} received keyboard-interrupt".format(node)
self.worker_errordown(node, "keyboard-interrupt")
return
if "shared" in node.workeroutput:
shared = self.config.stash.setdefault(shared_key, {})
for key, value in node.workeroutput["shared"].items():
if key in shared:
shared[key].append(value)
else:
shared[key] = [value]
if node in self.sched.nodes:
crashitem = self.sched.remove_node(node)
assert not crashitem, (crashitem, node)
Expand Down
26 changes: 25 additions & 1 deletion src/xdist/plugin.py
Expand Up @@ -5,12 +5,14 @@

import py
import pytest

from _pytest.stash import StashKey

PYTEST_GTE_7 = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (7, 0) # type: ignore[attr-defined]

_sys_path = list(sys.path) # freeze a copy of sys.path at interpreter startup

shared_key = StashKey["str"]()


@pytest.hookimpl
def pytest_xdist_auto_num_workers(config):
Expand Down Expand Up @@ -302,3 +304,25 @@ def testrun_uid(request):
return request.config.workerinput["testrunuid"]
else:
return uuid.uuid4().hex


def get_shared_data(request_or_session):
"""Return shared data and True, if it is ran from xdist_controller"""
if is_xdist_controller(request_or_session):
return request_or_session.config.stash.setdefault(shared_key, {}), True
return request_or_session.config.stash.setdefault(shared_key, {}), False


@pytest.fixture(scope="session")
def add_shared_data(request, worker_id):
"""Adds data that will be collected from all workers and be accessible from master node in sessionfinish hook"""

def _add(key, value):
shared = request.config.stash.setdefault(shared_key, {})
if worker_id == "master":
# Worker shared_data are grouped together, master data aren't
shared[key] = [value]
else:
shared[key] = value

return _add
8 changes: 6 additions & 2 deletions src/xdist/remote.py
Expand Up @@ -6,15 +6,16 @@
needs not to be installed in remote environments.
"""

import sys
import os
import sys
import time

import py
import pytest
from _pytest.config import _prepareconfig, Config
from execnet.gateway_base import dumps, DumpError

from _pytest.config import _prepareconfig, Config
from xdist.plugin import shared_key

try:
from setproctitle import setproctitle
Expand Down Expand Up @@ -64,6 +65,9 @@ def pytest_sessionstart(self, session):
def pytest_sessionfinish(self, exitstatus):
# in pytest 5.0+, exitstatus is an IntEnum object
self.config.workeroutput["exitstatus"] = int(exitstatus)
shared = self.config.stash.get(shared_key, None)
if shared:
self.config.workeroutput["shared"] = shared
yield
self.sendevent("workerfinished", workeroutput=self.config.workeroutput)

Expand Down
34 changes: 34 additions & 0 deletions testing/acceptance_test.py
Expand Up @@ -1543,3 +1543,37 @@ def test_get_xdist_worker_id(self, fake_request) -> None:
assert xdist.get_xdist_worker_id(fake_request) == "gw5"
del fake_request.config.workerinput
assert xdist.get_xdist_worker_id(fake_request) == "master"


class TestSharedData:
def test_shared_data_two_nodes(self, pytester: pytest.Pytester) -> None:
pytester.makepyfile(
"""
def test_shared1(add_shared_data):
add_shared_data("test","value2")
assert 1
def test_shared2(add_shared_data):
add_shared_data("test","value1")
assert 1

"""
)
pytester.makeconftest(
"""
from xdist.plugin import get_shared_data
def pytest_sessionfinish(session):
data, master = get_shared_data(session)
if master:
with open('shared_data', 'w') as f:
for key, values in data.items():
for value in values:
f.write('data[%s] = %s\\n' % (key,value))
"""
)
result = pytester.inline_run("-x", "-v", "-n2")
assert result.ret == 0
collected_file = pytester.path / "shared_data"
assert collected_file.is_file()
collected_text = collected_file.read_text()
assert "data[test] = value2" in collected_text
assert "data[test] = value1" in collected_text