Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fixed temp file deletion for group jobs #1487

Merged
merged 2 commits into from Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions snakemake/dag.py
Expand Up @@ -605,6 +605,11 @@ def handle_temp(self, job):
if self.notemp:
return

+ if job.is_group():
+ for j in job:
+ self.handle_temp(j)
+ return
+
is_temp = lambda f: is_flagged(f, "temp")

# handle temp input
Expand Down Expand Up @@ -1466,6 +1471,8 @@ def finish(self, job, update_dynamic=True):
self.create_conda_envs()
potential_new_ready_jobs = True

+ self.handle_temp(job)
+
return potential_new_ready_jobs

def new_job(self, rule, targetfile=None, format_wildcards=None):
Expand Down
2 changes: 1 addition & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -818,7 +818,7 @@ def format_job(self, pattern, job, **kwargs):
wait_for_files=[repr(f) for f in wait_for_files],
)
job_specific_args = ""
- if job.is_group:
+ if job.is_group():
job_specific_args = f"--local-groupid {job.jobid}"

format_p = partial(
Expand Down
15 changes: 1 addition & 14 deletions snakemake/jobs.py
Expand Up @@ -1042,7 +1042,6 @@ def postprocess(
upload_remote=True,
handle_log=True,
handle_touch=True,
- handle_temp=True,
error=False,
ignore_missing_output=False,
assume_shared_fs=True,
Expand Down Expand Up @@ -1083,11 +1082,6 @@ def postprocess(
"({}). Please ensure write permissions for the "
"directory {}".format(e, self.dag.workflow.persistence.path)
)
- if handle_temp:
- # temp handling has to happen after calling finished(),
- # because we need to access temp output files to record
- # start and end times.
- self.dag.handle_temp(self)

@property
def name(self):
Expand Down Expand Up @@ -1409,14 +1403,7 @@ def cleanup(self):

def postprocess(self, error=False, **kwargs):
for job in self.jobs:
- job.postprocess(handle_temp=False, error=error, **kwargs)
- # Handle temp after per-job postprocess.
- # This is necessary because group jobs are not topologically sorted,
- # and we might otherwise delete a temp input file before it has been
- # postprocessed by the outputting job in the same group.
- if not error:
- for job in self.jobs:
- self.dag.handle_temp(job)
+ job.postprocess(error=error, **kwargs)
# remove all pipe and service outputs since all jobs of this group are done and the
# outputs are no longer needed
for job in self.jobs:
Expand Down