fix: potential memory corruption caused by Google storage objects accessed from different threads #1174

Merged
merged 4 commits on Sep 24, 2021
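The core of the change: executor callbacks, which may run on arbitrary worker threads, no longer perform success/error handling themselves (and therefore no longer touch the thread-unsafe Google storage objects from those threads). They only append the job to _tofinish or _toerror and wake the scheduler; the actual handle_job_success call, resource accounting, progress logging and DAG update happen in schedule() on the scheduler thread, behind self._lock. Below is a minimal, self-contained sketch of that deferral pattern; MiniScheduler and its members are illustrative stand-ins, not Snakemake's real API.

    import threading


    class MiniScheduler:
        """Illustrative sketch of the deferral pattern introduced by this PR."""

        def __init__(self):
            self._lock = threading.Lock()
            self._open_jobs = threading.Semaphore(0)  # wakes the scheduling loop
            self._tofinish = []  # finished jobs, not yet processed
            self._toerror = []   # failed jobs, not yet processed
            self.running = {"job-1"}
            self.finished_jobs = 0

        # --- callbacks, invoked from executor/worker threads ---

        def _finish_callback(self, job):
            with self._lock:
                self._tofinish.append(job)  # defer all real work
            self._open_jobs.release()

        def _error_callback(self, job):
            with self._lock:
                self._toerror.append(job)
            self._open_jobs.release()

        # --- scheduling loop, runs on a single thread ---

        def schedule(self):
            self._open_jobs.acquire()
            with self._lock:
                self._finish_jobs()
                self._error_jobs()

        def _finish_jobs(self):
            # must be called from within the lock; this is where the real
            # scheduler calls handle_job_success(), frees resources and
            # updates the DAG -- always on this one thread
            for job in self._tofinish:
                self.running.discard(job)
                self.finished_jobs += 1
            self._tofinish.clear()

        def _error_jobs(self):
            # must be called from within the lock
            for job in self._toerror:
                self.running.discard(job)
            self._toerror.clear()


    if __name__ == "__main__":
        s = MiniScheduler()
        # a worker thread reports success; only the queue is touched there
        threading.Thread(target=s._finish_callback, args=("job-1",)).start()
        s.schedule()  # the scheduler thread does the actual finishing
        print(s.finished_jobs)  # 1

The key invariant is that _finish_jobs() and _error_jobs() are the only consumers of the two queues, and both run only from the scheduling loop while the lock is held.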
94 changes: 47 additions & 47 deletions snakemake/scheduler.py
@@ -118,6 +118,11 @@ def __init__(
self.scheduler_type = scheduler_type
self.scheduler_ilp_solver = scheduler_ilp_solver
self._tofinish = []
self._toerror = []
self.handle_job_success = True
self.update_resources = True
self.print_progress = not self.quiet and not self.dryrun
self.update_dynamic = not self.dryrun

self.global_resources = {
name: (sys.maxsize if res is None else res)
@@ -145,11 +150,7 @@ def __init__(
self._job_queue = None
self._last_job_selection_empty = False
self._submit_callback = self._noop
self._finish_callback = partial(
self._proceed,
update_dynamic=not self.dryrun,
print_progress=not self.quiet and not self.dryrun,
)
self._finish_callback = self._proceed

self._local_executor = None
if dryrun:
@@ -212,13 +213,10 @@ def __init__(
keepmetadata=keepmetadata,
)
if workflow.immediate_submit:
self._submit_callback = partial(
self._proceed,
update_dynamic=False,
print_progress=False,
update_resources=False,
handle_job_success=False,
)
self.update_dynamic = False
self.print_progress = False
self.update_resources = False
self.handle_job_success = False
else:
self._executor = DRMAAExecutor(
workflow,
@@ -440,6 +438,7 @@ def schedule(self):
# obtain needrun and running jobs in a thread-safe way
with self._lock:
self._finish_jobs()
self._error_jobs()
needrun = set(self.open_jobs)
running = list(self.running)
errors = self._errors
@@ -524,10 +523,40 @@ def schedule(self):

def _finish_jobs(self):
# must be called from within lock
for job, update_dynamic in self._tofinish:
self.dag.finish(job, update_dynamic=update_dynamic)
for job in self._tofinish:
if self.handle_job_success:
try:
self.get_executor(job).handle_job_success(job)
except (RuleException, WorkflowError) as e:
# if an error occurs while processing job output,
# we do the same as in case of errors during execution
print_exception(e, self.workflow.linemaps)
self._handle_error(job)
return

if self.update_resources:
# normal jobs have len=1, group jobs have len>1
self.finished_jobs += len(job)
self.running.remove(job)
self._free_resources(job)

if self.print_progress:
if job.is_group():
for j in job:
logger.job_finished(jobid=j.jobid)
else:
logger.job_finished(jobid=job.jobid)
self.progress()

self.dag.finish(job, update_dynamic=self.update_dynamic)
self._tofinish.clear()

def _error_jobs(self):
# must be called from within lock
for job in self._toerror:
self._handle_error(job)
self._toerror.clear()

def run(self, jobs, executor=None):
if executor is None:
executor = self._executor
@@ -555,42 +584,13 @@ def _free_resources(self, job):
def _proceed(
self,
job,
update_dynamic=True,
print_progress=False,
update_resources=True,
handle_job_success=True,
):
"""Do stuff after job is finished."""
with self._lock:
if handle_job_success:
# by calling this behind the lock, we avoid race conditions
try:
self.get_executor(job).handle_job_success(job)
except (RuleException, WorkflowError) as e:
# if an error occurs while processing job output,
# we do the same as in case of errors during execution
print_exception(e, self.workflow.linemaps)
self._handle_error(job)
return

self._tofinish.append((job, update_dynamic))

if update_resources:
# normal jobs have len=1, group jobs have len>1
self.finished_jobs += len(job)
self.running.remove(job)
self._free_resources(job)

if print_progress:
if job.is_group():
for j in job:
logger.job_finished(jobid=j.jobid)
else:
logger.job_finished(jobid=job.jobid)
self.progress()
self._tofinish.append(job)

if self.dryrun:
if not self.running:
if len(self.running) - len(self._tofinish) - len(self._toerror) <= 0:
# During dryrun, only release when all running jobs are done.
# This saves a lot of time, as self.open_jobs has to be
# evaluated less frequently.
@@ -601,7 +601,8 @@ def _proceed(

def _error(self, job):
with self._lock:
self._handle_error(job)
self._toerror.append(job)
self._open_jobs.release()

def _handle_error(self, job):
"""Clear jobs and stop the workflow.
@@ -625,7 +626,6 @@ def _handle_error(self, job):
self.failed.add(job)
if self.keepgoing:
logger.info("Job failed, going on with independent jobs.")
self._open_jobs.release()

def exit_gracefully(self, *args):
with self._lock:
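One consequence of the deferral, visible in the last hunk of _proceed(): jobs now stay in self.running until the scheduler thread has drained _tofinish and _toerror, so the dryrun fast path can no longer simply test `not self.running`. Instead it treats jobs that are merely queued for finish or error handling as already done. A small illustration with made-up job names (not taken from the PR):

    running = {"a", "b", "c"}   # still formally "running"
    tofinish = ["a", "b"]       # success reported, processing deferred
    toerror = ["c"]             # failure reported, processing deferred

    # equivalent of the new check in _proceed(): everything still in
    # `running` is already accounted for in one of the two queues
    if len(running) - len(tofinish) - len(toerror) <= 0:
        print("all running jobs are effectively done")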