Skip to content

Commit

Permalink
fix: always wait for input files before starting jobs, also upon local execution and within group jobs. This should add further robustness against NFS latency issues. (#1486)
Browse files Browse the repository at this point in the history

* fix: always wait for input files before starting jobs, also upon local execution and within group jobs. This should add further robustness against NFS latency issues.

* try downgrading irods client
  • Loading branch information
johanneskoester committed Mar 16, 2022
1 parent adae8f1 commit cab2adb
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 42 deletions.
2 changes: 1 addition & 1 deletion snakemake/__init__.py
Expand Up @@ -599,6 +599,7 @@ def snakemake(
check_envvars=not lint, # for linting, we do not need to check whether requested envvars exist
all_temp=all_temp,
local_groupid=local_groupid,
latency_wait=latency_wait,
)
success = True

Expand Down Expand Up @@ -771,7 +772,6 @@ def snakemake(
archive=archive,
delete_all_output=delete_all_output,
delete_temp_output=delete_temp_output,
latency_wait=latency_wait,
wait_for_files=wait_for_files,
detailed_summary=detailed_summary,
nolock=not lock,
Expand Down
19 changes: 1 addition & 18 deletions snakemake/executors/__init__.py
Expand Up @@ -65,7 +65,6 @@ def __init__(
quiet=False,
printshellcmds=False,
printthreads=True,
latency_wait=3,
keepincomplete=False,
keepmetadata=True,
):
Expand All @@ -75,7 +74,7 @@ def __init__(
self.printreason = printreason
self.printshellcmds = printshellcmds
self.printthreads = printthreads
self.latency_wait = latency_wait
self.latency_wait = workflow.latency_wait
self.keepincomplete = keepincomplete
self.keepmetadata = keepmetadata

Expand Down Expand Up @@ -221,7 +220,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
assume_shared_fs=True,
keepincomplete=False,
keepmetadata=False,
Expand All @@ -232,7 +230,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -424,7 +421,6 @@ def __init__(
quiet=False,
printshellcmds=False,
use_threads=False,
latency_wait=3,
cores=1,
keepincomplete=False,
keepmetadata=True,
Expand All @@ -435,7 +431,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -659,7 +654,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand All @@ -680,7 +674,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -957,7 +950,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -989,7 +981,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1322,7 +1313,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
keepincomplete=False,
Expand All @@ -1336,7 +1326,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1432,7 +1421,6 @@ def __init__(
printshellcmds=False,
drmaa_args="",
drmaa_log_dir=None,
latency_wait=3,
cluster_config=None,
restart_times=0,
assume_shared_fs=True,
Expand All @@ -1448,7 +1436,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1635,7 +1622,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -1663,7 +1649,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down Expand Up @@ -2092,7 +2077,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
local_input=None,
restart_times=None,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -2147,7 +2131,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
local_input=local_input,
restart_times=restart_times,
exec_job=exec_job,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/ga4gh_tes.py
Expand Up @@ -28,7 +28,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -83,7 +82,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/google_lifesciences.py
Expand Up @@ -51,7 +51,6 @@ def __init__(
regions=None,
location=None,
cache=False,
latency_wait=3,
local_input=None,
restart_times=None,
exec_job=None,
Expand Down Expand Up @@ -127,7 +126,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
restart_times=restart_times,
exec_job=exec_job,
assume_shared_fs=False,
Expand Down
4 changes: 4 additions & 0 deletions snakemake/jobs.py
Expand Up @@ -21,6 +21,7 @@
_IOFile,
is_flagged,
get_flag_value,
wait_for_files,
)
from snakemake.utils import format, listfiles
from snakemake.exceptions import RuleException, ProtectedOutputException, WorkflowError
Expand Down Expand Up @@ -764,6 +765,9 @@ def prepare(self):
if self.benchmark:
self.benchmark.prepare()

# wait for input files
wait_for_files(self.input, latency_wait=self.dag.workflow.latency_wait)

if not self.is_shadow:
return

Expand Down
15 changes: 0 additions & 15 deletions snakemake/scheduler.py
Expand Up @@ -94,7 +94,6 @@ def __init__(
keepgoing=False,
max_jobs_per_second=None,
max_status_checks_per_second=100,
latency_wait=3,
greediness=1.0,
force_use_threads=False,
assume_shared_fs=True,
Expand Down Expand Up @@ -169,7 +168,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif touch:
self._executor = TouchExecutor(
Expand All @@ -178,7 +176,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif cluster or cluster_sync or (drmaa is not None):
if not workflow.immediate_submit:
Expand All @@ -191,7 +188,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -219,7 +215,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -240,7 +235,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
assume_shared_fs=assume_shared_fs,
max_status_checks_per_second=max_status_checks_per_second,
Expand All @@ -255,7 +249,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -269,7 +262,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -283,7 +275,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -300,7 +291,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand All @@ -312,7 +302,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
)

Expand All @@ -327,7 +316,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
preemption_default=preemption_default,
preemptible_rules=preemptible_rules,
)
Expand All @@ -339,7 +327,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
)
Expand All @@ -351,7 +338,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
tes_url=tes,
container_image=container_image,
)
Expand All @@ -365,7 +351,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down
8 changes: 5 additions & 3 deletions snakemake/workflow.py
Expand Up @@ -149,6 +149,7 @@ def __init__(
max_threads=None,
all_temp=False,
local_groupid="local",
latency_wait=3,
):
"""
Create the controller.
Expand Down Expand Up @@ -234,6 +235,7 @@ def __init__(
self.all_temp = all_temp
self.scheduler = None
self.local_groupid = local_groupid
self.latency_wait = latency_wait

_globals = globals()
_globals["workflow"] = self
Expand Down Expand Up @@ -599,7 +601,6 @@ def execute(
delete_all_output=False,
delete_temp_output=False,
detailed_summary=False,
latency_wait=3,
wait_for_files=None,
nolock=False,
unlock=False,
Expand Down Expand Up @@ -701,7 +702,9 @@ def files(items):

if wait_for_files is not None:
try:
snakemake.io.wait_for_files(wait_for_files, latency_wait=latency_wait)
snakemake.io.wait_for_files(
wait_for_files, latency_wait=self.latency_wait
)
except IOError as e:
logger.error(str(e))
return False
Expand Down Expand Up @@ -1024,7 +1027,6 @@ def files(items):
container_image=container_image,
printreason=printreason,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
greediness=greediness,
force_use_threads=force_use_threads,
assume_shared_fs=self.assume_shared_fs,
Expand Down
2 changes: 1 addition & 1 deletion test-environment.yml
Expand Up @@ -38,7 +38,7 @@ dependencies:
- ratelimiter
- configargparse
- appdirs
- python-irodsclient
- python-irodsclient <1.1.2 # bug in 1.1.2 leads to KeyError
- cwltool
- jsonschema
- pandas
Expand Down

0 comments on commit cab2adb

Please sign in to comment.