From 1a9b483a6c675315d74bff791502c2bdd74609c1 Mon Sep 17 00:00:00 2001 From: Maarten-vd-Sande Date: Fri, 18 Feb 2022 20:17:02 +0100 Subject: [PATCH] fix: bug in pipe group handling that led to multiple assignments of the same group id to different groups; bug that accidentally added already running groups of the list of ready jobs (issue #1331) (#1332) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * issue 1331 * Update Snakefile * Update Snakefile * fix: bug in pipe group handling that led to multiple assignments of the same group id to different groups; bug that accidentally added already running groups of the list of ready jobs * fmt * skip on win Co-authored-by: Johannes Köster --- snakemake/dag.py | 16 ++++++--- snakemake/scheduler.py | 3 +- tests/test_issue1331/Snakefile | 35 +++++++++++++++++++ .../expected-results/aligned_and_sort/1.txt | 0 .../expected-results/aligned_and_sort/2.txt | 0 .../expected-results/aligned_and_sort/3.txt | 0 .../expected-results/aligned_and_sort/4.txt | 0 .../expected-results/aligned_and_sort/5.txt | 0 .../expected-results/aligned_and_sort/6.txt | 0 tests/tests.py | 7 ++++ 10 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 tests/test_issue1331/Snakefile create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/1.txt create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/2.txt create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/3.txt create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/4.txt create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/5.txt create mode 100644 tests/test_issue1331/expected-results/aligned_and_sort/6.txt diff --git a/snakemake/dag.py b/snakemake/dag.py index ee45228e0..11b57e6b7 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -1149,7 +1149,6 @@ def _update_group_components(self): for groupid, conn_components in groups_by_id.items(): n_components = self.workflow.group_components.get(groupid, 1) if n_components > 1: - print(n_components) for chunk in group_into_chunks(n_components, conn_components): if len(chunk) > 1: primary = chunk[0] @@ -1180,7 +1179,8 @@ def update_ready(self, jobs=None): else: group = self._group[job] group.finalize() - candidate_groups.add(group) + if group not in self._running: + candidate_groups.add(group) self._ready_jobs.update( group @@ -1286,8 +1286,9 @@ def handle_pipes(self): if len(candidate_groups) > 1: if all(isinstance(group, CandidateGroup) for group in candidate_groups): + group = candidate_groups.pop() for g in candidate_groups: - g.merge(group) + group.merge(g) else: raise WorkflowError( "An output file is marked as " @@ -1301,15 +1302,20 @@ def handle_pipes(self): group = candidate_groups.pop() else: # generate a random unique group name - group = CandidateGroup() # str(uuid.uuid4()) + group = CandidateGroup() + + # set group for job and all downstreams job.group = group visited.add(job) for j in all_depending: j.group = group visited.add(j) + # convert candidate groups to plain string IDs for job in visited: - job.group = group.id if isinstance(group, CandidateGroup) else group + job.group = ( + job.group.id if isinstance(job.group, CandidateGroup) else job.group + ) def _ready(self, job): """Return whether the given job is ready to execute.""" diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index e27a6f234..c26f39e9a 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -530,6 +530,7 @@ def schedule(self): def _finish_jobs(self): # must be called from within lock + # clear the global tofinish such that parallel calls do not interfere for job in self._tofinish: if self.handle_job_success: try: @@ -539,7 +540,7 @@ def _finish_jobs(self): # we do the same as in case of errors during execution print_exception(e, self.workflow.linemaps) self._handle_error(job) - return + continue if self.update_resources: # normal jobs have len=1, group jobs have len>1 diff --git a/tests/test_issue1331/Snakefile b/tests/test_issue1331/Snakefile new file mode 100644 index 000000000..3108db7ab --- /dev/null +++ b/tests/test_issue1331/Snakefile @@ -0,0 +1,35 @@ +rule all: + input: + [ + "aligned_and_sort/1.txt", + "aligned_and_sort/2.txt", + "aligned_and_sort/3.txt", + "aligned_and_sort/4.txt", + "aligned_and_sort/5.txt", + "aligned_and_sort/6.txt", + ] + + +checkpoint trimming: + output: + "trimmed/{sample}.txt" + shell: + "touch {output}; sleep 1" + + +rule align: + input: + "trimmed/{sample}.txt" + output: + pipe("aligned/{sample}.txt") + shell: + "touch {output}; sleep 1" + + +rule sort: + input: + "aligned/{sample}.txt" + output: + "aligned_and_sort/{sample}.txt" + shell: + "touch {output}; sleep 1" diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/1.txt b/tests/test_issue1331/expected-results/aligned_and_sort/1.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/2.txt b/tests/test_issue1331/expected-results/aligned_and_sort/2.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/3.txt b/tests/test_issue1331/expected-results/aligned_and_sort/3.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/4.txt b/tests/test_issue1331/expected-results/aligned_and_sort/4.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/5.txt b/tests/test_issue1331/expected-results/aligned_and_sort/5.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_issue1331/expected-results/aligned_and_sort/6.txt b/tests/test_issue1331/expected-results/aligned_and_sort/6.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/tests.py b/tests/tests.py index a2b62b76f..bfa2503fa 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1455,6 +1455,13 @@ def test_modules_ruledeps_inheritance(): run(dpath("test_modules_ruledeps_inheritance")) +@skip_on_windows +def test_issue1331(): + # not guaranteed to fail, so let's try multiple times + for i in range(10): + run(dpath("test_issue1331"), cores=4) + + @skip_on_windows def test_conda_named(): run(dpath("test_conda_named"), use_conda=True)