Skip to content

Commit

Permalink
fix issue #1475
Browse files (browse the repository at this point in the history)
  • Loading branch information
johanneskoester committed Mar 16, 2022
1 parent febdb19 commit 1038cad
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions snakemake/executors/__init__.py
Expand Up @@ -345,7 +345,11 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
if not job.is_branched and not job.is_updated:
# Restrict considered rules. This does not work for updated jobs
# because they need to be updated in the spawned process as well.
format_cli_arg("--allowed-rules", job.rules, quote=False)
rules = format_cli_arg("--allowed-rules", job.rules, quote=False)

job_specific_args = format_cli_arg(
"--local-groupid", job.jobid, skip=not job.is_group()
)

target = format_cli_pos_arg(kwargs.get("target", job.get_targets()))
snakefile = format_cli_pos_arg(kwargs.get("snakefile", self.snakefile))
Expand All @@ -357,6 +361,34 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
if "cores" in kwargs:
del kwargs["cores"]

# wait for files and scheduler
scheduler_solver_path = ""
waitfiles_parameter = ""
if self.assume_shared_fs:
wait_for_files = []
wait_for_files.append(self.tmpdir)
wait_for_files.extend(job.get_wait_for_files())
# Prepend PATH of current python executable to PATH.
# This way, we ensure that the snakemake process in the cluster node runs
# in the same environment as the current process.
# This is necessary in order to find the pulp solver backends (e.g. coincbc).
scheduler_solver_path = format_cli_arg(
"--scheduler-solver-path", os.path.dirname(sys.executable)
)

# Only create extra file if we have more than 20 input files.
# This should not require the file creation in most cases.
if len(wait_for_files) > 20:
wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt"
with open(wait_for_files_file, "w") as fd:
print(wait_for_files, sep="\n", file=fd)

waitfiles_parameter = format_cli_arg(
"--wait-for-files-file", wait_for_files_file
)
else:
waitfiles_parameter = format_cli_arg("--wait-for-files", wait_for_files)

cmd = format(
pattern,
job=job,
Expand All @@ -370,6 +402,9 @@ def format_job_pattern(self, pattern, job=None, **kwargs):
benchmark_repeats=job.benchmark_repeats if not job.is_group() else None,
target=target,
rules=rules,
job_specific_args=job_specific_args,
scheduler_solver_path=scheduler_solver_path,
waitfiles_parameter=waitfiles_parameter,
**kwargs,
)
return cmd
Expand Down Expand Up @@ -769,50 +804,18 @@ def get_jobscript(self, job):
return os.path.join(self.tmpdir, f)

def format_job(self, pattern, job, **kwargs):
wait_for_files = []
scheduler_solver_path = ""
if self.assume_shared_fs:
wait_for_files.append(self.tmpdir)
wait_for_files.extend(job.get_wait_for_files())
# Prepend PATH of current python executable to PATH.
# This way, we ensure that the snakemake process in the cluster node runs
# in the same environment as the current process.
# This is necessary in order to find the pulp solver backends (e.g. coincbc).
scheduler_solver_path = format_cli_arg(
"--scheduler-solver-path", os.path.dirname(sys.executable)
)

# Only create extra file if we have more than 20 input files.
# This should not require the file creation in most cases.
if len(wait_for_files) > 20:
wait_for_files_file = self.get_jobscript(job) + ".waitforfilesfile.txt"
with open(wait_for_files_file, "w") as fd:
print(wait_for_files, sep="\n", file=fd)

waitfiles_parameter = format_cli_arg(
"--wait-for-files-file", wait_for_files_file
)
else:
waitfiles_parameter = format_cli_arg("--wait-for-files", wait_for_files)
job_specific_args = format_cli_arg(
"--local-groupid", job.jobid, skip=not job.is_group()
)

format_p = partial(
self.format_job_pattern,
job=job,
properties=job.properties(cluster=self.cluster_params(job)),
latency_wait=self.latency_wait,
waitfiles_parameter=waitfiles_parameter,
scheduler_solver_path=scheduler_solver_path,
job_specific_args=job_specific_args,
**kwargs,
)
try:
return format_p(pattern)
except KeyError as e:
raise WorkflowError(
"Error formatting jobscript: {} not found\n"
"Error formatting job: {} not found\n"
"Make sure that your custom jobscript is up to date.".format(e)
)

Expand Down

0 comments on commit 1038cad

Please sign in to comment.