diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c4ac13d9f..8127d0a0b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -114,18 +114,19 @@ jobs: echo CONTAINER_IMAGE=snakemake/snakemake:$GITHUB_SHA >> $GITHUB_ENV # TODO reactivate in April (we have no free resources left this month) - # - name: Test Kubernetes execution - # if: env.GCP_AVAILABLE - # env: - # CI: true - # run: | - # # activate conda env - # export PATH="/usr/share/miniconda/bin:$PATH" - # source activate snakemake - - # pytest -s -v -x tests/test_kubernetes.py - - - name: Test AWS execution + - name: Test Kubernetes execution + if: env.GCP_AVAILABLE + env: + CI: true + run: | + # activate conda env + export PATH="/usr/share/miniconda/bin:$PATH" + source activate snakemake + + pytest -s -v -x tests/test_kubernetes.py + + # TODO temporarily deactivate and fix in separate PR. + - name: Test Tibanna (AWS) execution if: env.AWS_AVAILABLE env: CI: true @@ -134,16 +135,16 @@ jobs: export PATH="/usr/share/miniconda/bin:$PATH" source activate snakemake - pytest -v -x tests/test_tibanna.py + # pytest -v -x -s tests/test_tibanna.py # TODO reactivate in April (we have no free resources left this month) - # - name: Test Google Life Sciences Executor - # if: env.GCP_AVAILABLE - # run: | - # # activate conda env - # export PATH="/usr/share/miniconda/bin:$PATH" - # source activate snakemake - # pytest -s -v -x tests/test_google_lifesciences.py + - name: Test Google Life Sciences Executor + if: env.GCP_AVAILABLE + run: | + # activate conda env + export PATH="/usr/share/miniconda/bin:$PATH" + source activate snakemake + pytest -s -v -x tests/test_google_lifesciences.py - name: Test GA4GH TES executor run: | diff --git a/snakemake/__init__.py b/snakemake/__init__.py index 62bc485af..68ba3aed1 100644 --- a/snakemake/__init__.py +++ b/snakemake/__init__.py @@ -436,6 +436,9 @@ def snakemake( # clean up all previously recorded jobids. shell.cleanup() else: + if default_resources is None: + # use full default resources if in cluster or cloud mode + default_resources = DefaultResources(mode="full") if edit_notebook: raise WorkflowError( "Notebook edit mode is only allowed with local execution." 
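
Editor's note on the `default_resources` hunk in `snakemake/__init__.py` above: when the caller supplies no `--default-resources`, cluster/cloud execution now falls back to the full set of built-in defaults instead of none. The following is a minimal illustrative sketch of that guard pattern, not the real class; the preset values and the `mode` keyword are stand-ins for whatever `snakemake.resources.DefaultResources` actually provides.

    # Toy stand-in for snakemake.resources.DefaultResources; preset values are invented.
    class DefaultResources:
        PRESETS = {
            "bare": ["mem_mb=1000"],
            "full": ["mem_mb=1000", "disk_mb=1000", "tmpdir='/tmp'"],
        }

        def __init__(self, args=None, mode="full"):
            # explicit values win; otherwise use the preset for the chosen mode
            self.args = list(args) if args else list(self.PRESETS[mode])


    def resolve_default_resources(default_resources, remote_exec):
        """Mirror of the guard added above: fill in full defaults only for
        cluster/cloud execution and only if the user passed nothing."""
        if remote_exec and default_resources is None:
            default_resources = DefaultResources(mode="full")
        return default_resources


    print(resolve_default_resources(None, remote_exec=True).args)
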
@@ -599,6 +602,7 @@ def snakemake( check_envvars=not lint, # for linting, we do not need to check whether requested envvars exist all_temp=all_temp, local_groupid=local_groupid, + keep_metadata=keep_metadata, latency_wait=latency_wait, ) success = True @@ -800,7 +804,6 @@ def snakemake( export_cwl=export_cwl, batch=batch, keepincomplete=keep_incomplete, - keepmetadata=keep_metadata, ) except BrokenPipeError: diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py index 34ed14b25..109f4a1c0 100644 --- a/snakemake/executors/__init__.py +++ b/snakemake/executors/__init__.py @@ -3,6 +3,7 @@ __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" +from abc import abstractmethod import os import sys import contextlib @@ -22,6 +23,7 @@ from functools import partial from itertools import chain from collections import namedtuple +from snakemake.executors.common import format_cli_arg, format_cli_pos_arg, join_cli_args from snakemake.io import _IOFile import random import base64 @@ -44,7 +46,13 @@ SpawnedJobError, CacheMissException, ) -from snakemake.common import Mode, __version__, get_container_image, get_uuid +from snakemake.common import ( + Mode, + __version__, + get_container_image, + get_uuid, + lazy_property, +) # TODO move each executor into a separate submodule @@ -54,6 +62,8 @@ def sleep(): # do not sleep on CI. In that case we just want to quickly test everything. if os.environ.get("CI") != "true": time.sleep(10) + else: + time.sleep(1) class AbstractExecutor: @@ -66,7 +76,6 @@ def __init__( printshellcmds=False, printthreads=True, keepincomplete=False, - keepmetadata=True, ): self.workflow = workflow self.dag = dag @@ -76,70 +85,29 @@ def __init__( self.printthreads = printthreads self.latency_wait = workflow.latency_wait self.keepincomplete = keepincomplete - self.keepmetadata = keepmetadata def get_default_remote_provider_args(self): - if self.workflow.default_remote_provider: - return ( - " --default-remote-provider {} " "--default-remote-prefix {} " - ).format( - self.workflow.default_remote_provider.__module__.split(".")[-1], - self.workflow.default_remote_prefix, - ) - return "" - - def _format_key_value_args(self, flag, kwargs): - if kwargs: - return " {} {} ".format( - flag, - " ".join("{}={}".format(key, value) for key, value in kwargs.items()), - ) - return "" - - def get_set_threads_args(self): - return self._format_key_value_args( - "--set-threads", self.workflow.overwrite_threads + return join_cli_args( + [ + self.workflow_property_to_arg("default_remote_prefix"), + self.workflow_property_to_arg("default_remote_provider", attr="name"), + ] ) def get_set_resources_args(self): - if self.workflow.overwrite_resources: - return " --set-resources {} ".format( - " ".join( - "{}:{}={}".format(rule, name, value) - for rule, res in self.workflow.overwrite_resources.items() - for name, value in res.items() - ) - ) - return "" - - def get_set_scatter_args(self): - return self._format_key_value_args( - "--set-scatter", self.workflow.overwrite_scatter + return format_cli_arg( + "--set-resources", + [ + f"{rule}:{name}={value}" + for rule, res in self.workflow.overwrite_resources.items() + for name, value in res.items() + ], + skip=not self.workflow.overwrite_resources, ) def get_default_resources_args(self, default_resources=None): - if default_resources is None: - default_resources = self.workflow.default_resources - if default_resources: - - def fmt(res): - if isinstance(res, str): - res = res.replace('"', r"\"") - return '"{}"'.format(res) - - args = " 
--default-resources {} ".format( - " ".join(map(fmt, self.workflow.default_resources.args)) - ) - return args - return "" - - def get_local_groupid_arg(self): - return f" --local-groupid {self.workflow.local_groupid} " - - def get_behavior_args(self): - if self.workflow.conda_not_block_search_path_envvars: - return " --conda-not-block-search-path-envvars " - return "" + default_resources = default_resources or self.workflow.default_resources + return format_cli_arg("--default-resources", default_resources.args) def run_jobs(self, jobs, callback=None, submit_callback=None, error_callback=None): """Run a list of jobs that is ready at a given point in time. @@ -222,7 +190,6 @@ def __init__( printshellcmds=False, assume_shared_fs=True, keepincomplete=False, - keepmetadata=False, ): super().__init__( workflow, @@ -231,7 +198,6 @@ def __init__( quiet=quiet, printshellcmds=printshellcmds, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) self.assume_shared_fs = assume_shared_fs self.stats = Stats() @@ -270,7 +236,7 @@ def handle_job_success( ignore_missing_output=ignore_missing_output, latency_wait=self.latency_wait, assume_shared_fs=self.assume_shared_fs, - keep_metadata=self.keepmetadata, + keep_metadata=self.workflow.keep_metadata, ) self.stats.report_job_end(job) @@ -281,110 +247,152 @@ def handle_job_error(self, job, upload_remote=True): latency_wait=self.latency_wait, ) - def get_additional_args(self): + def workflow_property_to_arg( + self, property, flag=None, quote=True, skip=False, invert=False, attr=None + ): + if skip: + return "" + + value = getattr(self.workflow, property) + + if value is not None and attr is not None: + value = getattr(value, attr) + + if flag is None: + flag = f"--{property.replace('_', '-')}" + + if invert and isinstance(value, bool): + value = not value + + return format_cli_arg(flag, value, quote=quote) + + @lazy_property + def general_args(self): """Return a string to add to self.exec_job that includes additional arguments from the command line. This is currently used in the ClusterExecutor and CPUExecutor, as both were using the same code. Both have base class of the RealExecutor. 
""" - additional = "" - if not self.workflow.cleanup_scripts: - additional += " --skip-script-cleanup " - if self.workflow.shadow_prefix: - additional += " --shadow-prefix {} ".format(self.workflow.shadow_prefix) - if self.workflow.use_conda: - additional += " --use-conda " - if self.workflow.conda_frontend: - additional += " --conda-frontend {} ".format( - self.workflow.conda_frontend - ) - if self.workflow.conda_prefix: - additional += " --conda-prefix {} ".format(self.workflow.conda_prefix) - if self.workflow.conda_base_path and self.assume_shared_fs: - additional += " --conda-base-path {} ".format( - self.workflow.conda_base_path - ) - if self.workflow.use_singularity: - additional += " --use-singularity " - if self.workflow.singularity_prefix: - additional += " --singularity-prefix {} ".format( - self.workflow.singularity_prefix - ) - if self.workflow.singularity_args: - additional += ' --singularity-args "{}"'.format( - self.workflow.singularity_args - ) - if not self.workflow.execute_subworkflows: - additional += " --no-subworkflows " - - if self.workflow.max_threads is not None: - additional += " --max-threads {} ".format(self.workflow.max_threads) - - additional += self.get_set_resources_args() - additional += self.get_set_scatter_args() - additional += self.get_set_threads_args() - additional += self.get_behavior_args() - - if self.workflow.use_env_modules: - additional += " --use-envmodules " - if not self.keepmetadata: - additional += " --drop-metadata " - - return additional - - def format_job_pattern(self, pattern, job=None, **kwargs): - overwrite_workdir = [] - if self.workflow.overwrite_workdir: - overwrite_workdir.extend(("--directory", self.workflow.overwrite_workdir)) - - overwrite_config = [] - if self.workflow.overwrite_configfiles: - # add each of the overwriting configfiles in the original order - if self.workflow.overwrite_configfiles: - overwrite_config.append("--configfiles") - overwrite_config.extend(self.workflow.overwrite_configfiles) - if self.workflow.config_args: - overwrite_config.append("--config") - overwrite_config.extend(self.workflow.config_args) - - printshellcmds = "" - if self.workflow.printshellcmds: - printshellcmds = "-p" - - if not job.is_branched and not job.is_updated: - # Restrict considered rules. This does not work for updated jobs - # because they need to be updated in the spawned process as well. 
- rules = ["--allowed-rules"] - rules.extend(job.rules) - else: - rules = [] - - target = kwargs.get("target", job.get_targets()) - snakefile = kwargs.get("snakefile", self.snakefile) - cores = kwargs.get("cores", self.cores) - if "target" in kwargs: - del kwargs["target"] - if "snakefile" in kwargs: - del kwargs["snakefile"] - if "cores" in kwargs: - del kwargs["cores"] - - cmd = format( - pattern, - job=job, - attempt=job.attempt, - overwrite_workdir=overwrite_workdir, - overwrite_config=overwrite_config, - printshellcmds=printshellcmds, - workflow=self.workflow, - snakefile=snakefile, - cores=cores, - benchmark_repeats=job.benchmark_repeats if not job.is_group() else None, - target=target, - rules=rules, - **kwargs, + w2a = self.workflow_property_to_arg + + return join_cli_args( + [ + "--force", + "--keep-target-files", + "--keep-remote", + "--max-inventory-time 0", + "--nocolor", + "--notemp", + "--no-hooks", + "--nolock", + "--ignore-incomplete", + w2a("cleanup_scripts", flag="--skip-script-cleanup"), + w2a("shadow_prefix"), + w2a("use_conda"), + w2a("conda_frontend"), + w2a("conda_prefix"), + w2a("conda_base_path", skip=not self.assume_shared_fs), + w2a("use_singularity"), + w2a("singularity_prefix"), + w2a("singularity_args"), + w2a("execute_subworkflows", flag="--no-subworkflows", invert=True), + w2a("max_threads"), + w2a("use_env_modules", flag="--use-envmodules"), + w2a("keep_metadata", flag="--drop-metadata", invert=True), + w2a("wrapper_prefix"), + w2a("overwrite_threads", flag="--set-threads"), + w2a("overwrite_scatter", flag="--set-scatter"), + w2a("local_groupid", skip=self.job_specific_local_groupid), + w2a("conda_not_block_search_path_envvars"), + w2a("overwrite_configfiles", flag="--configfiles"), + w2a("config_args", flag="--config"), + w2a("printshellcmds"), + w2a("latency_wait"), + w2a("scheduler_type", flag="--scheduler"), + format_cli_arg( + "--scheduler-solver-path", + os.path.dirname(sys.executable), + skip=not self.assume_shared_fs, + ), + self.get_set_resources_args(), + self.get_default_remote_provider_args(), + self.get_default_resources_args(), + self.get_workdir_arg(), + format_cli_arg("--mode", self.get_exec_mode()), + ] + ) + + def get_workdir_arg(self): + return self.workflow_property_to_arg("overwrite_workdir", flag="--directory") + + def get_job_args(self, job, **kwargs): + return join_cli_args( + [ + format_cli_pos_arg(kwargs.get("target", self.get_job_targets(job))), + # Restrict considered rules for faster DAG computation. + # This does not work for updated jobs because they need + # to be updated in the spawned process as well. + format_cli_arg( + "--allowed-rules", + job.rules, + quote=False, + skip=job.is_branched or job.is_updated, + ), + # Ensure that a group uses its proper local groupid. + format_cli_arg("--local-groupid", job.jobid, skip=not job.is_group()), + format_cli_arg("--cores", kwargs.get("cores", self.cores)), + format_cli_arg("--attempt", job.attempt), + format_cli_arg("--force-use-threads", not job.is_group()), + ] + ) + + @property + def job_specific_local_groupid(self): + return True + + def get_snakefile(self): + return self.snakefile + + def get_job_targets(self, job): + return job.get_targets() + + @abstractmethod + def get_python_executable(self): + ... + + @abstractmethod + def get_exec_mode(self): + ... + + @abstractmethod + def get_envvar_declarations(self): + ... 
+ + def get_job_exec_prefix(self, job): + return "" + + def get_job_exec_suffix(self, job): + return "" + + def format_job_exec(self, job): + prefix = self.get_job_exec_prefix(job) + if prefix: + prefix += " &&" + suffix = self.get_job_exec_suffix(job) + if suffix: + suffix = f"&& {suffix}" + return join_cli_args( + [ + prefix, + self.get_envvar_declarations(), + self.get_python_executable(), + "-m snakemake", + format_cli_arg("--snakefile", self.get_snakefile()), + self.get_job_args(job), + self.general_args, + suffix, + ] ) - return cmd class TouchExecutor(RealExecutor): @@ -423,7 +431,6 @@ def __init__( use_threads=False, cores=1, keepincomplete=False, - keepmetadata=True, ): super().__init__( workflow, @@ -432,29 +439,8 @@ def __init__( quiet=quiet, printshellcmds=printshellcmds, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, - ) - - self.exec_job = "\\\n".join( - ( - "cd {workflow.workdir_init} && ", - "{sys.executable} -m snakemake {target} --snakefile {snakefile} ", - "--force --cores {cores} --keep-target-files --keep-remote ", - "--attempt {attempt} --scheduler {workflow.scheduler_type} ", - "--force-use-threads --wrapper-prefix {workflow.wrapper_prefix} ", - "--max-inventory-time 0 --ignore-incomplete ", - "--latency-wait {latency_wait} ", - self.get_default_remote_provider_args(), - self.get_default_resources_args(), - self.get_local_groupid_arg(), - "{overwrite_workdir} {overwrite_config} {printshellcmds} {rules} ", - "--notemp --quiet --no-hooks --nolock --mode {} ".format( - Mode.subprocess - ), - ) ) - self.exec_job += self.get_additional_args() self.use_threads = use_threads self.cores = cores @@ -463,6 +449,22 @@ def __init__( self.workers = workers + 5 self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) + @property + def job_specific_local_groupid(self): + return False + + def get_job_exec_prefix(self, job): + return f"cd {self.workflow.workdir_init}" + + def get_exec_mode(self): + return Mode.subprocess + + def get_python_executable(self): + return sys.executable + + def get_envvar_declarations(self): + return "" + def run(self, job, callback=None, submit_callback=None, error_callback=None): super()._run(job) @@ -578,10 +580,8 @@ def run_group_job(self, job): time.sleep(1) def spawn_job(self, job): - exec_job = self.exec_job - cmd = self.format_job_pattern( - exec_job, job=job, _quote_all=True, latency_wait=self.latency_wait - ) + cmd = self.format_job_exec(job) + print(cmd) try: subprocess.check_call(cmd, shell=True) except subprocess.CalledProcessError as e: @@ -661,9 +661,9 @@ def __init__( assume_shared_fs=True, max_status_checks_per_second=1, disable_default_remote_provider_args=False, - disable_get_default_resources_args=False, + disable_default_resources_args=False, + disable_envvar_declarations=False, keepincomplete=False, - keepmetadata=True, ): from ratelimiter import RateLimiter @@ -676,16 +676,17 @@ def __init__( printshellcmds=printshellcmds, assume_shared_fs=assume_shared_fs, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) if not self.assume_shared_fs: # use relative path to Snakefile self.snakefile = os.path.relpath(workflow.main_snakefile) + self.is_default_jobscript = False jobscript = workflow.jobscript if jobscript is None: jobscript = os.path.join(os.path.dirname(__file__), self.default_jobscript) + self.is_default_jobscript = True try: with open(jobscript) as f: self.jobscript = f.read() @@ -697,33 +698,6 @@ def __init__( 'Defined jobname ("{}") has to contain the wildcard {jobid}.' 
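
`format_job_exec` above stitches an optional executor-specific prefix and suffix around the `python -m snakemake` call with `&&`. A short sketch of just that joining rule, assuming the same "empty fragments are dropped" behaviour as `join_cli_args` (this is not the real implementation, and the example prefix/suffix are placeholders):

    def join_cli_args(args):
        # drop empty fragments so optional pieces disappear cleanly
        return " ".join(a for a in args if a)


    def format_job_exec(core_cmd, prefix="", suffix=""):
        if prefix:
            prefix += " &&"
        if suffix:
            suffix = f"&& {suffix}"
        return join_cli_args([prefix, core_cmd, suffix])


    # e.g. a CPUExecutor-style prefix and the statuscmd-style suffix
    print(format_job_exec(
        "python -m snakemake --snakefile 'Snakefile' ...",
        prefix="cd /data/workdir",
        suffix="exit 0 || exit 1",
    ))
    # -> cd /data/workdir && python -m snakemake --snakefile 'Snakefile' ... && exit 0 || exit 1
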
) - if exec_job is None: - self.exec_job = "\\\n".join( - ( - "{envvars} " "cd {workflow.workdir_init} && " - if assume_shared_fs - else "", - "{sys.executable} " if assume_shared_fs else "python ", - "-m snakemake {target} --snakefile {snakefile} ", - "--force --cores {cores} --keep-target-files --keep-remote --max-inventory-time 0 ", - "{waitfiles_parameter:u} --latency-wait {latency_wait} ", - " --attempt {attempt} {use_threads} --scheduler {workflow.scheduler_type} ", - "--wrapper-prefix {workflow.wrapper_prefix} ", - "{overwrite_workdir} {overwrite_config} {printshellcmds} {rules} " - "--nocolor --notemp --no-hooks --nolock {scheduler_solver_path:u} ", - "--mode {} ".format(Mode.cluster), - ) - ) - else: - self.exec_job = exec_job - - self.exec_job += self.get_additional_args() - self.exec_job += " {job_specific_args:u} " - if not disable_default_remote_provider_args: - self.exec_job += self.get_default_remote_provider_args() - if not disable_get_default_resources_args: - self.exec_job += self.get_default_resources_args() - self.jobname = jobname self._tmpdir = None self.cores = cores if cores else "all" @@ -738,12 +712,69 @@ def __init__( self.wait_thread.daemon = True self.wait_thread.start() + self.disable_default_remote_provider_args = disable_default_remote_provider_args + self.disable_default_resources_args = disable_default_resources_args + self.disable_envvar_declarations = disable_envvar_declarations + self.max_status_checks_per_second = max_status_checks_per_second self.status_rate_limiter = RateLimiter( max_calls=self.max_status_checks_per_second, period=1 ) + def get_default_remote_provider_args(self): + if not self.disable_default_remote_provider_args: + return super().get_default_remote_provider_args() + else: + return "" + + def get_default_resources_args(self, default_resources=None): + if not self.disable_default_resources_args: + return super().get_default_resources_args(default_resources) + else: + return "" + + def get_workdir_arg(self): + if self.assume_shared_fs: + return super().get_workdir_arg() + return "" + + def get_envvar_declarations(self): + if not self.disable_envvar_declarations: + return " ".join( + f"{var}={repr(os.environ[var])}" for var in self.workflow.envvars + ) + else: + return "" + + def get_python_executable(self): + return sys.executable if self.assume_shared_fs else "python" + + def get_exec_mode(self): + return Mode.cluster + + def get_job_args(self, job): + waitfiles_parameter = "" + if self.assume_shared_fs: + wait_for_files = [] + wait_for_files.append(self.tmpdir) + wait_for_files.extend(job.get_wait_for_files()) + + # Only create extra file if we have more than 20 input files. + # This should not require the file creation in most cases. 
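
`get_envvar_declarations` above prefixes the remote command with `NAME='value'` assignments taken from `os.environ` for every variable registered via the `envvars:` directive, so the spawned cluster process sees them. A small hedged sketch of that idea (variable names and values are invented; note that `repr()` quoting is only safe for simple values):

    import os


    def envvar_declarations(names):
        """Render NAME='value' pairs as a shell command prefix,
        using repr() for single quotes as in the hunk above."""
        return " ".join(f"{name}={repr(os.environ[name])}" for name in names)


    os.environ["TESTVAR"] = "some value"
    os.environ["TESTVAR2"] = "other"

    prefix = envvar_declarations(["TESTVAR", "TESTVAR2"])
    print(f"{prefix} python -m snakemake ...")
    # -> TESTVAR='some value' TESTVAR2='other' python -m snakemake ...
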
+ if len(wait_for_files) > 20: + wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt" + with open(wait_for_files_file, "w") as fd: + print(wait_for_files, sep="\n", file=fd) + + waitfiles_parameter = format_cli_arg( + "--wait-for-files-file", wait_for_files_file + ) + else: + waitfiles_parameter = format_cli_arg("--wait-for-files", wait_for_files) + + return f"{super().get_job_args(job)} {waitfiles_parameter}" + def _wait_thread(self): try: self._wait_for_jobs() @@ -787,76 +818,23 @@ def get_jobscript(self, job): return os.path.join(self.tmpdir, f) - def format_job(self, pattern, job, **kwargs): - wait_for_files = [] - scheduler_solver_path = "" - if self.assume_shared_fs: - wait_for_files.append(self.tmpdir) - wait_for_files.extend(job.get_wait_for_files()) - # Prepend PATH of current python executable to PATH. - # This way, we ensure that the snakemake process in the cluster node runs - # in the same environment as the current process. - # This is necessary in order to find the pulp solver backends (e.g. coincbc). - scheduler_solver_path = "--scheduler-solver-path {}".format( - os.path.dirname(sys.executable) - ) - - # Only create extra file if we have more than 20 input files. - # This should not require the file creation in most cases. - if len(wait_for_files) > 20: - wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt" - with open(wait_for_files_file, "w") as fd: - fd.write("\n".join(wait_for_files)) - - waitfiles_parameter = format( - "--wait-for-files-file {wait_for_files_file}", - wait_for_files_file=repr(wait_for_files_file), - ) - else: - waitfiles_parameter = format( - "--wait-for-files {wait_for_files}", - wait_for_files=[repr(f) for f in wait_for_files], - ) - job_specific_args = "" - if job.is_group(): - job_specific_args = f"--local-groupid {job.jobid}" + def write_jobscript(self, job, jobscript): + exec_job = self.format_job_exec(job) - format_p = partial( - self.format_job_pattern, - job=job, - properties=job.properties(cluster=self.cluster_params(job)), - latency_wait=self.latency_wait, - waitfiles_parameter=waitfiles_parameter, - scheduler_solver_path=scheduler_solver_path, - job_specific_args=job_specific_args, - **kwargs, - ) try: - return format_p(pattern) - except KeyError as e: - raise WorkflowError( - "Error formatting jobscript: {} not found\n" - "Make sure that your custom jobscript is up to date.".format(e) + content = self.jobscript.format( + properties=job.properties(cluster=self.cluster_params(job)), + exec_job=exec_job, ) + except KeyError as e: + if self.is_default_jobscript: + raise e + else: + raise WorkflowError( + f"Error formatting custom jobscript {self.workflow.jobscript}: value for {e} not found.\n" + "Make sure that your custom jobscript is defined as expected." 
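
One detail worth flagging in the `get_job_args` hunk above: `print(wait_for_files, sep="\n", file=fd)` writes the repr of the whole list into the file, because `sep` only separates multiple positional arguments. Writing one path per line, as the old `"\n".join(...)` did, presumably needs the list unpacked. A sketch of the intended branching (threshold lowered here so the file branch actually triggers; paths are examples):

    import tempfile

    wait_for_files = ["/tmp/snakejob.d", "results/a.txt", "results/b.txt"]

    # Only write a helper file when the list gets long; short lists go on the
    # command line directly (mirrors the >20 threshold above, lowered here).
    if len(wait_for_files) > 2:
        with tempfile.NamedTemporaryFile("w", suffix=".waitforfilesfile.txt", delete=False) as fd:
            print(*wait_for_files, sep="\n", file=fd)   # note the unpacking
            waitfiles_arg = f"--wait-for-files-file {fd.name!r}"
    else:
        quoted = " ".join(repr(f) for f in wait_for_files)
        waitfiles_arg = f"--wait-for-files {quoted}"

    print(waitfiles_arg)
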
+ ) - def write_jobscript(self, job, jobscript, **kwargs): - # only force threads if this is not a group job - # otherwise we want proper process handling - use_threads = "--force-use-threads" if not job.is_group() else "" - - envvars = " ".join( - "{}={}".format(var, os.environ[var]) for var in self.workflow.envvars - ) - - exec_job = self.format_job( - self.exec_job, - job, - _quote_all=True, - use_threads=use_threads, - envvars=envvars, - **kwargs, - ) - content = self.format_job(self.jobscript, job, exec_job=exec_job, **kwargs) logger.debug("Jobscript:\n{}".format(content)) with open(jobscript, "w") as f: print(content, file=f) @@ -954,7 +932,6 @@ def __init__( assume_shared_fs=True, max_status_checks_per_second=1, keepincomplete=False, - keepmetadata=True, ): self.submitcmd = submitcmd @@ -986,25 +963,43 @@ def __init__( assume_shared_fs=assume_shared_fs, max_status_checks_per_second=max_status_checks_per_second, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) self.sidecar_vars = None if self.sidecarcmd: self._launch_sidecar() - if statuscmd: - self.exec_job += " && exit 0 || exit 1" - elif assume_shared_fs: - # TODO wrap with watch and touch {jobrunning} - # check modification date of {jobrunning} in the wait_for_job method - self.exec_job += " && touch {jobfinished} || (touch {jobfailed}; exit 1)" - else: + if not statuscmd and not assume_shared_fs: raise WorkflowError( "If no shared filesystem is used, you have to " "specify a cluster status command." ) + def get_job_exec_prefix(self, job): + if self.assume_shared_fs: + return f"cd {self.workflow.workdir_init}" + else: + return "" + + def get_job_exec_suffix(self, job): + if self.statuscmd: + return "exit 0 || exit 1" + elif self.assume_shared_fs: + # TODO wrap with watch and touch {jobrunning} + # check modification date of {jobrunning} in the wait_for_job method + + return ( + f"touch {repr(self.get_jobfinished_marker(job))} || " + f"(touch {repr(self.get_jobfailed_marker(job))}; exit 1)" + ) + assert False, "bug: neither statuscmd defined nor shared FS" + + def get_jobfinished_marker(self, job): + return os.path.join(self.tmpdir, "{}.jobfinished".format(job.jobid)) + + def get_jobfailed_marker(self, job): + return os.path.join(self.tmpdir, "{}.jobfailed".format(job.jobid)) + def _launch_sidecar(self): def copy_stdout(executor, process): """Run sidecar process and copy it's stdout to our stdout.""" @@ -1092,15 +1087,13 @@ def register_job(self, job): def run(self, job, callback=None, submit_callback=None, error_callback=None): super()._run(job) - workdir = os.getcwd() jobid = job.jobid jobscript = self.get_jobscript(job) - jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid)) - jobfailed = os.path.join(self.tmpdir, "{}.jobfailed".format(jobid)) - self.write_jobscript( - job, jobscript, jobfinished=jobfinished, jobfailed=jobfailed - ) + self.write_jobscript(job, jobscript) + + jobfinished = self.get_jobfinished_marker(job) + jobfailed = self.get_jobfailed_marker(job) if self.statuscmd: ext_jobid = self.dag.incomplete_external_jobid(job) @@ -1316,7 +1309,6 @@ def __init__( restart_times=0, assume_shared_fs=True, keepincomplete=False, - keepmetadata=True, ): super().__init__( workflow, @@ -1331,19 +1323,22 @@ def __init__( assume_shared_fs=assume_shared_fs, max_status_checks_per_second=10, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) self.submitcmd = submitcmd self.external_jobid = dict() + def get_job_exec_prefix(self, job): + if self.assume_shared_fs: + return f"cd 
{self.workflow.workdir_init}" + else: + return "" + def cancel(self): logger.info("Will exit after finishing currently running jobs.") self.shutdown() def run(self, job, callback=None, submit_callback=None, error_callback=None): super()._run(job) - workdir = os.getcwd() - jobid = job.jobid jobscript = self.get_jobscript(job) self.write_jobscript(job, jobscript) @@ -1426,7 +1421,6 @@ def __init__( assume_shared_fs=True, max_status_checks_per_second=1, keepincomplete=False, - keepmetadata=True, ): super().__init__( workflow, @@ -1441,7 +1435,6 @@ def __init__( assume_shared_fs=assume_shared_fs, max_status_checks_per_second=max_status_checks_per_second, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) try: import drmaa @@ -1458,6 +1451,12 @@ def __init__( self.session.initialize() self.submitted = list() + def get_job_exec_prefix(self, job): + if self.assume_shared_fs: + return f"cd {self.workflow.workdir_init}" + else: + return "" + def cancel(self): from drmaa.const import JobControlAction from drmaa.errors import InvalidJobException, InternalException @@ -1626,21 +1625,9 @@ def __init__( local_input=None, restart_times=None, keepincomplete=False, - keepmetadata=True, ): self.workflow = workflow - exec_job = ( - "cp -rf /source/. . && " - "snakemake {target} --snakefile {snakefile} " - "--force --cores {cores} --keep-target-files --keep-remote " - "--latency-wait {latency_wait} --scheduler {workflow.scheduler_type} " - " --attempt {attempt} {use_threads} --max-inventory-time 0 " - "--wrapper-prefix {workflow.wrapper_prefix} " - "{overwrite_config} {printshellcmds} {rules} --nocolor " - "--notemp --no-hooks --nolock " - ) - super().__init__( workflow, dag, @@ -1652,9 +1639,9 @@ def __init__( cluster_config=cluster_config, local_input=local_input, restart_times=restart_times, - exec_job=exec_job, assume_shared_fs=False, max_status_checks_per_second=10, + disable_envvar_declarations=True, ) # use relative path to Snakefile self.snakefile = os.path.relpath(workflow.main_snakefile) @@ -1681,6 +1668,9 @@ def __init__( self.container_image = container_image or get_container_image() logger.info(f"Using {self.container_image} for Kubernetes jobs.") + def get_job_exec_prefix(self, job): + return "cp -rf /source/. ." + def register_secret(self): import kubernetes.client @@ -1814,12 +1804,8 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None): import kubernetes.client super()._run(job) - exec_job = self.format_job( - self.exec_job, - job, - _quote_all=True, - use_threads="--force-use-threads" if not job.is_group() else "", - ) + exec_job = self.format_job_exec(job) + # Kubernetes silently does not submit a job if the name is too long # therefore, we ensure that it is not longer than snakejob+uuid. 
jobid = "snakejob-{}".format( @@ -2090,7 +2076,6 @@ def __init__( restart_times=None, max_status_checks_per_second=1, keepincomplete=False, - keepmetadata=True, ): self.workflow = workflow self.workflow_sources = [] @@ -2124,14 +2109,6 @@ def __init__( logger.debug("bucket=" + self.s3_bucket) logger.debug("subdir=" + self.s3_subdir) self.quiet = quiet - exec_job = ( - "snakemake {target} --snakefile {snakefile} " - "--force --cores {cores} --keep-target-files --keep-remote " - "--latency-wait 0 --scheduler {workflow.scheduler_type} " - "--attempt 1 {use_threads} --max-inventory-time 0 " - "{overwrite_config} {rules} --nocolor " - "--notemp --no-hooks --nolock " - ) super().__init__( workflow, @@ -2142,11 +2119,11 @@ def __init__( printshellcmds=printshellcmds, local_input=local_input, restart_times=restart_times, - exec_job=exec_job, assume_shared_fs=False, max_status_checks_per_second=max_status_checks_per_second, disable_default_remote_provider_args=True, - disable_get_default_resources_args=True, + disable_default_resources_args=True, + disable_envvar_declarations=True, ) self.container_image = container_image or get_container_image() logger.info(f"Using {self.container_image} for Tibanna jobs.") @@ -2193,31 +2170,22 @@ def split_filename(self, filename, checkdir=None): def remove_prefix(self, s): return re.sub("^{}/{}/".format(self.s3_bucket, self.s3_subdir), "", s) - def handle_remote(self, target): - if isinstance(target, _IOFile) and target.remote_object.provider.is_default: - return self.remove_prefix(target) - else: - return target + def get_job_targets(self, job): + def handle_target(target): + if isinstance(target, _IOFile) and target.remote_object.provider.is_default: + return self.remove_prefix(target) + else: + return target + + return [handle_target(target) for target in job.get_targets()] + + def get_snakefile(self): + return os.path.basename(self.snakefile) def add_command(self, job, tibanna_args, tibanna_config): - # snakefile, with file name remapped - snakefile_fname = tibanna_args.snakemake_main_filename - # targets, with file name remapped - targets = job.get_targets() - if not isinstance(targets, list): - targets = [targets] - targets_default = " ".join([self.handle_remote(t) for t in targets]) - # use_threads - use_threads = "--force-use-threads" if not job.is_group() else "" # format command - command = self.format_job_pattern( - self.exec_job, - job, - target=targets_default, - snakefile=snakefile_fname, - use_threads=use_threads, - cores=tibanna_config["cpu"], - ) + command = self.format_job_exec(job) + if self.precommand: command = self.precommand + "; " + command logger.debug("command = " + str(command)) diff --git a/snakemake/executors/common.py b/snakemake/executors/common.py new file mode 100644 index 000000000..8f0145dd2 --- /dev/null +++ b/snakemake/executors/common.py @@ -0,0 +1,24 @@ +from snakemake.io import not_iterable + + +def format_cli_arg(flag, value, quote=True, skip=False): + if not skip and value: + if isinstance(value, bool): + value = "" + else: + value = format_cli_pos_arg(value, quote=quote) + return f"{flag} {value}" + return "" + + +def format_cli_pos_arg(value, quote=True): + if not_iterable(value): + return repr(value) + elif isinstance(value, dict): + return join_cli_args(repr(f"{key}={val}") for key, val in value) + else: + return join_cli_args(repr(v) for v in value) + + +def join_cli_args(args): + return " ".join(arg for arg in args if arg) diff --git a/snakemake/executors/ga4gh_tes.py b/snakemake/executors/ga4gh_tes.py index 
bb5a3799d..e7af062b6 100644 --- a/snakemake/executors/ga4gh_tes.py +++ b/snakemake/executors/ga4gh_tes.py @@ -52,28 +52,6 @@ def __init__( logger.info("[TES] Job execution on TES: {url}".format(url=self.tes_url)) - exec_job = "\\\n".join( - ( - "{envvars} ", - "mkdir /tmp/conda && cd /tmp && ", - "snakemake {target} ", - "--snakefile {snakefile} ", - "--verbose ", - "--force --cores {cores} ", - "--keep-target-files ", - "--keep-remote ", - "--latency-wait 10 ", - "--attempt 1 ", - "{use_threads}", - "{overwrite_config} {rules} ", - "--nocolor ", - "--notemp ", - "--no-hooks ", - "--nolock ", - "--mode {} ".format(Mode.cluster), - ) - ) - super().__init__( workflow, dag, @@ -85,32 +63,12 @@ def __init__( cluster_config=cluster_config, local_input=local_input, restart_times=restart_times, - exec_job=exec_job, assume_shared_fs=assume_shared_fs, max_status_checks_per_second=max_status_checks_per_second, ) - def write_jobscript(self, job, jobscript, **kwargs): - - use_threads = "--force-use-threads" if not job.is_group() else "" - envvars = "\\\n".join( - "export {}={};".format(var, os.environ[var]) - for var in self.workflow.envvars - ) - - exec_job = self.format_job( - self.exec_job, - job, - _quote_all=False, - use_threads=use_threads, - envvars=envvars, - **kwargs, - ) - content = self.format_job(self.jobscript, job, exec_job=exec_job, **kwargs) - logger.debug("Jobscript:\n{}".format(content)) - with open(jobscript, "w") as f: - print(content, file=f) - os.chmod(jobscript, os.stat(jobscript).st_mode | stat.S_IXUSR) + def get_job_exec_prefix(self, job): + return "mkdir /tmp/conda && cd /tmp" def shutdown(self): # perform additional steps on shutdown if necessary diff --git a/snakemake/executors/google_lifesciences.py b/snakemake/executors/google_lifesciences.py index 878a09119..c2b976165 100644 --- a/snakemake/executors/google_lifesciences.py +++ b/snakemake/executors/google_lifesciences.py @@ -53,7 +53,6 @@ def __init__( cache=False, local_input=None, restart_times=None, - exec_job=None, max_status_checks_per_second=1, preemption_default=None, preemptible_rules=None, @@ -65,21 +64,9 @@ def __init__( self.workdir = os.path.realpath(os.path.dirname(self.workflow.persistence.path)) self._save_storage_cache = cache - # Relative path for running on instance - self._set_snakefile() - # Prepare workflow sources for build package self._set_workflow_sources() - exec_job = exec_job or ( - "snakemake {target} --snakefile %s " - "--force --cores {cores} --keep-target-files --keep-remote " - "--latency-wait {latency_wait} --scheduler {workflow.scheduler_type} " - "--attempt 1 {use_threads} --max-inventory-time 0 " - "{overwrite_config} {rules} --nocolor " - "--notemp --no-hooks --nolock " % self.snakefile - ) - # Set preemptible instances self._set_preemptible_rules(preemption_default, preemptible_rules) @@ -127,11 +114,16 @@ def __init__( quiet=quiet, printshellcmds=printshellcmds, restart_times=restart_times, - exec_job=exec_job, assume_shared_fs=False, max_status_checks_per_second=10, ) + def get_default_resources_args(self, default_resources=None): + assert default_resources is None + return super().get_default_resources_args( + default_resources=self.default_resources + ) + def _get_services(self): """use the Google Discovery Build to generate API clients for Life Sciences, and use the google storage python client @@ -448,7 +440,7 @@ def _generate_job_resources(self, job): gpu_count = 1 # Update default resources using decided memory and disk - + # TODO why is this needed?? 
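
The `get_default_resources_args` override in the Google Life Sciences executor above asserts that callers pass no explicit value and substitutes the executor's own `self.default_resources`, which `_generate_job_resources` rebuilds per job from the workflow defaults. A compact sketch of that override pattern (class names and resource strings are abbreviated placeholders):

    class BaseExec:
        def __init__(self, workflow_defaults):
            self.workflow_defaults = workflow_defaults

        def get_default_resources_args(self, default_resources=None):
            default_resources = default_resources or self.workflow_defaults
            return "--default-resources " + " ".join(repr(r) for r in default_resources)


    class LifeSciencesExec(BaseExec):
        def __init__(self, workflow_defaults):
            super().__init__(workflow_defaults)
            # per-job copy, later adjusted with the decided machine memory/disk
            self.default_resources = list(workflow_defaults)

        def get_default_resources_args(self, default_resources=None):
            # callers must not pass their own value; always use the per-job copy
            assert default_resources is None
            return super().get_default_resources_args(self.default_resources)


    ex = LifeSciencesExec(["mem_mb=1000"])
    ex.default_resources.append("disk_mb=64000")   # e.g. set while generating job resources
    print(ex.get_default_resources_args())
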
self.default_resources = DefaultResources( from_other=self.workflow.default_resources ) @@ -623,14 +615,9 @@ def _get_accelerator(self, gpu_count, zone, gpu_model=None): return keepers[smallest] - def _set_snakefile(self): - """The snakefile must be a relative path, which should be derived - from the self.workflow.main_snakefile. - """ + def get_snakefile(self): assert os.path.exists(self.workflow.main_snakefile) - self.snakefile = self.workflow.main_snakefile.replace(self.workdir, "").strip( - os.sep - ) + return self.workflow.main_snakefile.replace(self.workdir, "").strip(os.sep) def _set_workflow_sources(self): """We only add files from the working directory that are config related @@ -742,26 +729,23 @@ def _generate_log_action(self, job): def _generate_job_action(self, job): """generate a single action to execute the job.""" - # Derive the entrypoint command, the same content that might be written by self.get_jobscript(job) - use_threads = "--force-use-threads" if not job.is_group() else "" - - exec_job = self.format_job( - self.exec_job, job, _quote_all=True, use_threads=use_threads - ) + exec_job = self.format_job_exec(job) - # Now that we've parsed the job resource requirements, add to exec - exec_job += self.get_default_resources_args(self.default_resources) - - # script should be changed to this when added to version control! - # https://raw.githubusercontent.com/snakemake/snakemake/main/snakemake/executors/google_lifesciences_helper.py # The full command to download the archive, extract, and run # For snakemake bases, we must activate the conda environment, but # for custom images we must allow this to fail (hence || true) commands = [ "/bin/bash", "-c", - "mkdir -p /workdir && cd /workdir && wget -O /download.py https://raw.githubusercontent.com/snakemake/snakemake/main/snakemake/executors/google_lifesciences_helper.py && chmod +x /download.py && source activate snakemake || true && python /download.py download %s %s /tmp/workdir.tar.gz && tar -xzvf /tmp/workdir.tar.gz && %s" - % (self.bucket.name, self.pipeline_package, exec_job), + "mkdir -p /workdir && " + "cd /workdir && " + "wget -O /download.py " + "https://raw.githubusercontent.com/snakemake/snakemake/main/snakemake/executors/google_lifesciences_helper.py && " + "chmod +x /download.py && " + "source activate snakemake || true && " + f"python /download.py download {self.bucket.name} {self.pipeline_package} " + "/tmp/workdir.tar.gz && " + f"tar -xzvf /tmp/workdir.tar.gz && {exec_job}", ] # We are only generating one action, one job per run diff --git a/snakemake/jobs.py b/snakemake/jobs.py index 2483e3fef..56c0f5af5 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -1100,7 +1100,7 @@ def products(self): return products def get_targets(self): - return self.targetfile or [self.rule.name] + return [self.targetfile or self.rule.name] @property def is_branched(self): diff --git a/snakemake/remote/GS.py b/snakemake/remote/GS.py index 98c6f3920..ea92e7994 100644 --- a/snakemake/remote/GS.py +++ b/snakemake/remote/GS.py @@ -14,6 +14,7 @@ from snakemake.common import lazy_property import snakemake.io from snakemake.utils import os_sync +from snakemake.logging import logger try: import google.cloud @@ -123,7 +124,7 @@ def __init__( keep_local=keep_local, stay_on_remote=stay_on_remote, is_default=is_default, - **kwargs + **kwargs, ) self.client = storage.Client(*args, **kwargs) @@ -186,6 +187,7 @@ async def inventory(self, cache: snakemake.io.IOCache): cache.exists_remote[name] = True cache.mtime[name] = 
snakemake.io.Mtime(remote=blob.updated.timestamp()) cache.size[name] = blob.size + # TODO cache "is directory" information cache.remaining_wait_time -= time.time() - start_time @@ -200,22 +202,36 @@ def get_inventory_parent(self): @retry.Retry(predicate=google_cloud_retry_predicate) def exists(self): - return self.blob.exists() + if self.blob.exists(): + return True + elif any(self.directory_entries()): + return True + else: + return False + @retry.Retry(predicate=google_cloud_retry_predicate) def mtime(self): if self.exists(): - self.update_blob() - t = self.blob.updated - return t.timestamp() + if self.is_directory(): + return max( + blob.updated.timestamp() for blob in self.directory_entries() + ) + else: + self.update_blob() + return self.blob.updated.timestamp() else: raise WorkflowError( "The file does not seem to exist remotely: %s" % self.local_file() ) + @retry.Retry(predicate=google_cloud_retry_predicate) def size(self): if self.exists(): - self.update_blob() - return self.blob.size // 1024 + if self.is_directory(): + return 0 + else: + self.update_blob() + return self.blob.size // 1024 else: return self._iofile.size_local @@ -226,10 +242,19 @@ def _download(self): return None # Create just a directory, or a file itself - if snakemake.io.is_flagged(self.local_file(), "directory"): + if self.is_directory(): return self._download_directory() return download_blob(self.blob, self.local_file()) + @retry.Retry(predicate=google_cloud_retry_predicate) + def is_directory(self): + if snakemake.io.is_flagged(self.file(), "directory"): + return True + elif self.blob.exists(): + return False + else: + return any(self.directory_entries()) + @retry.Retry(predicate=google_cloud_retry_predicate) def _download_directory(self): """A 'private' function to handle download of a storage folder, which @@ -238,7 +263,7 @@ def _download_directory(self): # Create the directory locally os.makedirs(self.local_file(), exist_ok=True) - for blob in self.client.list_blobs(self.bucket_name, prefix=self.key): + for blob in self.directory_entries(): local_name = "{}/{}".format(blob.bucket.name, blob.name) # Don't try to create "directory blob" @@ -310,11 +335,7 @@ def bucket_name(self): @property def key(self): - key = self.parse().group("key") - f = self.local_file() - if snakemake.io.is_flagged(f, "directory"): - key = key if f.endswith("/") else key + "/" - return key + return self.parse().group("key") def parse(self): m = re.search("(?P[^/]*)/(?P.*)", self.local_file()) @@ -324,3 +345,10 @@ def parse(self): "/.".format(self.local_file()) ) return m + + def directory_entries(self): + prefix = self.key + if not prefix.endswith("/"): + prefix += "/" + + return self.client.list_blobs(self.bucket_name, prefix=prefix) diff --git a/snakemake/remote/__init__.py b/snakemake/remote/__init__.py index 2f8112d12..19a4bf7f5 100644 --- a/snakemake/remote/__init__.py +++ b/snakemake/remote/__init__.py @@ -154,6 +154,10 @@ def available_protocols(self): def remote_interface(self): pass + @property + def name(self): + return self.__module__.split(".")[-1] + class AbstractRemoteObject: """This is an abstract class to be used to derive remote object classes for diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index a165530be..f485fa7b4 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -98,7 +98,6 @@ def __init__( force_use_threads=False, assume_shared_fs=True, keepincomplete=False, - keepmetadata=True, scheduler_type=None, scheduler_ilp_solver=None, ): @@ -122,7 +121,6 @@ def __init__( self.greediness 
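
For the `snakemake/remote/GS.py` changes above: a key without a matching blob is now treated as a remote "directory" whenever blobs exist under `key/`, and `mtime()`, `size()` and `_download()` branch on that. The sketch below reproduces the same decision logic against the google-cloud-storage client; bucket and key are placeholders, and only `Client.list_blobs(bucket, prefix=...)`, `Blob.exists()`, `Blob.updated` and `Blob.size` are assumed from the library.

    from google.cloud import storage


    def directory_entries(client, bucket_name, key):
        prefix = key if key.endswith("/") else key + "/"
        return client.list_blobs(bucket_name, prefix=prefix)


    def describe_remote(client, bucket_name, key):
        """Classify a GS key the way the updated exists()/is_directory() do."""
        blob = client.bucket(bucket_name).blob(key)
        if blob.exists():
            return {"exists": True, "directory": False,
                    "mtime": blob.updated.timestamp(), "size_kb": blob.size // 1024}
        entries = list(directory_entries(client, bucket_name, key))
        if entries:
            # directory case: newest entry wins for mtime, size is reported as 0
            return {"exists": True, "directory": True,
                    "mtime": max(b.updated.timestamp() for b in entries), "size_kb": 0}
        return {"exists": False}


    # usage sketch (requires credentials and a real bucket):
    # client = storage.Client()
    # print(describe_remote(client, "my-bucket", "results/testdir"))
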
= 1 self.max_jobs_per_second = max_jobs_per_second self.keepincomplete = keepincomplete - self.keepmetadata = keepmetadata self.scheduler_type = scheduler_type self.scheduler_ilp_solver = scheduler_ilp_solver self._tofinish = [] @@ -142,13 +140,7 @@ def __init__( self.global_resources["_cores"] = sys.maxsize self.resources = dict(self.global_resources) - use_threads = ( - force_use_threads - or (os.name != "posix") - or cluster - or cluster_sync - or drmaa - ) + use_threads = force_use_threads or (os.name != "posix") self._open_jobs = threading.Semaphore(0) self._lock = threading.Lock() @@ -190,7 +182,6 @@ def __init__( printshellcmds=printshellcmds, cores=local_cores, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) if cluster or cluster_sync: if cluster_sync: @@ -217,7 +208,6 @@ def __init__( printshellcmds=printshellcmds, assume_shared_fs=assume_shared_fs, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) if workflow.immediate_submit: self.update_dynamic = False @@ -239,7 +229,6 @@ def __init__( assume_shared_fs=assume_shared_fs, max_status_checks_per_second=max_status_checks_per_second, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) elif kubernetes: self._local_executor = CPUExecutor( @@ -251,7 +240,6 @@ def __init__( printshellcmds=printshellcmds, cores=local_cores, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) self._executor = KubernetesExecutor( @@ -264,7 +252,6 @@ def __init__( printshellcmds=printshellcmds, cluster_config=cluster_config, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) elif tibanna: self._local_executor = CPUExecutor( @@ -277,7 +264,6 @@ def __init__( use_threads=use_threads, cores=local_cores, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) self._executor = TibannaExecutor( @@ -292,7 +278,6 @@ def __init__( quiet=quiet, printshellcmds=printshellcmds, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) elif google_lifesciences: self._local_executor = CPUExecutor( @@ -353,7 +338,6 @@ def __init__( use_threads=use_threads, cores=cores, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, ) if self.max_jobs_per_second and not self.dryrun: max_jobs_frac = Fraction(self.max_jobs_per_second).limit_denominator() diff --git a/snakemake/workflow.py b/snakemake/workflow.py index d15dec62a..2084a0795 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -149,6 +149,7 @@ def __init__( max_threads=None, all_temp=False, local_groupid="local", + keep_metadata=True, latency_wait=3, ): """ @@ -235,6 +236,7 @@ def __init__( self.all_temp = all_temp self.scheduler = None self.local_groupid = local_groupid + self.keep_metadata = keep_metadata self.latency_wait = latency_wait _globals = globals() @@ -631,7 +633,6 @@ def execute( export_cwl=False, batch=None, keepincomplete=False, - keepmetadata=True, ): self.check_localrules() @@ -1030,7 +1031,6 @@ def files(items): force_use_threads=force_use_threads, assume_shared_fs=self.assume_shared_fs, keepincomplete=keepincomplete, - keepmetadata=keepmetadata, scheduler_type=scheduler_type, scheduler_ilp_solver=scheduler_ilp_solver, ) @@ -1159,6 +1159,16 @@ def register_envvars(self, *envvars): Register environment variables that shall be passed to jobs. If used multiple times, union is taken. 
""" + invalid_envvars = [ + envvar + for envvar in envvars + if re.match("^\w+$", envvar, flags=re.ASCII) is None + ] + if invalid_envvars: + raise WorkflowError( + f"Invalid environment variables requested: {', '.join(map(repr, invalid_envvars))}. " + "Environment variable names may only contain alphanumeric characters and the underscore. " + ) undefined = set(var for var in envvars if var not in os.environ) if self.check_envvars and undefined: raise WorkflowError( @@ -1249,7 +1259,7 @@ def func(*args, **wildcards): return expand( *args, scatteritem=map("{{}}-of-{}".format(n).format, range(1, n + 1)), - **wildcards + **wildcards, ) for key in content: @@ -1369,7 +1379,7 @@ def decorate(ruleinfo): if ruleinfo.wildcard_constraints: rule.set_wildcard_constraints( *ruleinfo.wildcard_constraints[0], - **ruleinfo.wildcard_constraints[1] + **ruleinfo.wildcard_constraints[1], ) if ruleinfo.name: rule.name = ruleinfo.name diff --git a/tests/test14/Snakefile.nonstandard b/tests/test14/Snakefile.nonstandard index 02a08c26f..0a0f5c6f6 100644 --- a/tests/test14/Snakefile.nonstandard +++ b/tests/test14/Snakefile.nonstandard @@ -3,7 +3,8 @@ from snakemake import shell chromosomes = [1,2,3,4,5] envvars: - "TESTVAR" + "TESTVAR", + "TESTVAR2" diff --git a/tests/test_kubernetes/Snakefile b/tests/test_kubernetes/Snakefile index af2cfd821..1aeae1ab8 100644 --- a/tests/test_kubernetes/Snakefile +++ b/tests/test_kubernetes/Snakefile @@ -1,39 +1,52 @@ import os from snakemake.remote.GS import RemoteProvider as GSRemoteProvider + GS = GSRemoteProvider() rule all: input: - "landsat-data.txt.bz2" + "landsat-data.txt.bz2", + "testdir", rule copy: input: - GS.remote("gcp-public-data-landsat/LC08/01/001/003/LC08_L1GT_001003_20170430_20170501_01_RT/LC08_L1GT_001003_20170430_20170501_01_RT_MTL.txt") + GS.remote( + "gcp-public-data-landsat/LC08/01/001/003/LC08_L1GT_001003_20170430_20170501_01_RT/LC08_L1GT_001003_20170430_20170501_01_RT_MTL.txt" + ), output: - "landsat-data.txt" + "landsat-data.txt", resources: - mem_mb=100 + mem_mb=100, run: # we could test volume size like this but it is currently unclear what f1-micro instances provide as boot disk size - #stats = os.statvfs('.') - #volume_gib = stats.f_bsize * stats.f_blocks / 1.074e9 - #assert volume_gib > 90 + # stats = os.statvfs('.') + # volume_gib = stats.f_bsize * stats.f_blocks / 1.074e9 + # assert volume_gib > 90 shell("cp {input} {output}") rule pack: input: - "landsat-data.txt" + "landsat-data.txt", output: - "landsat-data.txt.bz2" + "landsat-data.txt.bz2", conda: "envs/gzip.yaml" singularity: "docker://continuumio/miniconda3:4.4.10" log: - "logs/pack.log" + "logs/pack.log", shell: "bzip2 -c {input} > {output}; echo successful > {log}" + + +rule directory: + output: + directory("testdir"), + log: + "logs/directory.log", + shell: + "mkdir -p {output}; touch {output}/test.txt" diff --git a/tests/test_tibanna.py b/tests/test_tibanna.py index 0eb550cc6..9227c115d 100644 --- a/tests/test_tibanna.py +++ b/tests/test_tibanna.py @@ -16,8 +16,8 @@ def test_tibanna(): run( workdir, use_conda=True, - configfiles=[os.path.join(workdir, "config.json")], default_remote_prefix="snakemake-tibanna-test/1", + tibanna=True, tibanna_sfn="tibanna_unicorn_johannes", - tibanna_config="spot_instance=true", + tibanna_config=["spot_instance=true"], ) diff --git a/tests/test_tibanna/Snakefile b/tests/test_tibanna/Snakefile index 61faeb810..7fec1e9ef 100644 --- a/tests/test_tibanna/Snakefile +++ b/tests/test_tibanna/Snakefile @@ -12,6 +12,10 @@ from snakemake.remote.S3 import 
RemoteProvider as S3RemoteProvider S3 = S3RemoteProvider() + +configfile: "config.json" + + # envvars: # "TEST_ENVVAR1", # "TEST_ENVVAR2" diff --git a/tests/tests.py b/tests/tests.py index da4a7d2ef..be1516c44 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -101,6 +101,7 @@ def test13(): @skip_on_windows def test14(): os.environ["TESTVAR"] = "test" + os.environ["TESTVAR2"] = "test" run(dpath("test14"), snakefile="Snakefile.nonstandard", cluster="./qsub")
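
The `register_envvars` validation added in `snakemake/workflow.py` above (exercised indirectly by the new `TESTVAR2` usage in the tests) only accepts names matching `^\w+$` under `re.ASCII`, i.e. ASCII letters, digits and underscore, presumably because these names are later spliced into the spawned job's command line. A small sketch of that check in isolation:

    import re


    def validate_envvar_names(names):
        invalid = [n for n in names if re.match(r"^\w+$", n, flags=re.ASCII) is None]
        if invalid:
            raise ValueError(
                f"Invalid environment variables requested: {', '.join(map(repr, invalid))}. "
                "Names may only contain alphanumeric characters and the underscore."
            )


    validate_envvar_names(["TESTVAR", "TESTVAR2", "_PRIVATE"])      # accepted
    try:
        validate_envvar_names(["TESTVAR; rm -rf /", "VAR WITH SPACE"])
    except ValueError as e:
        print(e)
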