Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct handling of exceptions in input functions that are generators #1536

Merged
merged 5 commits into from Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion snakemake/exceptions.py
Expand Up @@ -510,7 +510,7 @@ class CheckSumMismatchException(WorkflowError):
class IncompleteCheckpointException(Exception):
def __init__(self, rule, targetfile):
super().__init__(
"The requested checkpoint output is not yet created."
"The requested checkpoint output is not yet created. "
"If you see this error, you have likely tried to use "
"checkpoint output outside of an input function, or "
"you have tried to call an input function directly "
Expand Down
4 changes: 3 additions & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -465,6 +465,9 @@ def get_python_executable(self):
def get_envvar_declarations(self):
return ""

def get_job_args(self, job, **kwargs):
return f"{super().get_job_args(job, **kwargs)} --quiet"

def run(self, job, callback=None, submit_callback=None, error_callback=None):
super()._run(job)

Expand Down Expand Up @@ -581,7 +584,6 @@ def run_group_job(self, job):

def spawn_job(self, job):
cmd = self.format_job_exec(job)
print(cmd)
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as e:
Expand Down
6 changes: 6 additions & 0 deletions snakemake/rules.py
Expand Up @@ -5,6 +5,7 @@

import os
import re
import types
from snakemake.path_modifier import PATH_MODIFIER_FLAG
import sys
import inspect
Expand Down Expand Up @@ -753,6 +754,11 @@ def apply_input_function(

try:
value = func(Wildcards(fromdict=wildcards), **_aux_params)
if isinstance(value, types.GeneratorType):
# generators should be immediately collected here,
# otherwise we would miss any exceptions and
# would have to capture them again later.
value = list(value)
except IncompleteCheckpointException as e:
value = incomplete_checkpoint_func(e)
incomplete = True
Expand Down
25 changes: 25 additions & 0 deletions tests/test_github_issue261/Snakefile
@@ -0,0 +1,25 @@
import random

checkpoint random:
output:
target = 'test1/{target}/{config}.txt'

run:
with open(output.target, 'w') as ftg:
for i in range(2, 10):
ftg.write(f'test1/{wildcards.target}/{wildcards.config}/v{i}\n')

rule process:
output:
touch('test1/{target}/{config}/v{index}')

def genetate_inputs(wildcards):
with checkpoints.random.get(**wildcards).output[0].open() as fran:
for line in fran.readlines():
yield line.rstrip()

rule generate:
input:
genetate_inputs
output:
touch('test1/{target}/{config}.done')
Empty file.
5 changes: 5 additions & 0 deletions tests/tests.py
Expand Up @@ -1567,6 +1567,11 @@ def test_incomplete_params():
run(dpath("test_incomplete_params"), dryrun=True, printshellcmds=True)


@skip_on_windows
def test_github_issue261():
run(dpath("test_github_issue261"), targets=["test1/target1/config1.done"])


@skip_on_windows # no pipe support on windows
def test_pipe_depend():
run(dpath("test_pipe_depend"), shouldfail=True)
Expand Down