Skip to content

Commit

Permalink
perf: more extensive caching of source files, including wrappers. (#1182
Browse files Browse the repository at this point in the history
)

* initial draft of more caching of source files

* fixes

* fixes

* add retry behavior

* fixes

* fixes

* current basedir joining

* fixes

* fix config schema handling

* fix

* fix path

* fix

* fix path simplification

* fmt

* fix lints

* fix lints

* handle different path types

* fixes

* fix arg name

* handle local file URLs

* fix

* fixes

* fixes

* remove retry code

* fix archive

* handle is_local

* exception handling fix

* dbg

* fixes

* gitlab support, docs

* minor
  • Loading branch information
johanneskoester committed Sep 24, 2021
1 parent 75a544b commit bdb75f8
Show file tree
Hide file tree
Showing 17 changed files with 538 additions and 204 deletions.
18 changes: 14 additions & 4 deletions docs/snakefiles/deployment.rst
Expand Up @@ -69,14 +69,20 @@ Consider the following example:
configfile: "config/config.yaml"
module dna_seq:
snakefile: "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/Snakefile"
config: config
snakefile:
# here, it is also possible to provide a plain raw URL like "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile"
github("snakemake-workflows/dna-seq-gatk-variant-calling", path="workflow/Snakefile" tag="v2.0.1")
config:
config
use rule * from dna_seq
First, we load a local configuration file.
Next, we define the module ``dna_seq`` to be loaded from the URL ``https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/blob/v2.0.1/Snakefile``, while using the contents of the local configuration file.
Next, we define the module ``dna_seq`` to be loaded from the URL ``https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile``, while using the contents of the local configuration file.
Note that it is possible to either specify the full URL pointing to the raw Snakefile as a string or to use the github marker as done here.
With the latter, Snakemake can however cache the used source files persistently (if a tag is given), such that they don't have to be downloaded on each invocation.
Finally we declare all rules of the dna_seq module to be used.

This kind of deployment is equivalent to just cloning the original repository and modifying the configuration in it.
However, the advantage here is that we are (a) able to easily extend of modify the workflow, while making the changes transparent, and (b) we can store this workflow in a separate (e.g. private) git repository, along with for example configuration and meta data, without the need to duplicate the workflow code.
Finally, we are always able to later combine another module into the current workflow, e.g. when further kinds of analyses are needed.
Expand All @@ -92,7 +98,9 @@ For example, we can easily add another rule to extend the given workflow:
configfile: "config/config.yaml"
module dna_seq:
snakefile: "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/Snakefile"
snakefile:
# here, it is also possible to provide a plain raw URL like "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile"
github("snakemake-workflows/dna-seq-gatk-variant-calling", path="workflow/Snakefile" tag="v2.0.1")
config: config
use rule * from dna_seq
Expand All @@ -106,6 +114,8 @@ For example, we can easily add another rule to extend the given workflow:
notebook:
"notebooks/plot-vafs.py.ipynb"
Moreover, it is possible to further extend the workflow with other modules, thereby generating an integrative analysis.

