From 503c70c7727e154f8fadf6ead088887d22a87a65 Mon Sep 17 00:00:00 2001
From: Johannes Köster
Date: Tue, 31 May 2022 13:41:34 +0200
Subject: [PATCH] fix: propagate rerun trigger info to cluster jobs; fix a bug
 leading to software stack trigger generating false positives in case of
 conda environments; fixed display of info message in case of provenance
 triggered reruns (#1686)

* fix: propagate rerun trigger info to cluster jobs and fix a bug leading to
  software stack trigger generating false positives in case of conda
  environments

* fix: ensure that info message is shown after provenance triggered reruns

* fmt

* add missing env files

* pull singularity images first

* fix: avoid duplicate work when updating conda envs and container images

* iter over values

* more efficient singularity check

* fix: lazy checking of conda and singularity executables

* add missing envs
---
 snakemake/dag.py                             | 76 ++++++++++---------
 snakemake/deployment/conda.py                | 56 ++++++++------
 snakemake/deployment/singularity.py          | 67 +++++++++++-----
 snakemake/executors/__init__.py              |  1 +
 snakemake/jobs.py                            |  4 +-
 snakemake/persistence.py                     |  2 +-
 snakemake/scheduler.py                       |  2 +-
 snakemake/workflow.py                        | 14 ++--
 tests/test_get_log_both/environment.yaml     |  3 +
 tests/test_get_log_complex/environment.yaml  |  3 +
 tests/test_get_log_none/environment.yaml     |  3 +
 tests/test_get_log_stderr/environment.yaml   |  3 +
 tests/test_get_log_stdout/environment.yaml   |  3 +
 .../my_wrapper/environment.yaml              |  3 +
 tests/test_r_wrapper/environment.yaml        |  3 +
 15 files changed, 160 insertions(+), 83 deletions(-)
 create mode 100644 tests/test_get_log_both/environment.yaml
 create mode 100644 tests/test_get_log_complex/environment.yaml
 create mode 100644 tests/test_get_log_none/environment.yaml
 create mode 100644 tests/test_get_log_stderr/environment.yaml
 create mode 100644 tests/test_get_log_stdout/environment.yaml
 create mode 100644 tests/test_issue1046/my_wrapper/environment.yaml
 create mode 100644 tests/test_r_wrapper/environment.yaml
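The dag.py changes that follow split conda-env and container-image handling
into an idempotent update step (deduplicate specs, memoize one object per
unique key) and a separate create/pull step that does the expensive work once
over the memoized values. A minimal sketch of the pattern in isolation;
EnvRegistry and build_env are hypothetical stand-ins, not Snakemake classes:

    class EnvRegistry:
        def __init__(self):
            # Survives repeated update() calls (both DAG.init and
            # DAG.postprocess trigger updates), so the objects stay
            # stable across passes.
            self.envs = {}

        def update(self, keys):
            # Idempotent: deduplicate, then build only what is not cached yet.
            for key in set(keys):
                if key not in self.envs:
                    self.envs[key] = self.build_env(key)

        def build_env(self, key):
            return ("env", key)  # placeholder for the real construction

        def create_all(self):
            # The expensive deployment runs once over the memoized values.
            for env in self.envs.values():
                print("creating", env)

The same split is applied twice below: update_conda_envs/create_conda_envs,
keyed on (conda_env_spec, container_img_url), and
update_container_imgs/pull_container_imgs, keyed on the image URL.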
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 03e300ea6..7af4614fc 100755
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -199,6 +199,9 @@ def init(self, progress=False):
 
         self.check_incomplete()
 
+        self.update_container_imgs()
+        self.update_conda_envs()
+
         self.update_needrun(create_inventory=True)
         self.set_until_jobs()
         self.delete_omitfrom_jobs()
@@ -227,7 +230,7 @@ def check_directory_outputs(self):
 
     @property
     def checkpoint_jobs(self):
-        for job in self.needrun_jobs:
+        for job in self.needrun_jobs():
             if job.is_checkpoint:
                 yield job
 
@@ -279,19 +282,15 @@ def cleanup(self):
             except KeyError:
                 pass
 
-    def create_conda_envs(
-        self, dryrun=False, forceall=False, init_only=False, quiet=False
-    ):
+    def update_conda_envs(self):
         # First deduplicate based on job.conda_env_spec
-        jobs = self.jobs if forceall else self.needrun_jobs
         env_set = {
             (job.conda_env_spec, job.container_img_url)
-            for job in jobs
+            for job in self.jobs
            if job.conda_env_spec
             and (self.workflow.assume_shared_fs or job.is_local)
         }
         # Then based on md5sum values
-        self.conda_envs = dict()
         for (env_spec, simg_url) in env_set:
             simg = None
             if simg_url and self.workflow.use_singularity:
@@ -299,32 +298,37 @@ def create_conda_envs(
                     simg_url in self.container_imgs
                 ), "bug: must first pull singularity images"
                 simg = self.container_imgs[simg_url]
-            env = env_spec.get_conda_env(
-                self.workflow,
-                container_img=simg,
-                cleanup=self.workflow.conda_cleanup_pkgs,
-            )
-            self.conda_envs[(env_spec, simg_url)] = env
+            key = (env_spec, simg_url)
+            if key not in self.conda_envs:
+                env = env_spec.get_conda_env(
+                    self.workflow,
+                    container_img=simg,
+                    cleanup=self.workflow.conda_cleanup_pkgs,
+                )
+                self.conda_envs[key] = env
 
-        if not init_only:
-            for env in self.conda_envs.values():
-                if (not dryrun or not quiet) and not env.is_named:
-                    env.create(dryrun)
+    def create_conda_envs(self, dryrun=False, quiet=False):
+        for env in self.conda_envs.values():
+            if (not dryrun or not quiet) and not env.is_named:
+                env.create(dryrun)
 
-    def pull_container_imgs(self, dryrun=False, forceall=False, quiet=False):
+    def update_container_imgs(self):
         # First deduplicate based on job.conda_env_spec
-        jobs = self.jobs if forceall else self.needrun_jobs
         img_set = {
             (job.container_img_url, job.is_containerized)
-            for job in jobs
+            for job in self.jobs
             if job.container_img_url
         }
 
         for img_url, is_containerized in img_set:
-            img = singularity.Image(img_url, self, is_containerized)
+            if img_url not in self.container_imgs:
+                img = singularity.Image(img_url, self, is_containerized)
+                self.container_imgs[img_url] = img
+
+    def pull_container_imgs(self, dryrun=False, quiet=False):
+        for img in self.container_imgs.values():
             if not dryrun or not quiet:
                 img.pull(dryrun)
-            self.container_imgs[img_url] = img
 
     def update_output_index(self):
         """Update the OutputIndex."""
@@ -388,15 +392,17 @@ def jobs(self):
         """All jobs in the DAG."""
         return self.dependencies.keys()
 
-    @property
-    def needrun_jobs(self):
+    def needrun_jobs(self, exclude_finished=True):
         """Jobs that need to be executed."""
-        return filterfalse(self.finished, self._needrun)
+        if exclude_finished:
+            return filterfalse(self.finished, self._needrun)
+        else:
+            return iter(self._needrun)
 
     @property
     def local_needrun_jobs(self):
         """Iterate over all jobs that need to be run and are marked as local."""
-        return filter(lambda job: job.is_local, self.needrun_jobs)
+        return filter(lambda job: job.is_local, self.needrun_jobs())
 
     @property
     def finished_jobs(self):
@@ -1233,18 +1239,18 @@ def update_priority(self):
             lambda job: job.rule in self.priorityrules
             or not self.priorityfiles.isdisjoint(job.output)
         )
-        for job in self.needrun_jobs:
+        for job in self.needrun_jobs():
             self._priority[job] = job.rule.priority
         for job in self.bfs(
             self.dependencies,
-            *filter(prioritized, self.needrun_jobs),
+            *filter(prioritized, self.needrun_jobs()),
             stop=self.noneedrun_finished,
         ):
             self._priority[job] = Job.HIGHEST_PRIORITY
 
     def update_groups(self):
         groups = dict()
-        for job in self.needrun_jobs:
+        for job in self.needrun_jobs():
             if job.group is None:
                 continue
             stop = lambda j: j.group != job.group
@@ -1315,7 +1321,7 @@ def update_ready(self, jobs=None):
         """
 
         if jobs is None:
-            jobs = self.needrun_jobs
+            jobs = self.needrun_jobs()
 
         potential_new_ready_jobs = False
         candidate_groups = set()
@@ -1365,6 +1371,8 @@ def postprocess(
         self.cleanup()
         self.update_jobids()
         if update_needrun:
+            self.update_container_imgs()
+            self.update_conda_envs()
             self.update_needrun()
         self.update_priority()
         self.handle_pipes_and_services()
@@ -1393,7 +1401,7 @@ def handle_pipes_and_services(self):
         one consumer"""
 
         visited = set()
-        for job in self.needrun_jobs:
+        for job in self.needrun_jobs():
             candidate_groups = set()
             if job.group is not None:
                 candidate_groups.add(job.group)
@@ -2217,7 +2225,7 @@ def archive(self, path):
         if os.path.exists(path):
             raise WorkflowError("Archive already exists:\n" + path)
 
-        self.create_conda_envs(forceall=True)
+        self.create_conda_envs()
 
         try:
             workdir = Path(os.path.abspath(os.getcwd()))
@@ -2342,12 +2350,12 @@ def stats(self):
         from tabulate import tabulate
 
         rules = Counter()
-        rules.update(job.rule for job in self.needrun_jobs)
+        rules.update(job.rule for job in self.needrun_jobs())
         rules.update(job.rule for job in self.finished_jobs)
 
         max_threads = defaultdict(int)
         min_threads = defaultdict(lambda: sys.maxsize)
-        for job in chain(self.needrun_jobs, self.finished_jobs):
+        for job in chain(self.needrun_jobs(), self.finished_jobs):
             max_threads[job.rule] = max(max_threads[job.rule], job.threads)
             min_threads[job.rule] = min(min_threads[job.rule], job.threads)
         rows = [
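With needrun_jobs now a method, callers can opt out of the finished-job
filter. The provenance-message check in workflow.py (further below in this
patch) passes exclude_finished=False because it runs after execution, when the
triggered jobs are already finished and would otherwise be filtered away. A
self-contained sketch of the two behaviors:

    from itertools import filterfalse

    class MiniDag:
        """Stand-in for snakemake.dag.DAG, reduced to the needrun logic."""

        def __init__(self, needrun, finished):
            self._needrun = list(needrun)
            self._finished = set(finished)

        def finished(self, job):
            return job in self._finished

        def needrun_jobs(self, exclude_finished=True):
            if exclude_finished:
                return filterfalse(self.finished, self._needrun)
            return iter(self._needrun)

    # After a run, every triggered job is finished:
    dag = MiniDag(needrun=["a", "b"], finished=["a", "b"])
    print(list(dag.needrun_jobs()))                        # []
    print(list(dag.needrun_jobs(exclude_finished=False)))  # ['a', 'b']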
diff --git a/snakemake/deployment/conda.py b/snakemake/deployment/conda.py
index fef6fa0d9..039750d92 100644
--- a/snakemake/deployment/conda.py
+++ b/snakemake/deployment/conda.py
@@ -32,7 +32,13 @@
 
 from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
 from snakemake.logging import logger
-from snakemake.common import is_local_file, parse_uri, strip_prefix, ON_WINDOWS
+from snakemake.common import (
+    is_local_file,
+    lazy_property,
+    parse_uri,
+    strip_prefix,
+    ON_WINDOWS,
+)
 from snakemake import utils
 from snakemake.deployment import singularity, containerize
 from snakemake.io import (
@@ -65,30 +71,12 @@ def __init__(
         container_img=None,
         cleanup=None,
     ):
-        self._conda = Conda(container_img)
-
-        self.file = None
-        self.name = None
-        self.post_deploy_file = None
-        self.pin_file = None
+        self.file = env_file
         if env_file is not None:
             self.file = infer_source_file(env_file)
-
-            deploy_file = Path(self.file.get_path_or_uri()).with_suffix(
-                ".post-deploy.sh"
-            )
-            if deploy_file.exists():
-                self.post_deploy_file = infer_source_file(deploy_file)
-
-            pin_file = Path(self.file.get_path_or_uri()).with_suffix(
-                f".{self._conda.platform}.pin.txt"
-            )
-
-            if pin_file.exists():
-                self.pin_file = infer_source_file(pin_file)
-
+        self.name = env_name
         if env_name is not None:
             assert env_file is None, "bug: both env_file and env_name specified"
-            self.name = env_name
 
         self.frontend = workflow.conda_frontend
         self.workflow = workflow
@@ -109,6 +97,30 @@ def __init__(
         self._cleanup = cleanup
         self._singularity_args = workflow.singularity_args
 
+    @lazy_property
+    def conda(self):
+        return Conda(self._container_img)
+
+    @lazy_property
+    def pin_file(self):
+        pin_file = Path(self.file.get_path_or_uri()).with_suffix(
+            f".{self.conda.platform}.pin.txt"
+        )
+
+        if pin_file.exists():
+            return infer_source_file(pin_file)
+        else:
+            return None
+
+    @lazy_property
+    def post_deploy_file(self):
+        if self.file:
+            deploy_file = Path(self.file.get_path_or_uri()).with_suffix(
+                ".post-deploy.sh"
+            )
+            if deploy_file.exists():
+                return infer_source_file(deploy_file)
+
     def _get_content(self):
         if self.is_named:
             from snakemake.shell import shell
@@ -334,7 +346,7 @@ def execute_deployment_script(self, env_file, deploy_file):
             )
         )
         shell.check_output(
-            self._conda.shellcmd(self.address, "sh {}".format(deploy_file)),
+            self.conda.shellcmd(self.address, "sh {}".format(deploy_file)),
             stderr=subprocess.STDOUT,
         )
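conda.py defers the Conda(container_img) probe (and the pin/post-deploy file
lookups) until first access via lazy_property, imported from snakemake.common.
Its implementation is not part of this patch; the sketch below shows the usual
non-data-descriptor recipe such a helper follows, as an assumption rather than
a copy of Snakemake's code:

    class lazy_property:
        """Compute a method's value on first access, then cache it."""

        def __init__(self, method):
            self.method = method
            self.name = method.__name__

        def __get__(self, instance, owner=None):
            if instance is None:
                return self
            value = self.method(instance)
            # A non-data descriptor: storing the value in the instance dict
            # shadows the descriptor, so the method never runs again.
            instance.__dict__[self.name] = value
            return value

    class CondaEnvSketch:
        @lazy_property
        def conda(self):
            print("probing conda executable ...")  # happens at most once
            return "conda-handle"

    env = CondaEnvSketch()
    env.conda  # prints the probe message
    env.conda  # served from the cache

The practical effect is that a run which never touches an environment (e.g. a
dry run) no longer pays for, or fails on, the conda probe, which appears to be
the "lazy checking of conda and singularity executables" bullet above.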
diff --git a/snakemake/deployment/singularity.py b/snakemake/deployment/singularity.py
index 5ebef81fc..801e09849 100644
--- a/snakemake/deployment/singularity.py
+++ b/snakemake/deployment/singularity.py
@@ -31,25 +31,7 @@ def __init__(self, url, dag, is_containerized):
                 "Invalid singularity image URL containing " "whitespace."
             )
 
-        if not shutil.which("singularity"):
-            raise WorkflowError(
-                "The singularity command has to be "
-                "available in order to use singularity "
-                "integration."
-            )
-        try:
-            v = subprocess.check_output(
-                ["singularity", "--version"], stderr=subprocess.PIPE
-            ).decode()
-        except subprocess.CalledProcessError as e:
-            raise WorkflowError(
-                "Failed to get singularity version:\n{}".format(e.stderr.decode())
-            )
-        v = v.rsplit(" ", 1)[-1]
-        if v.startswith("v"):
-            v = v[1:]
-        if not LooseVersion(v) >= LooseVersion("2.4.1"):
-            raise WorkflowError("Minimum singularity version is 2.4.1.")
+        self.singularity = Singularity()
 
         self.url = url
         self._img_dir = dag.workflow.persistence.container_img_path
@@ -66,6 +48,7 @@ def hash(self):
         return md5hash.hexdigest()
 
     def pull(self, dryrun=False):
+        self.singularity.check()
         if self.is_local:
             return
         if dryrun:
@@ -150,3 +133,49 @@ def shellcmd(
         )
         logger.debug(cmd)
         return cmd
+
+
+class Singularity:
+    instance = None
+
+    def __new__(cls):
+        if cls.instance is not None:
+            return cls.instance
+        else:
+            inst = super().__new__(cls)
+            cls.instance = inst
+            return inst
+
+    def __init__(self):
+        self.checked = False
+        self._version = None
+
+    @property
+    def version(self):
+        assert (
+            self._version is not None
+        ), "bug: singularity version accessed before check() has been called"
+        return self._version
+
+    def check(self):
+        if not self.checked:
+            if not shutil.which("singularity"):
+                raise WorkflowError(
+                    "The singularity command has to be "
+                    "available in order to use singularity "
+                    "integration."
+                )
+            try:
+                v = subprocess.check_output(
+                    ["singularity", "--version"], stderr=subprocess.PIPE
+                ).decode()
+            except subprocess.CalledProcessError as e:
+                raise WorkflowError(
+                    "Failed to get singularity version:\n{}".format(e.stderr.decode())
+                )
+            v = v.rsplit(" ", 1)[-1]
+            if v.startswith("v"):
+                v = v[1:]
+            if not LooseVersion(v) >= LooseVersion("2.4.1"):
+                raise WorkflowError("Minimum singularity version is 2.4.1.")
+            self._version = v
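The new Singularity class is a process-wide singleton, so the version check
moves out of every Image constructor and into a shared, on-demand check().
Two details are worth noting when reusing this recipe: __init__ runs again on
every Singularity() call even though __new__ returns the cached instance, and
check() as written never sets self.checked, so the "checked" guard only takes
effect while nothing re-initializes the flags in between. A condensed variant
with both points handled; Checker is a hypothetical name and print stands in
for the subprocess probe:

    class Checker:
        instance = None

        def __new__(cls):
            if cls.instance is None:
                cls.instance = super().__new__(cls)
            return cls.instance

        def __init__(self):
            # Guard: __init__ is invoked on *every* Checker() construction,
            # so an unconditional reset would wipe the memoized state.
            if not hasattr(self, "checked"):
                self.checked = False

        def check(self):
            if not self.checked:
                print("expensive version probe")  # at most once per process
                self.checked = True  # remember the successful check

    a, b = Checker(), Checker()
    assert a is b
    a.check()  # probes
    b.check()  # already checked, does nothing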
diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py
index e99f4e792..9685aac3a 100644
--- a/snakemake/executors/__init__.py
+++ b/snakemake/executors/__init__.py
@@ -286,6 +286,7 @@ def general_args(self):
             "--no-hooks",
             "--nolock",
             "--ignore-incomplete",
+            w2a("rerun_triggers"),
             w2a("cleanup_scripts", flag="--skip-script-cleanup"),
             w2a("shadow_prefix"),
             w2a("use_conda"),
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index d6e5a11d6..b4d9d42a8 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -1676,7 +1676,9 @@ def format_files(files):
         if self.params_changed:
             s.append("Params have changed since last execution")
         if self.software_stack_changed:
-            s.append("Software stack has changed since last execution")
+            s.append(
+                "Software environment definition has changed since last execution"
+            )
 
         s = "; ".join(s)
         if self.finished:
diff --git a/snakemake/persistence.py b/snakemake/persistence.py
index 31c0c7147..ceb9e1cb3 100755
--- a/snakemake/persistence.py
+++ b/snakemake/persistence.py
@@ -502,7 +502,7 @@ def _record_path(self, subject, id):
 
     def all_outputfiles(self):
         # we only look at output files that will be updated
-        return jobfiles(self.dag.needrun_jobs, "output")
+        return jobfiles(self.dag.needrun_jobs(), "output")
 
     def all_inputfiles(self):
         # we consider all input files, also of not running jobs
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index 23349d3b7..85798a697 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -403,7 +403,7 @@ def remaining_jobs(self):
         """Return jobs to be scheduled including not yet ready ones."""
         return [
             job
-            for job in self.dag.needrun_jobs
+            for job in self.dag.needrun_jobs()
             if job not in self.running and not self.dag.finished(job)
         ]
 
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index b36ddce32..05e522af5 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -890,7 +890,7 @@ def files(items):
                 )
                 return False
 
-            updated_files.extend(f for job in dag.needrun_jobs for f in job.output)
+            updated_files.extend(f for job in dag.needrun_jobs() for f in job.output)
 
             if generate_unit_tests:
                 from snakemake import unit_tests
@@ -1107,15 +1107,19 @@ def files(items):
             if len(dag):
                 logger.run_info("\n".join(dag.stats()))
             if any(
-                dag.reason(job).is_provencant_triggered
-                for job in dag.needrun_jobs
+                dag.reason(job).is_provenance_triggered()
+                for job in dag.needrun_jobs(exclude_finished=False)
             ):
                 logger.info(
                     "Some jobs were triggered by provenance information, "
-                    "see 'reason' section in the rule displays above. "
+                    "see 'reason' section in the rule displays above.\n"
                     "If you prefer that only modification time is used to "
                     "determine whether a job shall be executed, use the command "
-                    "line option '--rerun-triggers mtime' (also see --help)."
+                    "line option '--rerun-triggers mtime' (also see --help).\n"
+                    "If you are sure that a change for a certain output file (say, <outfile>) won't "
+                    "change the result (e.g. because you just changed the formatting of a script "
+                    "or environment definition), you can also wipe its metadata to skip such a trigger via "
+                    "'snakemake --cleanup-metadata <outfile>'."
                 )
                 logger.info("")
             logger.info(
diff --git a/tests/test_get_log_both/environment.yaml b/tests/test_get_log_both/environment.yaml
new file mode 100644
index 000000000..0c0ece54c
--- /dev/null
+++ b/tests/test_get_log_both/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
\ No newline at end of file
diff --git a/tests/test_get_log_complex/environment.yaml b/tests/test_get_log_complex/environment.yaml
new file mode 100644
index 000000000..0c0ece54c
--- /dev/null
+++ b/tests/test_get_log_complex/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
\ No newline at end of file
diff --git a/tests/test_get_log_none/environment.yaml b/tests/test_get_log_none/environment.yaml
new file mode 100644
index 000000000..0c0ece54c
--- /dev/null
+++ b/tests/test_get_log_none/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
\ No newline at end of file
diff --git a/tests/test_get_log_stderr/environment.yaml b/tests/test_get_log_stderr/environment.yaml
new file mode 100644
index 000000000..0c0ece54c
--- /dev/null
+++ b/tests/test_get_log_stderr/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
\ No newline at end of file
diff --git a/tests/test_get_log_stdout/environment.yaml b/tests/test_get_log_stdout/environment.yaml
new file mode 100644
index 000000000..0c0ece54c
--- /dev/null
+++ b/tests/test_get_log_stdout/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
\ No newline at end of file
diff --git a/tests/test_issue1046/my_wrapper/environment.yaml b/tests/test_issue1046/my_wrapper/environment.yaml
new file mode 100644
index 000000000..382736015
--- /dev/null
+++ b/tests/test_issue1046/my_wrapper/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
diff --git a/tests/test_r_wrapper/environment.yaml b/tests/test_r_wrapper/environment.yaml
new file mode 100644
index 000000000..382736015
--- /dev/null
+++ b/tests/test_r_wrapper/environment.yaml
@@ -0,0 +1,3 @@
+channels:
+  - conda-forge
+dependencies: []
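The headline fix is the single executors line above: the command line that the
main process composes for each cluster job now includes the --rerun-triggers
setting, so remote Snakemake instances make the same provenance decisions as
the scheduling process. w2a converts a workflow attribute into such an
argument; its real implementation lives in snakemake/executors/__init__.py and
is not shown in this patch, so the helper below is only a hypothetical
illustration of the idea:

    def workflow_to_arg(workflow, attr, flag=None):
        # Hypothetical stand-in for the executor's w2a helper: render a
        # workflow attribute as a command-line option for the spawned job.
        value = getattr(workflow, attr)
        flag = flag or "--" + attr.replace("_", "-")
        if value is None or value is False:
            return ""
        if value is True:
            return flag
        if isinstance(value, (list, tuple, set)):
            return "{} {}".format(flag, " ".join(map(str, value)))
        return "{} {}".format(flag, value)

    class WorkflowStub:
        rerun_triggers = ["mtime", "params", "input", "software-env", "code"]

    print(workflow_to_arg(WorkflowStub(), "rerun_triggers"))
    # --rerun-triggers mtime params input software-env code

Without this propagation, a cluster job started under '--rerun-triggers mtime'
would fall back to the default trigger set, which is one way the provenance
triggers could disagree between the main process and the cluster jobs.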