Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: merging of pipe groups when multiple rules are chained together via pipes (#1173)

* when handling pipes, process jobs in BFS order

handle_pipes() depends on the jobs being in BFS order;
otherwise, it won't merge groups together properly.
See #975.

* create test for #975 - multiple piped rules

* create output for multiple pipes test

* fix formatting and filter finished jobs

* register test_pipes_multiple with nosetests in tests.py

* perf: use mergeable CandidateGroup objects instead of topological sorting of the jobs (faster).

Co-authored-by: Johannes Köster <johannes.koester@tu-dortmund.de>
  • Loading branch information
aryarm and johanneskoester committed Sep 24, 2021
1 parent 51e5afc commit de91d2c
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 8 deletions.
41 changes: 33 additions & 8 deletions snakemake/dag.py
Expand Up @@ -1224,6 +1224,8 @@ def postprocess(self, update_needrun=True):
def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
one consumer"""

visited = set()
for job in self.needrun_jobs:
candidate_groups = set()
if job.group is not None:
Expand Down Expand Up @@ -1283,22 +1285,31 @@ def handle_pipes(self):
continue

if len(candidate_groups) > 1:
raise WorkflowError(
"An output file is marked as "
"pipe, but consuming jobs "
"are part of conflicting "
"groups.",
rule=job.rule,
)
if all(isinstance(group, CandidateGroup) for group in candidate_groups):
for g in candidate_groups:
g.merge(group)
else:
raise WorkflowError(
"An output file is marked as "
"pipe, but consuming jobs "
"are part of conflicting "
"groups.",
rule=job.rule,
)
elif candidate_groups:
# extend the candidate group to all involved jobs
group = candidate_groups.pop()
else:
# generate a random unique group name
group = str(uuid.uuid4())
group = CandidateGroup() # str(uuid.uuid4())
job.group = group
visited.add(job)
for j in all_depending:
j.group = group
visited.add(j)

for job in visited:
job.group = group.id if isinstance(group, CandidateGroup) else group

def _ready(self, job):
"""Return whether the given job is ready to execute."""
Expand Down Expand Up @@ -2181,3 +2192,17 @@ def __str__(self):

def __len__(self):
return self._len


class CandidateGroup:
    """Mergeable placeholder for a job group whose final id is not fixed yet.

    Every instance starts with a fresh random UUID string as its ``id``.
    Equality and hashing are both derived from ``id``; ``merge`` makes this
    instance adopt another instance's ``id`` so that the two compare equal
    afterwards.
    """

    def __init__(self):
        # Random unique identifier for the group.
        self.id = str(uuid.uuid4())

    def __eq__(self, other):
        """Return whether ``other`` is a CandidateGroup with the same id.

        Objects of any other type (e.g. a plain string group name) are
        never equal; returning ``NotImplemented`` lets Python fall back to
        its default comparison instead of raising ``AttributeError`` on the
        missing ``id`` attribute.
        """
        if not isinstance(other, CandidateGroup):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # NOTE: the hash follows the mutable ``id``. Calling ``merge`` on an
        # instance that is already stored in a set or used as a dict key
        # changes its hash, so membership lookups on that container may no
        # longer find it.
        return hash(self.id)

    def merge(self, other):
        """Adopt the id of ``other`` so both groups compare equal."""
        self.id = other.id
1 change: 1 addition & 0 deletions snakemake/jobs.py
Expand Up @@ -289,6 +289,7 @@ def group(self):

@group.setter
def group(self, group):
    # Store the assigned group as-is. The previous debugging
    # ``print(group, type(group))`` was removed: it wrote to stdout on every
    # assignment of a job's group.
    self._group = group

@property
Expand Down
27 changes: 27 additions & 0 deletions tests/test_pipes_multiple/Snakefile
@@ -0,0 +1,27 @@
# Test workflow: two chains of piped rules (a -> b) whose outputs are
# concatenated by rule c into a single regular file.
shell.executable("bash")

# Default target: requests the final concatenated file.
rule all:
input:
"test.out"

# Writes the value of wildcard ``i`` into a pipe output.
rule a:
output:
pipe("testa.{i}.txt")
shell:
"echo {wildcards.i} > {output}"

# Reads the output of rule a and forwards it into a second pipe output.
rule b:
input:
rules.a.output
output:
pipe("testb.{i}.txt")
shell:
"cat {input} > {output}"

# Consumes the outputs of rule b for i = 0 and i = 1 and concatenates them
# into test.out.
rule c:
input:
expand(rules.b.output, i=range(2))
output:
"test.out"
shell:
"cat {input} > {output}"
2 changes: 2 additions & 0 deletions tests/test_pipes_multiple/expected-results/test.out
@@ -0,0 +1,2 @@
0
1
6 changes: 6 additions & 0 deletions tests/tests.py
Expand Up @@ -832,6 +832,12 @@ def test_pipes():
run(dpath("test_pipes"))


@skip_on_windows
def test_pipes_multiple():
    # Regression test for github issue #975; runs the workflow stored in
    # the "test_pipes_multiple" test directory.
    workdir = dpath("test_pipes_multiple")
    run(workdir)


def test_pipes_fail():
    # The workflow in the "test_pipes_fail" directory is expected to fail.
    workdir = dpath("test_pipes_fail")
    run(workdir, shouldfail=True)

Expand Down

0 comments on commit de91d2c

Please sign in to comment.