fix: propagate rerun trigger info to cluster jobs; fix a bug leading to software stack trigger generating false positives in case of conda environments; fixed display of info message in case of provenance triggered reruns (#1686)

* fix: propagate rerun trigger info to cluster jobs and fix a bug leading to software stack trigger generating false positives in case of conda environments

* fix: ensure that info message is shown after provenance triggered reruns

* fmt

* add missing env files

* pull singularity images first

* fix: avoid duplicate work when updating conda envs and container images

* iter over values

* more efficient singularity check

* fix: lazy checking of conda and singularity executables

* add missing envs
johanneskoester committed May 31, 2022
1 parent 35d72ee commit 503c70c
Showing 15 changed files with 160 additions and 83 deletions.
76 changes: 42 additions & 34 deletions snakemake/dag.py
@@ -199,6 +199,9 @@ def init(self, progress=False):

self.check_incomplete()

self.update_container_imgs()
self.update_conda_envs()

self.update_needrun(create_inventory=True)
self.set_until_jobs()
self.delete_omitfrom_jobs()
@@ -227,7 +230,7 @@ def check_directory_outputs(self):

@property
def checkpoint_jobs(self):
for job in self.needrun_jobs:
for job in self.needrun_jobs():
if job.is_checkpoint:
yield job

@@ -279,52 +282,53 @@ def cleanup(self):
except KeyError:
pass

def create_conda_envs(
self, dryrun=False, forceall=False, init_only=False, quiet=False
):
def update_conda_envs(self):
# First deduplicate based on job.conda_env_spec
jobs = self.jobs if forceall else self.needrun_jobs
env_set = {
(job.conda_env_spec, job.container_img_url)
for job in jobs
for job in self.jobs
if job.conda_env_spec and (self.workflow.assume_shared_fs or job.is_local)
}

# Then based on md5sum values
self.conda_envs = dict()
for (env_spec, simg_url) in env_set:
simg = None
if simg_url and self.workflow.use_singularity:
assert (
simg_url in self.container_imgs
), "bug: must first pull singularity images"
simg = self.container_imgs[simg_url]
env = env_spec.get_conda_env(
self.workflow,
container_img=simg,
cleanup=self.workflow.conda_cleanup_pkgs,
)
self.conda_envs[(env_spec, simg_url)] = env
key = (env_spec, simg_url)
if key not in self.conda_envs:
env = env_spec.get_conda_env(
self.workflow,
container_img=simg,
cleanup=self.workflow.conda_cleanup_pkgs,
)
self.conda_envs[key] = env

if not init_only:
for env in self.conda_envs.values():
if (not dryrun or not quiet) and not env.is_named:
env.create(dryrun)
def create_conda_envs(self, dryrun=False, quiet=False):
for env in self.conda_envs.values():
if (not dryrun or not quiet) and not env.is_named:
env.create(dryrun)

def pull_container_imgs(self, dryrun=False, forceall=False, quiet=False):
def update_container_imgs(self):
# First deduplicate based on job.conda_env_spec
jobs = self.jobs if forceall else self.needrun_jobs
img_set = {
(job.container_img_url, job.is_containerized)
for job in jobs
for job in self.jobs
if job.container_img_url
}

for img_url, is_containerized in img_set:
img = singularity.Image(img_url, self, is_containerized)
if img_url not in self.container_imgs:
img = singularity.Image(img_url, self, is_containerized)
self.container_imgs[img_url] = img

def pull_container_imgs(self, dryrun=False, quiet=False):
for img in self.container_imgs.values():
if not dryrun or not quiet:
img.pull(dryrun)
self.container_imgs[img_url] = img

def update_output_index(self):
"""Update the OutputIndex."""
@@ -388,15 +392,17 @@ def jobs(self):
"""All jobs in the DAG."""
return self.dependencies.keys()

@property
def needrun_jobs(self):
def needrun_jobs(self, exclude_finished=True):
"""Jobs that need to be executed."""
return filterfalse(self.finished, self._needrun)
if exclude_finished:
return filterfalse(self.finished, self._needrun)
else:
return iter(self._needrun)

@property
def local_needrun_jobs(self):
"""Iterate over all jobs that need to be run and are marked as local."""
return filter(lambda job: job.is_local, self.needrun_jobs)
return filter(lambda job: job.is_local, self.needrun_jobs())

@property
def finished_jobs(self):
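
Note that needrun_jobs changes from a property to a method here, which is why call sites throughout the rest of this commit gain parentheses. A small usage sketch, assuming a DAG instance named dag:

    for job in dag.needrun_jobs():                        # default: already finished jobs are excluded
        ...

    for job in dag.needrun_jobs(exclude_finished=False):  # also yields jobs that have already finished
        ...
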
@@ -1233,18 +1239,18 @@ def update_priority(self):
lambda job: job.rule in self.priorityrules
or not self.priorityfiles.isdisjoint(job.output)
)
for job in self.needrun_jobs:
for job in self.needrun_jobs():
self._priority[job] = job.rule.priority
for job in self.bfs(
self.dependencies,
*filter(prioritized, self.needrun_jobs),
*filter(prioritized, self.needrun_jobs()),
stop=self.noneedrun_finished,
):
self._priority[job] = Job.HIGHEST_PRIORITY

def update_groups(self):
groups = dict()
for job in self.needrun_jobs:
for job in self.needrun_jobs():
if job.group is None:
continue
stop = lambda j: j.group != job.group
Expand Down Expand Up @@ -1315,7 +1321,7 @@ def update_ready(self, jobs=None):
"""

if jobs is None:
jobs = self.needrun_jobs
jobs = self.needrun_jobs()

potential_new_ready_jobs = False
candidate_groups = set()
@@ -1365,6 +1371,8 @@ def postprocess(
self.cleanup()
self.update_jobids()
if update_needrun:
self.update_container_imgs()
self.update_conda_envs()
self.update_needrun()
self.update_priority()
self.handle_pipes_and_services()
@@ -1393,7 +1401,7 @@ def handle_pipes_and_services(self):
one consumer"""

visited = set()
for job in self.needrun_jobs:
for job in self.needrun_jobs():
candidate_groups = set()
if job.group is not None:
candidate_groups.add(job.group)
@@ -2217,7 +2225,7 @@ def archive(self, path):
if os.path.exists(path):
raise WorkflowError("Archive already exists:\n" + path)

self.create_conda_envs(forceall=True)
self.create_conda_envs()

try:
workdir = Path(os.path.abspath(os.getcwd()))
@@ -2342,12 +2350,12 @@ def stats(self):
from tabulate import tabulate

rules = Counter()
rules.update(job.rule for job in self.needrun_jobs)
rules.update(job.rule for job in self.needrun_jobs())
rules.update(job.rule for job in self.finished_jobs)

max_threads = defaultdict(int)
min_threads = defaultdict(lambda: sys.maxsize)
for job in chain(self.needrun_jobs, self.finished_jobs):
for job in chain(self.needrun_jobs(), self.finished_jobs):
max_threads[job.rule] = max(max_threads[job.rule], job.threads)
min_threads[job.rule] = min(min_threads[job.rule], job.threads)
rows = [
56 changes: 34 additions & 22 deletions snakemake/deployment/conda.py
@@ -32,7 +32,13 @@

from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
from snakemake.logging import logger
from snakemake.common import is_local_file, parse_uri, strip_prefix, ON_WINDOWS
from snakemake.common import (
is_local_file,
lazy_property,
parse_uri,
strip_prefix,
ON_WINDOWS,
)
from snakemake import utils
from snakemake.deployment import singularity, containerize
from snakemake.io import (
@@ -65,30 +71,12 @@ def __init__(
container_img=None,
cleanup=None,
):
self._conda = Conda(container_img)

self.file = None
self.name = None
self.post_deploy_file = None
self.pin_file = None
self.file = env_file
if env_file is not None:
self.file = infer_source_file(env_file)

deploy_file = Path(self.file.get_path_or_uri()).with_suffix(
".post-deploy.sh"
)
if deploy_file.exists():
self.post_deploy_file = infer_source_file(deploy_file)

pin_file = Path(self.file.get_path_or_uri()).with_suffix(
f".{self._conda.platform}.pin.txt"
)

if pin_file.exists():
self.pin_file = infer_source_file(pin_file)
self.name = env_name
if env_name is not None:
assert env_file is None, "bug: both env_file and env_name specified"
self.name = env_name

self.frontend = workflow.conda_frontend
self.workflow = workflow
@@ -109,6 +97,30 @@ def __init__(
self._cleanup = cleanup
self._singularity_args = workflow.singularity_args

@lazy_property
def conda(self):
return Conda(self._container_img)

@lazy_property
def pin_file(self):
pin_file = Path(self.file.get_path_or_uri()).with_suffix(
f".{self.conda.platform}.pin.txt"
)

if pin_file.exists():
return infer_source_file(pin_file)
else:
return None

@lazy_property
def post_deploy_file(self):
if self.file:
deploy_file = Path(self.file.get_path_or_uri()).with_suffix(
".post-deploy.sh"
)
if deploy_file.exists():
return infer_source_file(deploy_file)

def _get_content(self):
if self.is_named:
from snakemake.shell import shell
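
The Conda helper (and with it the check for a working conda executable) is now created lazily via lazy_property, imported from snakemake.common above. Its implementation is not part of this diff; the following is only a generic sketch of the pattern such a decorator typically implements, namely compute on first access and cache on the instance:

    class lazy_property:
        def __init__(self, method):
            self.method = method
            self.name = method.__name__

        def __get__(self, instance, owner):
            if instance is None:
                return self
            value = self.method(instance)
            # Cache the result on the instance so the method body runs at most once.
            instance.__dict__[self.name] = value
            return value
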
@@ -334,7 +346,7 @@ def execute_deployment_script(self, env_file, deploy_file):
)
)
shell.check_output(
self._conda.shellcmd(self.address, "sh {}".format(deploy_file)),
self.conda.shellcmd(self.address, "sh {}".format(deploy_file)),
stderr=subprocess.STDOUT,
)

67 changes: 48 additions & 19 deletions snakemake/deployment/singularity.py
@@ -31,25 +31,7 @@ def __init__(self, url, dag, is_containerized):
"Invalid singularity image URL containing " "whitespace."
)

if not shutil.which("singularity"):
raise WorkflowError(
"The singularity command has to be "
"available in order to use singularity "
"integration."
)
try:
v = subprocess.check_output(
["singularity", "--version"], stderr=subprocess.PIPE
).decode()
except subprocess.CalledProcessError as e:
raise WorkflowError(
"Failed to get singularity version:\n{}".format(e.stderr.decode())
)
v = v.rsplit(" ", 1)[-1]
if v.startswith("v"):
v = v[1:]
if not LooseVersion(v) >= LooseVersion("2.4.1"):
raise WorkflowError("Minimum singularity version is 2.4.1.")
self.singularity = Singularity()

self.url = url
self._img_dir = dag.workflow.persistence.container_img_path
@@ -66,6 +48,7 @@ def hash(self):
return md5hash.hexdigest()

def pull(self, dryrun=False):
self.singularity.check()
if self.is_local:
return
if dryrun:
@@ -150,3 +133,49 @@ def shellcmd(
)
logger.debug(cmd)
return cmd


class Singularity:
instance = None

def __new__(cls):
if cls.instance is not None:
return cls.instance
else:
inst = super().__new__(cls)
cls.instance = inst
return inst

def __init__(self):
self.checked = False
self._version = None

@property
def version(self):
assert (
self._version is not None
), "bug: singularity version accessed before check() has been called"
return self._version

def check(self):
if not self.checked:
if not shutil.which("singularity"):
raise WorkflowError(
"The singularity command has to be "
"available in order to use singularity "
"integration."
)
try:
v = subprocess.check_output(
["singularity", "--version"], stderr=subprocess.PIPE
).decode()
except subprocess.CalledProcessError as e:
raise WorkflowError(
"Failed to get singularity version:\n{}".format(e.stderr.decode())
)
v = v.rsplit(" ", 1)[-1]
if v.startswith("v"):
v = v[1:]
if not LooseVersion(v) >= LooseVersion("2.4.1"):
raise WorkflowError("Minimum singularity version is 2.4.1.")
self._version = v
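
Together with the pull() change above, the singularity executable and version check now only happens when an image is actually pulled, rather than on every Image construction. A rough usage sketch (not part of the diff):

    helper_a = Singularity()
    helper_b = Singularity()
    assert helper_a is helper_b   # __new__ hands back the one shared instance

    helper_a.check()              # raises WorkflowError unless singularity >= 2.4.1 is on PATH
    print(helper_a.version)       # only defined after check() has run
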
1 change: 1 addition & 0 deletions snakemake/executors/__init__.py
@@ -286,6 +286,7 @@ def general_args(self):
"--no-hooks",
"--nolock",
"--ignore-incomplete",
w2a("rerun_triggers"),
w2a("cleanup_scripts", flag="--skip-script-cleanup"),
w2a("shadow_prefix"),
w2a("use_conda"),
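
This one-line addition is what propagates the rerun trigger configuration to cluster jobs: general_args() assembles the command line of the Snakemake process spawned on the cluster node, and w2a() maps a workflow attribute to the corresponding CLI argument. The resulting fragment of the remote invocation would look roughly like the following; the exact flag rendering and values are an assumption, not shown in this diff:

    # assumed rendering of w2a("rerun_triggers"); actual values depend on the workflow settings
    snakemake --no-hooks --nolock --ignore-incomplete \
        --rerun-triggers mtime params input software-env code ...
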
4 changes: 3 additions & 1 deletion snakemake/jobs.py
@@ -1676,7 +1676,9 @@ def format_files(files):
if self.params_changed:
s.append("Params have changed since last execution")
if self.software_stack_changed:
s.append("Software stack has changed since last execution")
s.append(
"Software environment definition has changed since last execution"
)

s = "; ".join(s)
if self.finished:
2 changes: 1 addition & 1 deletion snakemake/persistence.py
@@ -502,7 +502,7 @@ def _record_path(self, subject, id):

def all_outputfiles(self):
# we only look at output files that will be updated
return jobfiles(self.dag.needrun_jobs, "output")
return jobfiles(self.dag.needrun_jobs(), "output")

def all_inputfiles(self):
# we consider all input files, also of not running jobs
2 changes: 1 addition & 1 deletion snakemake/scheduler.py
@@ -403,7 +403,7 @@ def remaining_jobs(self):
"""Return jobs to be scheduled including not yet ready ones."""
return [
job
for job in self.dag.needrun_jobs
for job in self.dag.needrun_jobs()
if job not in self.running and not self.dag.finished(job)
]

