Skip to content

Commit

Permalink
fix: correct handling of exceptions in input functions that are gener…
Browse files Browse the repository at this point in the history
…ators (#1536)

* fix: correct handling of exceptions in input functions that are generators

* fmt
  • Loading branch information
johanneskoester committed Mar 30, 2022
1 parent 5b394c0 commit d9a56aa
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 2 deletions.
2 changes: 1 addition & 1 deletion snakemake/exceptions.py
Expand Up @@ -510,7 +510,7 @@ class CheckSumMismatchException(WorkflowError):
class IncompleteCheckpointException(Exception):
def __init__(self, rule, targetfile):
super().__init__(
"The requested checkpoint output is not yet created."
"The requested checkpoint output is not yet created. "
"If you see this error, you have likely tried to use "
"checkpoint output outside of an input function, or "
"you have tried to call an input function directly "
Expand Down
4 changes: 3 additions & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -465,6 +465,9 @@ def get_python_executable(self):
def get_envvar_declarations(self):
return ""

def get_job_args(self, job, **kwargs):
return f"{super().get_job_args(job, **kwargs)} --quiet"

def run(self, job, callback=None, submit_callback=None, error_callback=None):
super()._run(job)

Expand Down Expand Up @@ -581,7 +584,6 @@ def run_group_job(self, job):

def spawn_job(self, job):
cmd = self.format_job_exec(job)
print(cmd)
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as e:
Expand Down
6 changes: 6 additions & 0 deletions snakemake/rules.py
Expand Up @@ -5,6 +5,7 @@

import os
import re
import types
from snakemake.path_modifier import PATH_MODIFIER_FLAG
import sys
import inspect
Expand Down Expand Up @@ -753,6 +754,11 @@ def apply_input_function(

try:
value = func(Wildcards(fromdict=wildcards), **_aux_params)
if isinstance(value, types.GeneratorType):
# generators should be immediately collected here,
# otherwise we would miss any exceptions and
# would have to capture them again later.
value = list(value)
except IncompleteCheckpointException as e:
value = incomplete_checkpoint_func(e)
incomplete = True
Expand Down
25 changes: 25 additions & 0 deletions tests/test_github_issue261/Snakefile
@@ -0,0 +1,25 @@
import random

checkpoint random:
output:
target = 'test1/{target}/{config}.txt'

run:
with open(output.target, 'w') as ftg:
for i in range(2, 10):
ftg.write(f'test1/{wildcards.target}/{wildcards.config}/v{i}\n')

rule process:
output:
touch('test1/{target}/{config}/v{index}')

def genetate_inputs(wildcards):
with checkpoints.random.get(**wildcards).output[0].open() as fran:
for line in fran.readlines():
yield line.rstrip()

rule generate:
input:
genetate_inputs
output:
touch('test1/{target}/{config}.done')
Empty file.
5 changes: 5 additions & 0 deletions tests/tests.py
Expand Up @@ -1567,6 +1567,11 @@ def test_incomplete_params():
run(dpath("test_incomplete_params"), dryrun=True, printshellcmds=True)


@skip_on_windows
def test_github_issue261():
run(dpath("test_github_issue261"), targets=["test1/target1/config1.done"])


@skip_on_windows # no pipe support on windows
def test_pipe_depend():
run(dpath("test_pipe_depend"), shouldfail=True)
Expand Down

0 comments on commit d9a56aa

Please sign in to comment.