diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst index 199333f4c..4911cf3a4 100644 --- a/docs/snakefiles/rules.rst +++ b/docs/snakefiles/rules.rst @@ -1036,6 +1036,61 @@ The timestamp of such files is ignored and always assumed to be older than any o Here, this means that the file ``path/to/outputfile`` will not be triggered for re-creation after it has been generated once, even when the input file is modified in the future. Note that any flag that forces re-creation of files still also applies to files marked as ``ancient``. +.. _snakefiles_ensure:: + +Ensuring output file properties like non-emptyness or checksum compliance +------------------------------------------------------------------------- + +It is possible to annotate certain additional criteria for output files to be ensured after they have been generated successfully. +For example, this can be used to check for output files to be non-empty, or to compare them against a given sha256 checksum. +If this functionality is used, Snakemake will check such annotated files before considering a job to be successfull. +Non-emptyness can be checked as follows: + +.. code-block:: python + + rule NAME: + output: + ensure("test.txt", non_empty=True) + shell: + "somecommand {output}" + +Above, the output file ``test.txt`` is marked as non-empty. +If the command ``somecommand`` happens to generate an empty output, +the job will fail with an error listing the unexpected empty file. + +A sha256 checksum can be compared as follows: + +.. code-block:: python + + my_checksum = "u98a9cjsd98saud090923ßkpoasköf9ß32" + + rule NAME: + output: + ensure("test.txt", sha256=my_checksum) + shell: + "somecommand {output}" + +In addition to providing the checksum as plain string, it is possible to provide a pointer to a function (similar to :ref:`input functions `). +The function has to accept a single argument that will be the wildcards object generated from the application of the rule to create some requested output files: + +.. code-block:: python + + def get_checksum(wildcards): + # e.g., look up the checksum with the value of the wildcard sample + # in some dictionary + return my_checksums[wildcards.sample] + + rule NAME: + output: + ensure("test/{sample}.txt", sha256=get_checksum) + shell: + "somecommand {output}" + + +Note that you can also use `lambda expressions `_ instead of full function definitions. + +Often, it is a good idea to combine ``ensure`` annotations with :ref:`retry definitions `, e.g. for retrying upon invalid checksums or empty files. + Shadow rules ------------ @@ -1069,7 +1124,7 @@ Consider running with the ``--cleanup-shadow`` argument every now and then to remove any remaining shadow directories from aborted jobs. The base shadow directory can be changed with the ``--shadow-prefix`` command line argument. -.. _snakefiles-retries: +.. _snakefiles_retries: Defining retries for fallible rules ----------------------------------- @@ -1087,6 +1142,8 @@ For such cases, it is possible to defined a number of automatic retries for each shell: "curl https://some.unreliable.server/test.txt > {output}" +Often, it is a good idea to combine retry functionality with :ref:`ensure annotations `, e.g. for retrying upon invalid checksums or empty files. + Note that it is also possible to define retries globally (via the ``--retries`` command line option, see :ref:`all_options`). The local definition of the rule thereby overwrites the global definition. diff --git a/snakemake/dag.py b/snakemake/dag.py index aa1d017b5..69ceb0273 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -17,7 +17,14 @@ import uuid import math -from snakemake.io import PeriodicityDetector, wait_for_files, is_flagged, IOFile +from snakemake.io import ( + PeriodicityDetector, + get_flag_value, + is_callable, + wait_for_files, + is_flagged, + IOFile, +) from snakemake.jobs import Reason, JobFactory, GroupJobFactory, Job from snakemake.exceptions import MissingInputException from snakemake.exceptions import MissingRuleException, AmbiguousRuleException @@ -479,6 +486,55 @@ def missing_temp(self, job): return True return False + def handle_ensure(self, job, expanded_output): + ensured_output = { + f: get_flag_value(f, "ensure") + for f in expanded_output + if is_flagged(f, "ensure") + } + # handle non_empty + empty_output = [ + f + for f, ensure in ensured_output.items() + if ensure["non_empty"] and f.size == 0 + ] + if empty_output: + raise WorkflowError( + "Detected unexpected empty output files. " + "Something went wrong in the rule without " + "an error being reported:\n{}".format("\n".join(empty_output)), + rule=job.rule, + ) + + # handle checksum + def is_not_same_checksum(f, checksum): + if checksum is None: + return False + if is_callable(checksum): + try: + checksum = checksum(job.wildcards) + except Exception as e: + raise WorkflowError( + "Error calling checksum function provided to ensure marker.", + e, + rule=job.rule, + ) + return not f.is_same_checksum(checksum, force=True) + + checksum_failed_output = [ + f + for f, ensure in ensured_output.items() + if is_not_same_checksum(f, ensure.get("sha256")) + ] + if checksum_failed_output: + raise WorkflowError( + "Output files have checksums that differ from the expected ones " + "defined in the workflow:\n{}".format( + "\n".join(checksum_failed_output) + ), + rule=job.rule, + ) + def check_and_touch_output( self, job, @@ -508,13 +564,16 @@ def check_and_touch_output( ) # Ensure that outputs are of the correct type (those flagged with directory() - # are directories and not files and vice versa). We can't check for remote objects + # are directories and not files and vice versa). We can't check for remote objects. for f in expanded_output: if (f.is_directory and not f.remote_object and not os.path.isdir(f)) or ( not f.remote_object and os.path.isdir(f) and not f.is_directory ): raise ImproperOutputException(job, [f]) + # Handle ensure flags + self.handle_ensure(job, expanded_output) + # It is possible, due to archive expansion or cluster clock skew, that # the files appear older than the input. But we know they must be new, # so touch them to update timestamps. This also serves to touch outputs diff --git a/snakemake/io.py b/snakemake/io.py index 4d91ec5e6..182b1d1d0 100755 --- a/snakemake/io.py +++ b/snakemake/io.py @@ -605,8 +605,8 @@ def checksum(self, force=False): else: return None - def is_same_checksum(self, other_checksum): - checksum = self.checksum() + def is_same_checksum(self, other_checksum, force=False): + checksum = self.checksum(force=force) if checksum is None or other_checksum is None: # if no checksum available or files too large, not the same return False @@ -1114,6 +1114,10 @@ def touch(value): return flag(value, "touch") +def ensure(value, non_empty=False, sha256=None): + return flag(value, "ensure", {"non_empty": non_empty, "sha256": sha256}) + + def unpack(value): return flag(value, "unpack") diff --git a/snakemake/rules.py b/snakemake/rules.py index 4d7ef7bea..151b36a95 100644 --- a/snakemake/rules.py +++ b/snakemake/rules.py @@ -560,6 +560,7 @@ def _set_inoutput_item(self, item, output=False, name=None): "touch", "pipe", "service", + "ensure", ]: logger.warning( "The flag '{}' used in rule {} is only valid for outputs, not inputs.".format( diff --git a/snakemake/workflow.py b/snakemake/workflow.py index 7f020ea46..ef9f515e6 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -55,6 +55,7 @@ repeat, report, multiext, + ensure, IOFile, sourcecache_entry, ) diff --git a/tests/test_ensure/Snakefile b/tests/test_ensure/Snakefile new file mode 100644 index 000000000..3ddcbc621 --- /dev/null +++ b/tests/test_ensure/Snakefile @@ -0,0 +1,31 @@ +shell.executable("bash") + +rule a: + output: + ensure("test.txt", non_empty=True) + shell: + "touch {output}" + + +rule b: + output: + ensure("test2.txt", non_empty=True) + shell: + "echo test > {output}" + + +sha256 = "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" + + +rule c: + output: + ensure("test3.txt", sha256=sha256) + shell: + "echo -n test > {output}" + + +rule d: + output: + ensure("test4.txt", sha256=lambda w: sha256) + shell: + "echo -n test2 > {output}" diff --git a/tests/test_ensure/expected-results/test2.txt b/tests/test_ensure/expected-results/test2.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_ensure/expected-results/test2.txt @@ -0,0 +1 @@ +test diff --git a/tests/tests.py b/tests/tests.py index f3ab66d80..204ed4fea 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1616,6 +1616,18 @@ def test_github_issue1389(): run(dpath("test_github_issue1389"), resources={"foo": 4}, shouldfail=True) +def test_ensure_nonempty_fail(): + run(dpath("test_ensure"), targets=["a"], shouldfail=True) + + +def test_ensure_success(): + run(dpath("test_ensure"), targets=["b", "c"]) + + +def test_ensure_checksum_fail(): + run(dpath("test_ensure"), targets=["d"], shouldfail=True) + + @skip_on_windows def test_github_issue1261(): run(dpath("test_github_issue1261"), shouldfail=True, check_results=True)