----------------------------------
Uploading workflows to WorkflowHub
----------------------------------
Expand Down
39 changes: 37 additions & 2 deletions docs/snakefiles/modularization.rst
Expand Up @@ -126,12 +126,14 @@ With Snakemake 6.0 and later, it is possible to define external workflows as mod
min_version("6.0")
module other_workflow:
snakefile: "other_workflow/Snakefile"
snakefile:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
"other_workflow/Snakefile"
use rule * from other_workflow as other_*
The first statement registers the external workflow as a module, by defining the path to the main snakefile.
The snakefile property of the module can either take a local path or a HTTP/HTTPS url.
Here, plain paths, HTTP/HTTPS URLs and special markers for code hosting providers like Github or Gitlab are possible (see :ref:`snakefile-code-hosting-providers`).
The second statement declares all rules of that module to be used in the current one.
Thereby, the ``as other_*`` at the end renames all those rule with a common prefix.
This can be handy to avoid rule name conflicts (note that rules from modules can otherwise overwrite rules from your current workflow or other modules).
Expand All @@ -149,6 +151,7 @@ It is possible to overwrite the global config dictionary for the module, which i
configfile: "config/config.yaml"
module other_workflow:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
snakefile: "other_workflow/Snakefile"
config: config["other-workflow"]
Expand All @@ -167,6 +170,7 @@ This modification can be performed after a general import, and will overwrite an
min_version("6.0")
module other_workflow:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
snakefile: "other_workflow/Snakefile"
config: config["other-workflow"]
Expand Down Expand Up @@ -261,3 +265,34 @@ This function automatically determines the absolute path to the file (here ``../
When executing, snakemake first tries to create (or update, if necessary) ``test.txt`` (and all other possibly mentioned dependencies) by executing the subworkflow.
Then the current workflow is executed.
This can also happen recursively, since the subworkflow may have its own subworkflows as well.


.. _snakefile-code-hosting-providers:

----------------------
Code hosting providers
----------------------

To obtain the correct URL to an external source code resource (e.g. a snakefile, see :ref:`snakefiles-modules`), Snakemake provides markers for code hosting providers.
Currently, Github

.. code-block:: python
github("owner/repo", path="workflow/Snakefile", tag="v1.0.0")
and Gitlab are supported:

.. code-block:: python
gitlab("owner/repo", path="workflow/Snakefile", tag="v1.0.0")
For the latter, it is also possible to specify an alternative host, e.g.

.. code-block:: python
gitlab("owner/repo", path="workflow/Snakefile", tag="v1.0.0", host="somecustomgitlab.org")
While specifying a tag is highly encouraged, it is alternatively possible to specify a `commit` or a `branch` via respective keyword arguments.
Note that only when specifying a tag or a commit, Snakemake is able to persistently cache the source, thereby avoiding to repeatedly query it in case of multiple executions.
1 change: 1 addition & 0 deletions snakemake/cwl.py
Expand Up @@ -35,6 +35,7 @@ def cwl(
use_singularity,
bench_record,
jobid,
runtime_sourcecache_path,
):
"""
Load cwl from the given basedir + path and execute it.
Expand Down
24 changes: 15 additions & 9 deletions snakemake/deployment/conda.py
Expand Up @@ -5,6 +5,7 @@

import os
import re
from snakemake.sourcecache import LocalGitFile, LocalSourceFile, infer_source_file
import subprocess
import tempfile
from urllib.request import urlopen
Expand Down Expand Up @@ -45,7 +46,7 @@ class Env:
def __init__(
self, env_file, workflow, env_dir=None, container_img=None, cleanup=None
):
self.file = env_file
self.file = infer_source_file(env_file)

self.frontend = workflow.conda_frontend
self.workflow = workflow
Expand Down Expand Up @@ -161,7 +162,9 @@ def create_archive(self):
try:
# Download
logger.info(
"Downloading packages for conda environment {}...".format(self.file)
"Downloading packages for conda environment {}...".format(
self.file.get_path_or_uri()
)
)
os.makedirs(env_archive, exist_ok=True)
try:
Expand Down Expand Up @@ -216,11 +219,16 @@ def create(self, dryrun=False):
env_file = self.file
tmp_file = None

if not is_local_file(env_file) or env_file.startswith("git+file:/"):
if not isinstance(env_file, LocalSourceFile) or isinstance(
env_file, LocalGitFile
):
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
# write to temp file such that conda can open it
tmp.write(self.content)
env_file = tmp.name
tmp_file = tmp.name
else:
env_file = env_file.get_path_or_uri()

env_hash = self.hash
env_path = self.path
Expand Down Expand Up @@ -258,13 +266,13 @@ def create(self, dryrun=False):
if dryrun:
logger.info(
"Incomplete Conda environment {} will be recreated.".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
else:
logger.info(
"Removing incomplete Conda environment {}...".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
shutil.rmtree(env_path, ignore_errors=True)
Expand All @@ -274,15 +282,13 @@ def create(self, dryrun=False):
if dryrun:
logger.info(
"Conda environment {} will be created.".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
return env_path
conda = Conda(self._container_img)
logger.info(
"Creating conda environment {}...".format(
utils.simplify_path(self.file)
)
"Creating conda environment {}...".format(self.file.simplify_path())
)
# Check if env archive exists. Use that if present.
env_archive = self.archive_file
Expand Down
5 changes: 5 additions & 0 deletions snakemake/exceptions.py
Expand Up @@ -164,6 +164,11 @@ def __init__(self, *args, lineno=None, snakefile=None, rule=None):
self.rule = rule


class SourceFileError(WorkflowError):
def __init__(self, msg):
super().__init__("Error in source file definition: {}".format(msg))


class WildcardError(WorkflowError):
pass

Expand Down
22 changes: 16 additions & 6 deletions snakemake/executors/__init__.py
Expand Up @@ -515,6 +515,7 @@ def job_args_and_prepare(self, job):
self.workflow.edit_notebook,
self.workflow.conda_base_path,
job.rule.basedir,
self.workflow.sourcecache.runtime_cache_path,
)

def run_single_job(self, job):
Expand Down Expand Up @@ -2296,6 +2297,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
):
"""
Wrapper around the run method that handles exceptions and benchmarking.
Expand Down Expand Up @@ -2376,6 +2378,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
else:
# The benchmarking is started here as we have a run section
Expand Down Expand Up @@ -2406,6 +2409,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
# Store benchmark record for this iteration
bench_records.append(bench_record)
Expand Down Expand Up @@ -2434,20 +2438,26 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
except (KeyboardInterrupt, SystemExit) as e:
# Re-raise the keyboard interrupt in order to record an error in the
# scheduler but ignore it
raise e
except (Exception, BaseException) as ex:
log_verbose_traceback(ex)
# this ensures that exception can be re-raised in the parent thread
lineno, file = get_exception_origin(ex, linemaps)
raise RuleException(
format_error(
ex, lineno, linemaps=linemaps, snakefile=file, show_traceback=True
origin = get_exception_origin(ex, linemaps)
if origin is not None:
log_verbose_traceback(ex)
lineno, file = origin
raise RuleException(
format_error(
ex, lineno, linemaps=linemaps, snakefile=file, show_traceback=True
)
)
)
else:
# some internal bug, just reraise
raise ex

if benchmark is not None:
try:
Expand Down
4 changes: 2 additions & 2 deletions snakemake/io.py
Expand Up @@ -1345,11 +1345,11 @@ def git_content(git_file):
"""
This function will extract a file from a git repository, one located on
the filesystem.
Expected format is git+file:///path/to/your/repo/path_to_file@@version
Expected format is git+file:///path/to/your/repo/path_to_file@version
Args:
env_file (str): consist of path to repo, @, version and file information
Ex: git+file:////home/smeds/snakemake-wrappers/bio/fastqc/wrapper.py@0.19.3
Ex: git+file:///home/smeds/snakemake-wrappers/bio/fastqc/wrapper.py@0.19.3
Returns:
file content or None if the expected format isn't meet
"""
Expand Down
11 changes: 9 additions & 2 deletions snakemake/notebook.py
Expand Up @@ -10,6 +10,7 @@
from snakemake.logging import logger
from snakemake.common import is_local_file
from snakemake.common import ON_WINDOWS
from snakemake.sourcecache import SourceCache

KERNEL_STARTED_RE = re.compile(r"Kernel started: (?P<kernel_id>\S+)")
KERNEL_SHUTDOWN_RE = re.compile(r"Kernel shutdown: (?P<kernel_id>\S+)")
Expand Down Expand Up @@ -152,6 +153,7 @@ def get_preamble(self):
self.bench_iteration,
self.cleanup_scripts,
self.shadow_dir,
self.is_local,
preamble_addendum=preamble_addendum,
)

Expand Down Expand Up @@ -218,7 +220,8 @@ def notebook(
bench_iteration,
cleanup_scripts,
shadow_dir,
edit=None,
edit,
runtime_sourcecache_path,
):
"""
Load a script from the given basedir + path and execute it.
Expand Down Expand Up @@ -251,9 +254,12 @@ def notebook(
)

if not draft:
path, source, language = get_source(path, basedir, wildcards, params)
path, source, language, is_local = get_source(
path, SourceCache(runtime_sourcecache_path), basedir, wildcards, params
)
else:
source = None
is_local = True

exec_class = get_exec_class(language)

Expand All @@ -280,6 +286,7 @@ def notebook(
bench_iteration,
cleanup_scripts,
shadow_dir,
is_local,
)

if draft:
Expand Down
12 changes: 6 additions & 6 deletions snakemake/parser.py
Expand Up @@ -507,7 +507,7 @@ def start(self):
"resources, log, version, rule, conda_env, container_img, "
"singularity_args, use_singularity, env_modules, bench_record, jobid, "
"is_shell, bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, "
"conda_base_path, basedir):".format(
"conda_base_path, basedir, runtime_sourcecache_path):".format(
rulename=self.rulename
if self.rulename is not None
else self.snakefile.rulecount
Expand Down Expand Up @@ -608,7 +608,7 @@ def args(self):
yield (
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir"
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, runtime_sourcecache_path"
)


Expand All @@ -621,7 +621,7 @@ def args(self):
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, "
"edit_notebook"
"edit_notebook, runtime_sourcecache_path"
)


Expand All @@ -634,7 +634,7 @@ def args(self):
", input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, workflow.wrapper_prefix, jobid, bench_iteration, "
"cleanup_scripts, shadow_dir"
"cleanup_scripts, shadow_dir, runtime_sourcecache_path"
)


Expand All @@ -645,7 +645,7 @@ class CWL(Script):
def args(self):
yield (
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, use_singularity, bench_record, jobid"
"config, rule, use_singularity, bench_record, jobid, runtime_sourcecache_path"
)


Expand Down Expand Up @@ -1157,7 +1157,7 @@ def python(self, token):

class Snakefile:
def __init__(self, path, workflow, rulecount=0):
self.path = path
self.path = path.get_path_or_uri()
self.file = workflow.sourcecache.open(path)
self.tokens = tokenize.generate_tokens(self.file.readline)
self.rulecount = rulecount
Expand Down

0 comments on commit bdb75f8

Please sign in to comment.