From 1038cadd3839292e4732a95a615919f9344d6b41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Wed, 16 Mar 2022 13:26:51 +0100
Subject: [PATCH] fix issue #1475

---
 snakemake/executors/__init__.py | 71 +++++++++++++++++----------------
 1 file changed, 37 insertions(+), 34 deletions(-)

diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py
index ea0ca2db0..e617d240e 100644
--- a/snakemake/executors/__init__.py
+++ b/snakemake/executors/__init__.py
@@ -345,7 +345,11 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
         if not job.is_branched and not job.is_updated:
             # Restrict considered rules. This does not work for updated jobs
             # because they need to be updated in the spawned process as well.
-            format_cli_arg("--allowed-rules", job.rules, quote=False)
+            rules = format_cli_arg("--allowed-rules", job.rules, quote=False)
+
+        job_specific_args = format_cli_arg(
+            "--local-groupid", job.jobid, skip=not job.is_group()
+        )
 
         target = format_cli_pos_arg(kwargs.get("target", job.get_targets()))
         snakefile = format_cli_pos_arg(kwargs.get("snakefile", self.snakefile))
@@ -357,6 +361,34 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
         if "cores" in kwargs:
             del kwargs["cores"]
 
+        # wait for files and scheduler
+        scheduler_solver_path = ""
+        waitfiles_parameter = ""
+        if self.assume_shared_fs:
+            wait_for_files = []
+            wait_for_files.append(self.tmpdir)
+            wait_for_files.extend(job.get_wait_for_files())
+            # Prepend PATH of current python executable to PATH.
+            # This way, we ensure that the snakemake process in the cluster node runs
+            # in the same environment as the current process.
+            # This is necessary in order to find the pulp solver backends (e.g. coincbc).
+            scheduler_solver_path = format_cli_arg(
+                "--scheduler-solver-path", os.path.dirname(sys.executable)
+            )
+
+            # Only create extra file if we have more than 20 input files.
+            # This should not require the file creation in most cases.
+            if len(wait_for_files) > 20:
+                wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt"
+                with open(wait_for_files_file, "w") as fd:
+                    print(*wait_for_files, sep="\n", file=fd)
+
+                waitfiles_parameter = format_cli_arg(
+                    "--wait-for-files-file", wait_for_files_file
+                )
+            else:
+                waitfiles_parameter = format_cli_arg("--wait-for-files", wait_for_files)
+
         cmd = format(
             pattern,
             job=job,
@@ -370,6 +402,9 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
             benchmark_repeats=job.benchmark_repeats if not job.is_group() else None,
             target=target,
             rules=rules,
+            job_specific_args=job_specific_args,
+            scheduler_solver_path=scheduler_solver_path,
+            waitfiles_parameter=waitfiles_parameter,
             **kwargs,
         )
         return cmd
@@ -769,50 +804,18 @@ def get_jobscript(self, job):
         return os.path.join(self.tmpdir, f)
 
     def format_job(self, pattern, job, **kwargs):
-        wait_for_files = []
-        scheduler_solver_path = ""
-        if self.assume_shared_fs:
-            wait_for_files.append(self.tmpdir)
-            wait_for_files.extend(job.get_wait_for_files())
-            # Prepend PATH of current python executable to PATH.
-            # This way, we ensure that the snakemake process in the cluster node runs
-            # in the same environment as the current process.
-            # This is necessary in order to find the pulp solver backends (e.g. coincbc).
-            scheduler_solver_path = format_cli_arg(
-                "--scheduler-solver-path", os.path.dirname(sys.executable)
-            )
-
-        # Only create extra file if we have more than 20 input files.
-        # This should not require the file creation in most cases.
-        if len(wait_for_files) > 20:
-            wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt"
-            with open(wait_for_files_file, "w") as fd:
-                print(*wait_for_files, sep="\n", file=fd)
-
-            waitfiles_parameter = format_cli_arg(
-                "--wait-for-files-file", wait_for_files_file
-            )
-        else:
-            waitfiles_parameter = format_cli_arg("--wait-for-files", wait_for_files)
-        job_specific_args = format_cli_arg(
-            "--local-groupid", job.jobid, skip=not job.is_group()
-        )
-
         format_p = partial(
             self.format_job_pattern,
             job=job,
             properties=job.properties(cluster=self.cluster_params(job)),
             latency_wait=self.latency_wait,
-            waitfiles_parameter=waitfiles_parameter,
-            scheduler_solver_path=scheduler_solver_path,
-            job_specific_args=job_specific_args,
             **kwargs,
         )
         try:
             return format_p(pattern)
         except KeyError as e:
             raise WorkflowError(
-                "Error formatting jobscript: {} not found\n"
+                "Error formatting job: {} not found\n"
                 "Make sure that your custom jobscript is up to date.".format(e)
             )
 
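
Note on the --wait-for-files-file handoff used above: when a job waits on
more than 20 files, the paths are written to a side file, one per line, and
the spawned snakemake process reads them back from that file rather than
receiving every path inline on the command line. A minimal sketch of that
round trip, assuming hypothetical helper names (write_wait_for_files and
read_wait_for_files are illustrative only, not part of this patch or of
snakemake's API):

    import tempfile

    def write_wait_for_files(paths):
        # Mirrors print(*wait_for_files, sep="\n", file=fd) in the patch:
        # one path per line, written to a throwaway side file whose name
        # is then passed via --wait-for-files-file.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".waitforfilesfile.txt", delete=False
        ) as fd:
            print(*paths, sep="\n", file=fd)
            return fd.name

    def read_wait_for_files(path):
        # The consuming process splits on newlines and skips blank lines.
        with open(path) as f:
            return [line.strip() for line in f if line.strip()]

    name = write_wait_for_files(["results/a.txt", "results/b.txt"])
    assert read_wait_for_files(name) == ["results/a.txt", "results/b.txt"]

Per the comment in the patch, the >20 threshold means the side file is only
created for jobs with unusually many wait files; the common case still
passes --wait-for-files inline.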