Skip to content


remove unasync
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Jun 27, 2021
1 parent 72be7f2 commit 7f6c20b
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,5 +1,5 @@
requires = ["setuptools", "wheel", "unasync~=0.5.0"]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

Expand Down
4 changes: 2 additions & 2 deletions pytest_bdd/
@@ -1,8 +1,8 @@
"""pytest-bdd public API."""

from pytest_bdd.steps import given, when, then
from pytest_bdd.scenario import scenario, scenarios, async_scenario, async_scenarios
from pytest_bdd.scenario import scenario, scenarios

__version__ = "4.0.2"

__all__ = ["given", "when", "then", "scenario", "scenarios", "async_scenario", "async_scenarios"]
__all__ = ["given", "when", "then", "scenario", "scenarios"]
360 changes: 339 additions & 21 deletions pytest_bdd/
@@ -1,22 +1,340 @@
__all__ = [

from ._async.scenario import scenario as async_scenario, scenarios as async_scenarios
from ._sync.scenario import (
"""Scenario implementation.
The pytest will collect the test case and the steps will be executed
line by line.
test_publish_article = scenario(
scenario_name="Publishing the article",
import contextlib
import collections
import os
import re

import pytest
from _pytest.fixtures import FixtureLookupError

from . import exceptions
from .feature import get_feature, get_features
from .steps import get_step_fixture_name, inject_fixture
from .utils import CONFIG_STACK, get_args, get_caller_module_locals, get_caller_module_path

PYTHON_REPLACE_REGEX = re.compile(r"\W")
ALPHA_REGEX = re.compile(r"^\d+_*")

def find_argumented_step_fixture_name(name, type_, fixturemanager, request=None):
"""Find argumented step fixture name."""
# happens to be that _arg2fixturedefs is changed during the iteration so we use a copy
for fixturename, fixturedefs in list(fixturemanager._arg2fixturedefs.items()):
for fixturedef in fixturedefs:
parser = getattr(fixturedef.func, "parser", None)
if parser is None:
match = parser.is_matching(name)
if not match:

converters = getattr(fixturedef.func, "converters", {})
for arg, value in parser.parse_arguments(name).items():
if arg in converters:
value = converters[arg](value)
if request:
inject_fixture(request, arg, value)
parser_name = get_step_fixture_name(, type_)
if request:
except FixtureLookupError:
return parser_name

def _find_step_function(request, step, scenario):
"""Match the step defined by the regular expression pattern.
:param request: PyTest request object.
:param step: Step.
:param scenario: Scenario.
:return: Function of the step.
:rtype: function
name =
# Simple case where no parser is used for the step
return request.getfixturevalue(get_step_fixture_name(name, step.type))
except FixtureLookupError:
# Could not find a fixture with the same name, let's see if there is a parser involved
name = find_argumented_step_fixture_name(name, step.type, request._fixturemanager, request)
if name:
return request.getfixturevalue(name)
except FixtureLookupError:
raise exceptions.StepDefinitionNotFoundError(
f"Step definition is not found: {step}. "
f'Line {step.line_number} in scenario "{}" in the feature "{scenario.feature.filename}"'

async def _execute_step_function(request, scenario, step, step_func, sync):
"""Execute step function.
:param request: PyTest request.
:param scenario: Scenario.
:param step: Step.
:param function step_func: Step function.
:param example: Example table.
kw = dict(request=request, feature=scenario.feature, scenario=scenario, step=step, step_func=step_func)


kw["step_func_args"] = {}
# Get the step argument values.
kwargs = {arg: request.getfixturevalue(arg) for arg in get_args(step_func)}
kw["step_func_args"] = kwargs

target_fixture = getattr(step_func, "target_fixture", None)
# Execute the step.
if sync:
return_value = step_func(**kwargs)
return_value = await step_func(**kwargs)
if target_fixture:
inject_fixture(request, target_fixture, return_value)

except Exception as exception:
request.config.hook.pytest_bdd_step_error(exception=exception, **kw)

async def _execute_scenario(feature, scenario, request, sync):
"""Execute the scenario.
:param feature: Feature.
:param scenario: Scenario.
:param request: request.
:param encoding: Encoding.
request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario)

# Execute scenario steps
for step in scenario.steps:
step_func = _find_step_function(request, step, scenario)
except exceptions.StepDefinitionNotFoundError as exception:
request=request, feature=feature, scenario=scenario, step=step, exception=exception
await _execute_step_function(request, scenario, step, step_func, sync)
request.config.hook.pytest_bdd_after_scenario(request=request, feature=feature, scenario=scenario)

FakeRequest = collections.namedtuple("FakeRequest", ["module"])

def await_(fn, *args):
v = fn(*args)
with contextlib.closing(v.__await__()) as gen:
except StopIteration as e:
return e.value
raise RuntimeError("coro did not stop")

def _get_scenario_decorator(feature, feature_name, scenario, scenario_name, *, sync):
# HACK: Ideally we would use `def decorator(fn)`, but we want to return a custom exception
# when the decorator is misused.
# Pytest inspect the signature to determine the required fixtures, and in that case it would look
# for a fixture called "fn" that doesn't exist (if it exists then it's even worse).
# It will error with a "fixture 'fn' not found" message instead.
# We can avoid this hack by using a pytest hook and check for misuse instead.
def decorator(*args):
if not args:
raise exceptions.ScenarioIsDecoratorOnly(
"scenario function can only be used as a decorator. Refer to the documentation."
[fn] = args
args = get_args(fn)
function_args = list(args)
for arg in scenario.get_example_params():
if arg not in function_args:

if sync:

def scenario_wrapper(request):
await_(_execute_scenario, feature, scenario, request, sync)
return fn(*[request.getfixturevalue(arg) for arg in args])


async def scenario_wrapper(request):
await _execute_scenario(feature, scenario, request, sync)
return await fn(*[request.getfixturevalue(arg) for arg in args])

for param_set in scenario.get_params():
if param_set:
scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper)
for tag in scenario.tags.union(feature.tags):
config = CONFIG_STACK[-1]
config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper)

scenario_wrapper.__doc__ = f"{feature_name}: {scenario_name}"
scenario_wrapper.__scenario__ = scenario
scenario.test_function = scenario_wrapper
return scenario_wrapper

return decorator

def scenario(feature_name, scenario_name, encoding="utf-8", example_converters=None, features_base_dir=None, sync=True):
"""Scenario decorator.
:param str feature_name: Feature file name. Absolute or relative to the configured feature base path.
:param str scenario_name: Scenario name.
:param str encoding: Feature file encoding.
:param dict example_converters: optional `dict` of example converter function, where key is the name of the
example parameter, and value is the converter function.

scenario_name = str(scenario_name)
caller_module_path = get_caller_module_path()

# Get the feature
if features_base_dir is None:
features_base_dir = get_features_base_dir(caller_module_path)
feature = get_feature(features_base_dir, feature_name, encoding=encoding)

# Get the scenario
scenario = feature.scenarios[scenario_name]
except KeyError:
feature_name = or "[Empty]"
raise exceptions.ScenarioNotFound(
f'Scenario "{scenario_name}" in feature "{feature_name}" in {feature.filename} is not found.'

scenario.example_converters = example_converters

# Validate the scenario

return _get_scenario_decorator(
feature=feature, feature_name=feature_name, scenario=scenario, scenario_name=scenario_name, sync=sync

def get_features_base_dir(caller_module_path):
default_base_dir = os.path.dirname(caller_module_path)
return get_from_ini("bdd_features_base_dir", default_base_dir)

def get_from_ini(key, default):
"""Get value from ini config. Return default if value has not been set.
Use if the default value is dynamic. Otherwise set default on addini call.
config = CONFIG_STACK[-1]
value = config.getini(key)
return value if value != "" else default

def make_python_name(string):
"""Make python attribute name out of a given string."""
string = re.sub(PYTHON_REPLACE_REGEX, "", string.replace(" ", "_"))
return re.sub(ALPHA_REGEX, "", string).lower()

def make_python_docstring(string):
"""Make a python docstring literal out of a given string."""
return '"""{}."""'.format(string.replace('"""', '\\"\\"\\"'))

def make_string_literal(string):
"""Make python string literal out of a given string."""
return "'{}'".format(string.replace("'", "\\'"))

def get_python_name_generator(name):
"""Generate a sequence of suitable python names out of given arbitrary string name."""
python_name = make_python_name(name)
suffix = ""
index = 0

def get_name():
return f"test_{python_name}{suffix}"

while True:
yield get_name()
index += 1
suffix = f"_{index}"

def scenarios(*feature_paths, sync=True, **kwargs):
"""Parse features from the paths and put all found scenarios in the caller module.
:param *feature_paths: feature file paths to use for scenarios
caller_locals = get_caller_module_locals()
caller_path = get_caller_module_path()

features_base_dir = kwargs.get("features_base_dir")
if features_base_dir is None:
features_base_dir = get_features_base_dir(caller_path)

abs_feature_paths = []
for path in feature_paths:
if not os.path.isabs(path):
path = os.path.abspath(os.path.join(features_base_dir, path))
found = False

module_scenarios = frozenset(
for name, attr in caller_locals.items()
if hasattr(attr, "__scenario__")

for feature in get_features(abs_feature_paths):
for scenario_name, scenario_object in feature.scenarios.items():
# skip already bound scenarios
if (scenario_object.feature.filename, scenario_name) not in module_scenarios:

decorator = scenario(feature.filename, scenario_name, sync=sync, **kwargs)
if sync:

def _scenario():
pass # pragma: no cover


async def _scenario():
pass # pragma: no cover

for test_name in get_python_name_generator(scenario_name):
if test_name not in caller_locals:
# found an unique test name
caller_locals[test_name] = _scenario
found = True
if not found:
raise exceptions.NoScenariosFound(abs_feature_paths)
11 changes: 1 addition & 10 deletions
@@ -1,12 +1,3 @@
import unasync
from setuptools import setup

"build_py": unasync.cmdclass_build_py(
unasync.Rule("/pytest_bdd/_async/", "/pytest_bdd/_sync/"),

0 comments on commit 7f6c20b

Please sign in to comment.