Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experiment with creating a multiprocessing backend #626

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/xdist/backends.py
@@ -0,0 +1,87 @@
import re
import fnmatch
import os
import py # TODO remove
import pytest


def parse_spec_config(config):
xspeclist = []
for xspec in config.getvalue("tx"):
i = xspec.find("*")
try:
num = int(xspec[:i])
except ValueError:
xspeclist.append(xspec)
else:
xspeclist.extend([xspec[i + 1 :]] * num)
if not xspeclist:
raise pytest.UsageError(
"MISSING test execution (tx) nodes: please specify --tx"
)
return xspeclist


class ExecnetNodeControl:
@classmethod
def from_config(cls, config, specs, defaultchdir):
final_specs = []

import execnet

group = execnet.Group()
if specs is None:
specs = [execnet.XSpec(x) for x in parse_spec_config(config)]
for spec in specs:
if not isinstance(spec, execnet.XSpec):
spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
group.allocate_id(spec)
final_specs.append(spec)

return cls(group, final_specs)

def __init__(self, group, specs):
self.group = group
self.specs = specs

@staticmethod
def get_rsync(source, verbose=False, ignores=None):
import execnet

# todo: cache the class
class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""

def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores = kwargs.pop("ignores", None) or []
self._ignores = [
re.compile(fnmatch.translate(getattr(x, "strpath", x)))
for x in ignores
]
super().__init__(sourcedir=sourcedir, **kwargs)

def filter(self, path):
path = py.path.local(path)
for cre in self._ignores:
if cre.match(path.basename) or cre.match(path.strpath):
return False
else:
return True

def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super().add_target(
gateway, remotepath, finishedcallback=finished, delete=True
)

def _report_send_file(self, gateway, modified_rel_path):
if self._verbose > 0:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.chdir
print("{}:{} <= {}".format(gateway.spec, remotepath, path))

