diff --git a/luigi/worker.py b/luigi/worker.py index c3ea777b8a..a11f808ae5 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -55,7 +55,7 @@ from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED from luigi.target import Target -from luigi.task import Task, Config, DynamicRequirements +from luigi.task import Task, Config, DynamicRequirements, flatten from luigi.task_register import TaskClassException from luigi.task_status import RUNNING from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter @@ -185,7 +185,7 @@ def run(self): missing = [] for dep in self.task.deps(): if not self.check_complete(dep): - nonexistent_outputs = [output for output in dep.output() if not output.exists()] + nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()] if nonexistent_outputs: missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})') else: diff --git a/test/worker_task_test.py b/test/worker_task_test.py index af51defe6c..d58f740ae3 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -26,6 +26,7 @@ import luigi import luigi.date_interval import luigi.notifications +from luigi.mock import MockTarget from luigi.worker import TaskException, TaskProcess from luigi.scheduler import DONE, FAILED @@ -106,6 +107,42 @@ def complete(self): None )) + def test_fail_on_unfulfilled_dependencies(self): + class NeverCompleteTask(luigi.Task): + def complete(self): + return False + + class A(NeverCompleteTask): + def output(self): + return [] + + class B(NeverCompleteTask): + def output(self): + return MockTarget("foo-B") + + class C(NeverCompleteTask): + def output(self): + return [MockTarget("foo-C1"), MockTarget("foo-C2")] + + class Main(NeverCompleteTask): + def requires(self): + return [A(), B(), C()] + + task = Main() + result_queue = multiprocessing.Queue() + task_process = TaskProcess(task, 1, result_queue, mock.Mock()) + + with mock.patch.object(result_queue, 'put') as mock_put: + task_process.run() + expected_missing = [A().task_id, f"{B().task_id} (foo-B)", f"{C().task_id} (foo-C1, foo-C2)"] + mock_put.assert_called_once_with(( + task.task_id, + FAILED, + StringContaining(f"Unfulfilled dependencies at run time: {', '.join(expected_missing)}"), + expected_missing, + [], + )) + def test_cleanup_children_on_terminate(self): """ Subprocesses spawned by tasks should be terminated on terminate diff --git a/tox.ini b/tox.ini index 4208e98eca..12a23a93e2 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ deps = pytest<7.0 pytest-cov>=2.0,<3.0 mock<2.0 - moto>=1.3.10 + moto>=1.3.10,<5.0 HTTPretty==0.8.10 docker>=2.1.0 boto>=2.42,<3.0