Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merging of pipe groups when multiple rules are chained together via pipes #1173

Merged
merged 6 commits into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 33 additions & 8 deletions snakemake/dag.py
Expand Up @@ -1224,6 +1224,8 @@ def postprocess(self, update_needrun=True):
def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
one consumer"""

visited = set()
for job in self.needrun_jobs:
candidate_groups = set()
if job.group is not None:
Expand Down Expand Up @@ -1283,22 +1285,31 @@ def handle_pipes(self):
continue

if len(candidate_groups) > 1:
raise WorkflowError(
"An output file is marked as "
"pipe, but consuming jobs "
"are part of conflicting "
"groups.",
rule=job.rule,
)
if all(isinstance(group, CandidateGroup) for group in candidate_groups):
for g in candidate_groups:
g.merge(group)
else:
raise WorkflowError(
"An output file is marked as "
"pipe, but consuming jobs "
"are part of conflicting "
"groups.",
rule=job.rule,
)
elif candidate_groups:
# extend the candidate group to all involved jobs
group = candidate_groups.pop()
else:
# generate a random unique group name
group = str(uuid.uuid4())
group = CandidateGroup() # str(uuid.uuid4())
job.group = group
visited.add(job)
for j in all_depending:
j.group = group
visited.add(j)

for job in visited:
job.group = group.id if isinstance(group, CandidateGroup) else group

def _ready(self, job):
"""Return whether the given job is ready to execute."""
Expand Down Expand Up @@ -2181,3 +2192,17 @@ def __str__(self):

def __len__(self):
return self._len


class CandidateGroup:
def __init__(self):
self.id = str(uuid.uuid4())

def __eq__(self, other):
return self.id == other.id

def __hash__(self):
return hash(self.id)

def merge(self, other):
self.id = other.id
1 change: 1 addition & 0 deletions snakemake/jobs.py
Expand Up @@ -289,6 +289,7 @@ def group(self):

@group.setter
def group(self, group):
print(group, type(group))
self._group = group

@property
Expand Down
27 changes: 27 additions & 0 deletions tests/test_pipes_multiple/Snakefile
@@ -0,0 +1,27 @@
shell.executable("bash")

rule all:
input:
"test.out"

rule a:
output:
pipe("testa.{i}.txt")
shell:
"echo {wildcards.i} > {output}"

rule b:
input:
rules.a.output
output:
pipe("testb.{i}.txt")
shell:
"cat {input} > {output}"

rule c:
input:
expand(rules.b.output, i=range(2))
output:
"test.out"
shell:
"cat {input} > {output}"
2 changes: 2 additions & 0 deletions tests/test_pipes_multiple/expected-results/test.out
@@ -0,0 +1,2 @@
0
1
6 changes: 6 additions & 0 deletions tests/tests.py
Expand Up @@ -832,6 +832,12 @@ def test_pipes():
run(dpath("test_pipes"))


@skip_on_windows
def test_pipes_multiple():
# see github issue #975
run(dpath("test_pipes_multiple"))


def test_pipes_fail():
run(dpath("test_pipes_fail"), shouldfail=True)

Expand Down