fix: potential memory corruption caused by Google storage objects accessed from different threads (#1174)

* fix: memory corruption caused by Google storage objects accessed from different threads

* invoke error job handling

* clear toerror jobs
johanneskoester committed Sep 24, 2021
1 parent bdb75f8 commit 41a5071
Showing 1 changed file with 47 additions and 47 deletions.
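
What the change does: _proceed (the finish callback) and _error are invoked from executor threads. Previously they performed success and error handling right there, touching DAG state and remote storage objects (e.g. Google storage handles) concurrently with the scheduler thread, which could corrupt those objects. After this commit the callbacks merely queue the job in _tofinish or _toerror and wake the scheduler; schedule() then drains both queues via _finish_jobs() and _error_jobs() while holding _lock, so all handling runs on the scheduler thread. Below is a minimal, self-contained sketch of that hand-off pattern; the MiniScheduler class and the job strings are illustrative stand-ins, not Snakemake's API.

import threading

class MiniScheduler:
    def __init__(self):
        self._lock = threading.Lock()
        self._open_jobs = threading.Semaphore(0)  # wakes the scheduler loop
        self._tofinish = []  # jobs finished on executor threads, not yet handled
        self._toerror = []   # jobs failed on executor threads, not yet handled
        self.finished = []

    # Called from executor threads: record the job and wake the scheduler;
    # no shared DAG/storage state is touched here.
    def _finish_callback(self, job):
        with self._lock:
            self._tofinish.append(job)
        self._open_jobs.release()

    def _error(self, job):
        with self._lock:
            self._toerror.append(job)
        self._open_jobs.release()

    # Runs on the scheduler thread only: drain both queues under the lock,
    # so success/error handling is confined to a single thread.
    def schedule_step(self):
        self._open_jobs.acquire()
        with self._lock:
            for job in self._tofinish:
                self.finished.append(job)  # stands in for dag.finish() etc.
            self._tofinish.clear()
            for job in self._toerror:
                print(f"handling error for {job}")  # stands in for _handle_error()
            self._toerror.clear()

if __name__ == "__main__":
    s = MiniScheduler()
    threading.Thread(target=s._finish_callback, args=("job-a",)).start()
    s.schedule_step()  # blocks until the callback has released the semaphore
    print(s.finished)  # -> ['job-a']
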
94 changes: 47 additions & 47 deletions snakemake/scheduler.py
@@ -118,6 +118,11 @@ def __init__(
         self.scheduler_type = scheduler_type
         self.scheduler_ilp_solver = scheduler_ilp_solver
         self._tofinish = []
+        self._toerror = []
+        self.handle_job_success = True
+        self.update_resources = True
+        self.print_progress = not self.quiet and not self.dryrun
+        self.update_dynamic = not self.dryrun
 
         self.global_resources = {
             name: (sys.maxsize if res is None else res)
@@ -145,11 +150,7 @@ def __init__(
         self._job_queue = None
         self._last_job_selection_empty = False
         self._submit_callback = self._noop
-        self._finish_callback = partial(
-            self._proceed,
-            update_dynamic=not self.dryrun,
-            print_progress=not self.quiet and not self.dryrun,
-        )
+        self._finish_callback = self._proceed
 
         self._local_executor = None
         if dryrun:
@@ -212,13 +213,10 @@ def __init__(
                     keepmetadata=keepmetadata,
                 )
                 if workflow.immediate_submit:
-                    self._submit_callback = partial(
-                        self._proceed,
-                        update_dynamic=False,
-                        print_progress=False,
-                        update_resources=False,
-                        handle_job_success=False,
-                    )
+                    self.update_dynamic = False
+                    self.print_progress = False
+                    self.update_resources = False
+                    self.handle_job_success = False
             else:
                 self._executor = DRMAAExecutor(
                     workflow,
@@ -440,6 +438,7 @@ def schedule(self):
             # obtain needrun and running jobs in a thread-safe way
             with self._lock:
                 self._finish_jobs()
+                self._error_jobs()
                 needrun = set(self.open_jobs)
                 running = list(self.running)
                 errors = self._errors
@@ -524,10 +523,40 @@ def schedule(self):
 
     def _finish_jobs(self):
         # must be called from within lock
-        for job, update_dynamic in self._tofinish:
-            self.dag.finish(job, update_dynamic=update_dynamic)
+        for job in self._tofinish:
+            if self.handle_job_success:
+                try:
+                    self.get_executor(job).handle_job_success(job)
+                except (RuleException, WorkflowError) as e:
+                    # if an error occurs while processing job output,
+                    # we do the same as in case of errors during execution
+                    print_exception(e, self.workflow.linemaps)
+                    self._handle_error(job)
+                    return
+
+            if self.update_resources:
+                # normal jobs have len=1, group jobs have len>1
+                self.finished_jobs += len(job)
+                self.running.remove(job)
+                self._free_resources(job)
+
+            if self.print_progress:
+                if job.is_group():
+                    for j in job:
+                        logger.job_finished(jobid=j.jobid)
+                else:
+                    logger.job_finished(jobid=job.jobid)
+                self.progress()
+
+            self.dag.finish(job, update_dynamic=self.update_dynamic)
         self._tofinish.clear()
 
+    def _error_jobs(self):
+        # must be called from within lock
+        for job in self._toerror:
+            self._handle_error(job)
+        self._toerror.clear()
+
     def run(self, jobs, executor=None):
         if executor is None:
             executor = self._executor
@@ -555,42 +584,13 @@ def _free_resources(self, job):
     def _proceed(
         self,
         job,
-        update_dynamic=True,
-        print_progress=False,
-        update_resources=True,
-        handle_job_success=True,
     ):
         """Do stuff after job is finished."""
         with self._lock:
-            if handle_job_success:
-                # by calling this behind the lock, we avoid race conditions
-                try:
-                    self.get_executor(job).handle_job_success(job)
-                except (RuleException, WorkflowError) as e:
-                    # if an error occurs while processing job output,
-                    # we do the same as in case of errors during execution
-                    print_exception(e, self.workflow.linemaps)
-                    self._handle_error(job)
-                    return
-
-            self._tofinish.append((job, update_dynamic))
-
-            if update_resources:
-                # normal jobs have len=1, group jobs have len>1
-                self.finished_jobs += len(job)
-                self.running.remove(job)
-                self._free_resources(job)
-
-            if print_progress:
-                if job.is_group():
-                    for j in job:
-                        logger.job_finished(jobid=j.jobid)
-                else:
-                    logger.job_finished(jobid=job.jobid)
-                self.progress()
+            self._tofinish.append(job)
 
             if self.dryrun:
-                if not self.running:
+                if len(self.running) - len(self._tofinish) - len(self._toerror) <= 0:
                     # During dryrun, only release when all running jobs are done.
                     # This saves a lot of time, as self.open_jobs has to be
                     # evaluated less frequently.
@@ -601,7 +601,8 @@ def _proceed(
 
     def _error(self, job):
         with self._lock:
-            self._handle_error(job)
+            self._toerror.append(job)
+            self._open_jobs.release()
 
     def _handle_error(self, job):
         """Clear jobs and stop the workflow.
@@ -625,7 +626,6 @@ def _handle_error(self, job):
             self.failed.add(job)
             if self.keepgoing:
                 logger.info("Job failed, going on with independent jobs.")
-        self._open_jobs.release()
 
     def exit_gracefully(self, *args):
         with self._lock:

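Two follow-on details in the diff: because jobs now leave self.running lazily (in _finish_jobs/_error_jobs rather than in the callbacks), the dryrun fast path can no longer test "if not self.running:"; it instead checks len(self.running) - len(self._tofinish) - len(self._toerror) <= 0, counting queued-but-unhandled jobs as effectively done. Likewise, self._open_jobs.release() moves out of _handle_error, which now runs on the scheduler thread inside the lock, and into _error, so an executor thread still wakes the scheduler when a job fails.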