Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: specify conda environments via their name #1340

Merged
merged 14 commits into from Jan 25, 2022
33 changes: 33 additions & 0 deletions docs/snakefiles/deployment.rst
Expand Up @@ -246,6 +246,8 @@ with the following `environment definition <https://conda.io/projects/conda/en/l

The path to the environment definition is interpreted as **relative to the Snakefile that contains the rule** (unless it is an absolute path, which is discouraged).

Instead of using a concrete path, it is also possible to provide a path containing wildcards (which must also occur in the output files of the rule), analogous to the specification of input files.

.. sidebar:: Note

   Note that conda environments are only used with ``shell``, ``script`` and the ``wrapper`` directive, not the ``run`` directive.
Expand All @@ -260,6 +262,37 @@ Note that you need to clean up environments manually for now. However, in many c

Conda deployment also works well for offline or air-gapped environments. Running ``snakemake --use-conda --conda-create-envs-only`` will only install the required conda environments without running the full workflow. Subsequent runs with ``--use-conda`` will make use of the local environments without requiring internet access.


.. _conda_named_env:

Using already existing named conda environments
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes it can be handy to refer to an already existing named conda environment from a rule, instead of defining a new one from scratch.
Importantly, one should be aware that this can **hamper reproducibility**, because the workflow then relies on this environment being present
**in exactly the same way** on any new system where the workflow is executed. In such a case, you have to take care of this manually.
Therefore, the approach using environment definition files described above is highly recommended and preferred.

Nevertheless, in case you are still sure that you want to use an existing named environment, it can simply be put into the conda directive, e.g.

.. code-block:: python

    rule NAME:
        input:
            "table.txt"
        output:
            "plots/myplot.pdf"
        conda:
            "some-env-name"
        script:
            "scripts/plot-stuff.R"

For such a rule, Snakemake will just activate the given environment, instead of automatically deploying anything.
Instead of using a concrete name, it is also possible to provide a name containing wildcards (which must also occur in the output files of the rule), analogous to the specification of input files.

Note that Snakemake distinguishes file-based environments from named ones as follows:
if the given specification ends with ``.yaml`` or ``.yml``, Snakemake assumes it to be a path to an environment definition file; otherwise, it assumes the given specification
to be the name of an existing environment.

.. _singularity:


Expand Down
19 changes: 9 additions & 10 deletions snakemake/dag.py
Expand Up @@ -275,37 +275,36 @@ def cleanup(self):
def create_conda_envs(
self, dryrun=False, forceall=False, init_only=False, quiet=False
):
# First deduplicate based on job.conda_env_file
# First deduplicate based on job.conda_env_spec
jobs = self.jobs if forceall else self.needrun_jobs
env_set = {
(job.conda_env_file, job.container_img_url)
(job.conda_env_spec, job.container_img_url)
for job in jobs
if job.conda_env_file
if job.conda_env_spec
}
# Then based on md5sum values
self.conda_envs = dict()
for (env_file, simg_url) in env_set:
for (env_spec, simg_url) in env_set:
simg = None
if simg_url and self.workflow.use_singularity:
assert (
simg_url in self.container_imgs
), "bug: must first pull singularity images"
simg = self.container_imgs[simg_url]
env = conda.Env(
env_file,
env = env_spec.get_conda_env(
self.workflow,
container_img=simg,
cleanup=self.workflow.conda_cleanup_pkgs,
)
self.conda_envs[(env_file, simg_url)] = env
self.conda_envs[(env_spec, simg_url)] = env

if not init_only:
for env in self.conda_envs.values():
if not dryrun or not quiet:
if (not dryrun or not quiet) and not env.is_named:
env.create(dryrun)

def pull_container_imgs(self, dryrun=False, forceall=False, quiet=False):
# First deduplicate based on job.conda_env_file
# First deduplicate based on job.conda_env_spec
jobs = self.jobs if forceall else self.needrun_jobs
img_set = {
(job.container_img_url, job.is_containerized)
Expand Down Expand Up @@ -2075,7 +2074,7 @@ def add(path):
logger.info("Archiving conda environments...")
envs = set()
for job in self.jobs:
if job.conda_env_file:
if job.conda_env_spec:
env_archive = job.archive_conda_env()
envs.add(env_archive)
for env in envs:
Expand Down
184 changes: 156 additions & 28 deletions snakemake/deployment/conda.py
Expand Up @@ -22,14 +22,15 @@
from enum import Enum
import threading
import shutil
from abc import ABC, abstractmethod


from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
from snakemake.logging import logger
from snakemake.common import is_local_file, parse_uri, strip_prefix, ON_WINDOWS
from snakemake import utils
from snakemake.deployment import singularity, containerize
from snakemake.io import git_content
from snakemake.io import IOFile, apply_wildcards, git_content, _IOFile


class CondaCleanupMode(Enum):
Expand All @@ -45,9 +46,21 @@ class Env:
"""Conda environment from a given specification file."""

def __init__(
self, env_file, workflow, env_dir=None, container_img=None, cleanup=None
self,
workflow,
env_file=None,
env_name=None,
env_dir=None,
container_img=None,
cleanup=None,
):
self.file = infer_source_file(env_file)
self.file = None
self.name = None
if env_file is not None:
self.file = infer_source_file(env_file)
if env_name is not None:
assert env_file is None, "bug: both env_file and env_name specified"
self.name = env_name

self.frontend = workflow.conda_frontend
self.workflow = workflow
Expand All @@ -68,9 +81,20 @@ def __init__(
self._singularity_args = workflow.singularity_args

def _get_content(self):
return self.workflow.sourcecache.open(self.file, "rb").read()
if self.is_named:
from snakemake.shell import shell

content = shell.check_output(
"conda env export {}".format(self.address_argument),
stderr=subprocess.STDOUT,
universal_newlines=True,
)
return content.encode()
else:
return self.workflow.sourcecache.open(self.file, "rb").read()

def _get_content_deploy(self):
self.check_is_file_based()
deploy_file = Path(self.file).with_suffix(".post-deploy.sh")
return self.workflow.sourcecache.open(deploy_file, "rb").read()

Expand Down Expand Up @@ -123,29 +147,50 @@ def content_hash(self):

@property
def is_containerized(self):

if not self._container_img:
return False
return self._container_img.is_containerized

@property
def path(self):
def is_named(self):
return self.file is None

def check_is_file_based(self):
assert (
self.file is not None
), "bug: trying to access conda env file based functionality for named environment"

@property
def address(self):
"""Path to directory of the conda environment.

First tries full hash, if it does not exist, (8-prefix) is used
as default.

"""
hash = self.hash
env_dir = self._env_dir
get_path = lambda h: os.path.join(env_dir, h)
hash_candidates = [hash[:8], hash] # [0] is the old fallback hash (shortened)
exists = [os.path.exists(get_path(h)) for h in hash_candidates]
if self.is_containerized or exists[1] or (not exists[0]):
# containerizes, full hash exists or fallback hash does not exist: use full hash
return get_path(hash_candidates[1])
# use fallback hash
return get_path(hash_candidates[0])
if self.is_named:
return self.name
else:
hash = self.hash
env_dir = self._env_dir
get_path = lambda h: os.path.join(env_dir, h)
hash_candidates = [
hash[:8],
hash,
] # [0] is the old fallback hash (shortened)
exists = [os.path.exists(get_path(h)) for h in hash_candidates]
if self.is_containerized or exists[1] or (not exists[0]):
# containerizes, full hash exists or fallback hash does not exist: use full hash
return get_path(hash_candidates[1])
# use fallback hash
return get_path(hash_candidates[0])

@property
def address_argument(self):
if self.is_named:
return "--name '{}'".format(self.address)
else:
return "--prefix '{}'".format(self.address)

@property
def archive_file(self):
Expand All @@ -167,6 +212,8 @@ def create_archive(self):
# importing requests locally because it interferes with instantiating conda environments
import requests

self.check_is_file_based()

env_archive = self.archive_file
if os.path.exists(env_archive):
return env_archive
Expand All @@ -181,7 +228,7 @@ def create_archive(self):
os.makedirs(env_archive, exist_ok=True)
try:
out = shell.check_output(
"conda list --explicit --prefix '{}'".format(self.path),
"conda list --explicit {}".format(self.address_argument),
stderr=subprocess.STDOUT,
universal_newlines=True,
)
Expand Down Expand Up @@ -240,14 +287,16 @@ def execute_deployment_script(self, env_file, deploy_file):
)
conda = Conda(self._container_img)
shell.check_output(
conda.shellcmd(self.path, "sh {}".format(deploy_file)),
conda.shellcmd(self.address, "sh {}".format(deploy_file)),
stderr=subprocess.STDOUT,
)

def create(self, dryrun=False):
"""Create the conda enviroment."""
from snakemake.shell import shell

self.check_is_file_based()

# Read env file and create hash.
env_file = self.file
deploy_file = None
Expand Down Expand Up @@ -279,8 +328,7 @@ def create(self, dryrun=False):
if Path(env_file).with_suffix(".post-deploy.sh").exists():
deploy_file = Path(env_file).with_suffix(".post-deploy.sh")

env_hash = self.hash
env_path = self.path
env_path = self.address

if self.is_containerized:
if not dryrun:
Expand Down Expand Up @@ -462,11 +510,17 @@ def get_singularity_envvars(self):

def __hash__(self):
# this hash is only for object comparison, not for env paths
return hash(self.file)
if self.is_named:
return hash(self.name)
else:
return hash(self.file)

def __eq__(self, other):
if isinstance(other, Env):
return self.file == other.file
if self.is_named:
return self.name == other.name
else:
return self.file == other.file
return False


Expand Down Expand Up @@ -583,24 +637,98 @@ def bin_path(self):
else:
return os.path.join(self.prefix_path, "bin")

def shellcmd(self, env_path, cmd):
def shellcmd(self, env_address, cmd):
# get path to activate script
activate = os.path.join(self.bin_path(), "activate")

if ON_WINDOWS:
activate = activate.replace("\\", "/")
env_path = env_path.replace("\\", "/")
env_address = env_address.replace("\\", "/")

return "source {} '{}'; {}".format(activate, env_path, cmd)
return "source {} '{}'; {}".format(activate, env_address, cmd)

def shellcmd_win(self, env_path, cmd):
def shellcmd_win(self, env_address, cmd):
"""Prepend the windows activate bat script."""
# get path to activate script
activate = os.path.join(self.bin_path(), "activate.bat").replace("\\", "/")
env_path = env_path.replace("\\", "/")
env_address = env_address.replace("\\", "/")

return '"{}" "{}"&&{}'.format(activate, env_path, cmd)
return '"{}" "{}"&&{}'.format(activate, env_address, cmd)


def is_mamba_available():
    """Return True if a ``mamba`` executable can be located via ``shutil.which``."""
    mamba_executable = shutil.which("mamba")
    return mamba_executable is not None


class CondaEnvSpec(ABC):
    """Abstract specification of the conda environment requested by a rule.

    Concrete subclasses describe either an environment definition file
    (``CondaEnvFileSpec``) or the name of an already existing environment
    (``CondaEnvNameSpec``).
    """

    @abstractmethod
    def apply_wildcards(self, wildcards):
        """Return a new spec with ``wildcards`` substituted into this one."""
        ...

    @abstractmethod
    def get_conda_env(self, workflow, env_dir=None, container_img=None, cleanup=None):
        """Return the ``Env`` object described by this spec."""
        ...

    @abstractmethod
    def check(self):
        """Validate the spec; subclasses decide what needs checking."""
        ...

    @property
    def is_file(self):
        """Whether the spec refers to an environment definition file."""
        return False


class CondaEnvFileSpec(CondaEnvSpec):
    """Spec that points to an environment definition file."""

    def __init__(self, filepath: str, rule=None):
        # Keep an already wrapped file as is; wrap plain paths so that the
        # wildcard and check machinery of the IO file type is available.
        if isinstance(filepath, _IOFile):
            self.file = filepath
        else:
            self.file = IOFile(filepath, rule=rule)

    def apply_wildcards(self, wildcards):
        """Return a new file spec with ``wildcards`` applied to the path."""
        filepath = self.file.apply_wildcards(wildcards)
        if is_local_file(filepath):
            # Normalize 'file:///my/path.yml' to '/my/path.yml'
            filepath = parse_uri(filepath).uri_path
        return CondaEnvFileSpec(filepath)

    def check(self):
        """Delegate validation to the underlying file object."""
        self.file.check()

    def get_conda_env(self, workflow, env_dir=None, container_img=None, cleanup=None):
        """Return a file based ``Env`` for this spec."""
        return Env(
            workflow,
            env_file=self.file,
            env_dir=env_dir,
            container_img=container_img,
            cleanup=cleanup,
        )

    @property
    def is_file(self):
        """File specs always refer to an environment definition file."""
        return True

    # Specs are collected in sets and used as dict keys when environments are
    # de-duplicated. Every apply_wildcards() call yields a fresh object, so
    # equality has to be value based for that de-duplication to have an effect.
    def __eq__(self, other):
        if not isinstance(other, CondaEnvFileSpec):
            return NotImplemented
        return self.file == other.file

    def __hash__(self):
        # NOTE(review): assumes self.file is hashable (it looks like a str
        # subclass, given the isinstance check in __init__) - confirm.
        return hash(self.file)

    def __repr__(self):
        return "{}({!r})".format(type(self).__name__, self.file)


class CondaEnvNameSpec(CondaEnvSpec):
    """Spec that refers to an already existing conda environment by name."""

    def __init__(self, name: str):
        self.name = name

    def apply_wildcards(self, wildcards):
        """Return a new name spec with ``wildcards`` applied to the name."""
        return CondaEnvNameSpec(apply_wildcards(self.name, wildcards))

    def get_conda_env(self, workflow, env_dir=None, container_img=None, cleanup=None):
        """Return a named ``Env`` for this spec."""
        return Env(
            workflow,
            env_name=self.name,
            env_dir=env_dir,
            container_img=container_img,
            cleanup=cleanup,
        )

    def check(self):
        # not a file, nothing to check here
        pass

    # Value based equality for the same reason as in CondaEnvFileSpec: specs
    # serve as set members and dict keys during environment de-duplication.
    def __eq__(self, other):
        if not isinstance(other, CondaEnvNameSpec):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return "{}({!r})".format(type(self).__name__, self.name)


def is_conda_env_file(spec: str) -> bool:
    """Return True if ``spec`` names an environment definition file.

    A specification counts as a file if it ends with ``.yaml`` or ``.yml``;
    anything else is treated by callers as the name of an existing
    environment.
    """
    # str.endswith accepts a tuple of suffixes, so one call covers both.
    return spec.endswith((".yaml", ".yml"))