return HostRSync(source, verbose=verbose, ignores=ignores)
2 changes: 1 addition & 1 deletion src/xdist/dsession.py
Expand Up @@ -308,7 +308,7 @@ def _clone_node(self, node):
"""
spec = node.gateway.spec
spec.id = None
self.nodemanager.group.allocate_id(spec)
self.nodemanager._execnet.group.allocate_id(spec)
node = self.nodemanager.setup_node(spec, self.queue.put)
self._active_nodes.add(node)
return node
Expand Down
3 changes: 2 additions & 1 deletion src/xdist/looponfail.py
Expand Up @@ -10,7 +10,6 @@
import pytest
import sys
import time
import execnet


def pytest_addoption(parser):
Expand Down Expand Up @@ -65,6 +64,8 @@ def trace(self, *args):
print("RemoteControl:", msg)

def initgateway(self):
import execnet

return execnet.makegateway("popen")

def setup(self, out=None):
Expand Down
69 changes: 14 additions & 55 deletions src/xdist/workermanage.py
@@ -1,14 +1,12 @@
import fnmatch
import os
import re
import sys
import uuid

import py
import pytest
import execnet

import xdist.remote
from .backends import ExecnetNodeControl


def parse_spec_config(config):
Expand Down Expand Up @@ -38,17 +36,9 @@ def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"):
self.testrunuid = self.config.getoption("testrunuid")
if self.testrunuid is None:
self.testrunuid = uuid.uuid4().hex
self.group = execnet.Group()
if specs is None:
specs = self._getxspecs()
self.specs = []
for spec in specs:
if not isinstance(spec, execnet.XSpec):
spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
self.group.allocate_id(spec)
self.specs.append(spec)

self._execnet = ExecnetNodeControl.from_config(config, specs, defaultchdir)

self.roots = self._getrsyncdirs()
self.rsyncoptions = self._getrsyncoptions()
self._rsynced_specs = set()
Expand All @@ -60,12 +50,14 @@ def rsync_roots(self, gateway):
self.rsync(gateway, root, **self.rsyncoptions)

def setup_nodes(self, putevent):
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.config.hook.pytest_xdist_setupnodes(
config=self.config, specs=self._execnet.specs
)
self.trace("setting up nodes")
return [self.setup_node(spec, putevent) for spec in self.specs]
return [self.setup_node(spec, putevent) for spec in self._execnet.specs]

def setup_node(self, spec, putevent):
gw = self.group.makegateway(spec)
gw = self._execnet.group.makegateway(spec)
self.config.hook.pytest_xdist_newgateway(gateway=gw)
self.rsync_roots(gw)
node = WorkerController(self, gw, self.config, putevent)
Expand All @@ -75,13 +67,11 @@ def setup_node(self, spec, putevent):
return node

def teardown_nodes(self):
self.group.terminate(self.EXIT_TIMEOUT)

def _getxspecs(self):
return [execnet.XSpec(x) for x in parse_spec_config(self.config)]
self._execnet.group.terminate(self.EXIT_TIMEOUT)

def _getrsyncdirs(self):
for spec in self.specs:
# todo: move to backends ?
for spec in self._execnet.specs:
if not spec.popen or spec.chdir:
break
else:
Expand Down Expand Up @@ -130,7 +120,7 @@ def rsync(self, gateway, source, notify=None, verbose=False, ignores=None):
# XXX This changes the calling behaviour of
# pytest_xdist_rsyncstart and pytest_xdist_rsyncfinish to
# be called once per rsync target.
rsync = HostRSync(source, verbose=verbose, ignores=ignores)
rsync = self._execnet.get_rsync(source, verbose=verbose, ignores=ignores)
spec = gateway.spec
if spec.popen and not spec.chdir:
# XXX This assumes that sources are python-packages
Expand All @@ -156,37 +146,6 @@ def finished():
self.config.hook.pytest_xdist_rsyncfinish(source=source, gateways=[gateway])


class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""

def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores = kwargs.pop("ignores", None) or []
self._ignores = [
re.compile(fnmatch.translate(getattr(x, "strpath", x))) for x in ignores
]
super().__init__(sourcedir=sourcedir, **kwargs)

def filter(self, path):
path = py.path.local(path)
for cre in self._ignores:
if cre.match(path.basename) or cre.match(path.strpath):
return False
else:
return True

def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super().add_target(gateway, remotepath, finishedcallback=finished, delete=True)

def _report_send_file(self, gateway, modified_rel_path):
if self._verbose > 0:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.chdir
print("{}:{} <= {}".format(gateway.spec, remotepath, path))


def make_reltoroot(roots, args):
# XXX introduce/use public API for splitting pytest args
splitcode = "::"
Expand Down Expand Up @@ -224,7 +183,7 @@ def __init__(self, nodemanager, gateway, config, putevent):
self.config = config
self.workerinput = {
"workerid": gateway.id,
"workercount": len(nodemanager.specs),
"workercount": len(nodemanager._execnet.specs),
"testrunuid": nodemanager.testrunuid,
"mainargv": sys.argv,
}
Expand Down
4 changes: 2 additions & 2 deletions testing/test_plugin.py
Expand Up @@ -149,15 +149,15 @@ class TestDistOptions:
def test_getxspecs(self, testdir):
config = testdir.parseconfigure("--tx=popen", "--tx", "ssh=xyz")
nodemanager = NodeManager(config)
xspecs = nodemanager._getxspecs()
xspecs = nodemanager._execnet.specs
assert len(xspecs) == 2
print(xspecs)
assert xspecs[0].popen
assert xspecs[1].ssh == "xyz"

def test_xspecs_multiplied(self, testdir):
config = testdir.parseconfigure("--tx=3*popen")
xspecs = NodeManager(config)._getxspecs()
xspecs = NodeManager(config)._execnet.specs
assert len(xspecs) == 3
assert xspecs[1].popen

Expand Down
4 changes: 3 additions & 1 deletion testing/test_remote.py
Expand Up @@ -46,7 +46,9 @@ def setup(self,):

class DummyMananger:
testrunuid = uuid.uuid4().hex
specs = [0, 1]

class _execnet:
specs = [0, 1]

self.slp = WorkerController(DummyMananger, self.gateway, config, putevent)
self.request.addfinalizer(self.slp.ensure_teardown)
Expand Down
41 changes: 24 additions & 17 deletions testing/test_workermanage.py
Expand Up @@ -4,7 +4,8 @@
import execnet
from _pytest.pytester import HookRecorder
from xdist import workermanage, newhooks
from xdist.workermanage import HostRSync, NodeManager
from xdist.workermanage import NodeManager
from xdist.backends import ExecnetNodeControl

pytest_plugins = "pytester"

Expand Down Expand Up @@ -48,13 +49,13 @@ def setup(self):
class TestNodeManagerPopen:
def test_popen_no_default_chdir(self, config):
gm = NodeManager(config, ["popen"])
assert gm.specs[0].chdir is None
assert gm._execnet.specs[0].chdir is None

def test_default_chdir(self, config):
specs = ["ssh=noco", "socket=xyz"]
for spec in NodeManager(config, specs).specs:
for spec in NodeManager(config, specs)._execnet.specs:
assert spec.chdir == "pyexecnetcache"
for spec in NodeManager(config, specs, defaultchdir="abc").specs:
for spec in NodeManager(config, specs, defaultchdir="abc")._execnet.specs:
assert spec.chdir == "abc"

def test_popen_makegateway_events(self, config, hookrecorder, workercontroller):
Expand All @@ -68,16 +69,16 @@ def test_popen_makegateway_events(self, config, hookrecorder, workercontroller):
assert call.gateway.id == "gw0"
call = hookrecorder.popcall("pytest_xdist_newgateway")
assert call.gateway.id == "gw1"
assert len(hm.group) == 2
assert len(hm._execnet.group) == 2
hm.teardown_nodes()
assert not len(hm.group)
assert not len(hm._execnet.group)

def test_popens_rsync(self, config, mysetup, workercontroller):
source = mysetup.source
hm = NodeManager(config, ["popen"] * 2)
hm.setup_nodes(None)
assert len(hm.group) == 2
for gw in hm.group:
assert len(hm._execnet.group) == 2
for gw in hm._execnet.group:

class pseudoexec:
args = []
Expand All @@ -90,11 +91,11 @@ def waitclose(self):

gw.remote_exec = pseudoexec
notifications = []
for gw in hm.group:
for gw in hm._execnet.group:
hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
assert not notifications
hm.teardown_nodes()
assert not len(hm.group)
assert not len(hm._execnet.group)
assert "sys.path.insert" in gw.remote_exec.args[0]

def test_rsync_popen_with_path(self, config, mysetup, workercontroller):
Expand All @@ -103,10 +104,14 @@ def test_rsync_popen_with_path(self, config, mysetup, workercontroller):
hm.setup_nodes(None)
source.ensure("dir1", "dir2", "hello")
notifications = []
for gw in hm.group:
for gw in hm._execnet.group:
hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
assert len(notifications) == 1
assert notifications[0] == ("rsyncrootready", hm.group["gw0"].spec, source)
assert notifications[0] == (
"rsyncrootready",
hm._execnet.group["gw0"].spec,
source,
)
hm.teardown_nodes()
dest = dest.join(source.basename)
assert dest.join("dir1").check()
Expand All @@ -121,12 +126,12 @@ def test_rsync_same_popen_twice(
hm.roots = []
hm.setup_nodes(None)
source.ensure("dir1", "dir2", "hello")
gw = hm.group[0]
gw = hm._execnet.group[0]
hm.rsync(gw, source)
call = hookrecorder.popcall("pytest_xdist_rsyncstart")
assert call.source == source
assert len(call.gateways) == 1
assert call.gateways[0] in hm.group
assert call.gateways[0] in hm._execnet.group
call = hookrecorder.popcall("pytest_xdist_rsyncfinish")


Expand All @@ -137,7 +142,9 @@ def test_hrsync_filter(self, mysetup):
source.ensure(".svn", "entries")
source.ensure(".somedotfile", "moreentries")
source.ensure("somedir", "editfile~")
syncer = HostRSync(source, ignores=NodeManager.DEFAULT_IGNORES)
syncer = ExecnetNodeControl.get_rsync(
source, ignores=NodeManager.DEFAULT_IGNORES
)
files = list(source.visit(rec=syncer.filter, fil=syncer.filter))
assert len(files) == 3
basenames = [x.basename for x in files]
Expand All @@ -149,7 +156,7 @@ def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync = ExecnetNodeControl.get_rsync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
source.join("hello.py").write("world")
rsync.send()
Expand Down Expand Up @@ -272,7 +279,7 @@ def test_optimise_popen(self, testdir, mysetup, workercontroller):
config = testdir.parseconfig(source)
nodemanager = NodeManager(config, specs)
nodemanager.setup_nodes(None) # calls .rysnc_roots()
for gwspec in nodemanager.specs:
for gwspec in nodemanager._execnet.specs:
assert gwspec._samefilesystem()
assert not gwspec.chdir

Expand Down