Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bug in pipe group handling that led to multiple assignments of the same group id to different groups; bug that accidentally added already running groups of the list of ready jobs (issue #1331) #1332

Merged
merged 8 commits into from Feb 18, 2022
16 changes: 11 additions & 5 deletions snakemake/dag.py
Expand Up @@ -1149,7 +1149,6 @@ def _update_group_components(self):
for groupid, conn_components in groups_by_id.items():
n_components = self.workflow.group_components.get(groupid, 1)
if n_components > 1:
print(n_components)
for chunk in group_into_chunks(n_components, conn_components):
if len(chunk) > 1:
primary = chunk[0]
Expand Down Expand Up @@ -1180,7 +1179,8 @@ def update_ready(self, jobs=None):
else:
group = self._group[job]
group.finalize()
candidate_groups.add(group)
if group not in self._running:
candidate_groups.add(group)

self._ready_jobs.update(
group
Expand Down Expand Up @@ -1286,8 +1286,9 @@ def handle_pipes(self):

if len(candidate_groups) > 1:
if all(isinstance(group, CandidateGroup) for group in candidate_groups):
group = candidate_groups.pop()
for g in candidate_groups:
g.merge(group)
group.merge(g)
else:
raise WorkflowError(
"An output file is marked as "
Expand All @@ -1301,15 +1302,20 @@ def handle_pipes(self):
group = candidate_groups.pop()
else:
# generate a random unique group name
group = CandidateGroup() # str(uuid.uuid4())
group = CandidateGroup()

# set group for job and all downstreams
job.group = group
visited.add(job)
for j in all_depending:
j.group = group
visited.add(j)

# convert candidate groups to plain string IDs
for job in visited:
job.group = group.id if isinstance(group, CandidateGroup) else group
job.group = (
job.group.id if isinstance(job.group, CandidateGroup) else job.group
)

def _ready(self, job):
"""Return whether the given job is ready to execute."""
Expand Down
3 changes: 2 additions & 1 deletion snakemake/scheduler.py
Expand Up @@ -530,6 +530,7 @@ def schedule(self):

def _finish_jobs(self):
# must be called from within lock
# clear the global tofinish such that parallel calls do not interfere
for job in self._tofinish:
if self.handle_job_success:
try:
Expand All @@ -539,7 +540,7 @@ def _finish_jobs(self):
# we do the same as in case of errors during execution
print_exception(e, self.workflow.linemaps)
self._handle_error(job)
return
continue

if self.update_resources:
# normal jobs have len=1, group jobs have len>1
Expand Down
35 changes: 35 additions & 0 deletions tests/test_issue1331/Snakefile
@@ -0,0 +1,35 @@
rule all:
input:
[
"aligned_and_sort/1.txt",
"aligned_and_sort/2.txt",
"aligned_and_sort/3.txt",
"aligned_and_sort/4.txt",
"aligned_and_sort/5.txt",
"aligned_and_sort/6.txt",
]


checkpoint trimming:
output:
"trimmed/{sample}.txt"
shell:
"touch {output}; sleep 1"


rule align:
input:
"trimmed/{sample}.txt"
output:
pipe("aligned/{sample}.txt")
shell:
"touch {output}; sleep 1"


rule sort:
input:
"aligned/{sample}.txt"
output:
"aligned_and_sort/{sample}.txt"
shell:
"touch {output}; sleep 1"
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
6 changes: 6 additions & 0 deletions tests/tests.py
Expand Up @@ -1455,6 +1455,12 @@ def test_modules_ruledeps_inheritance():
run(dpath("test_modules_ruledeps_inheritance"))


def test_issue1331():
# not guaranteed to fail, so let's try multiple times
for i in range(10):
run(dpath("test_issue1331"), cores=4)


@skip_on_windows
def test_conda_named():
run(dpath("test_conda_named"), use_conda=True)
Expand Down