Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: always wait for input files before starting jobs, also upon local execution and within group jobs. This should add further robustness against NFS latency issues. #1486

Merged
merged 3 commits into from Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion snakemake/__init__.py
Expand Up @@ -599,6 +599,7 @@ def snakemake(
check_envvars=not lint, # for linting, we do not need to check whether requested envvars exist
all_temp=all_temp,
local_groupid=local_groupid,
latency_wait=latency_wait,
)
success = True

Expand Down Expand Up @@ -771,7 +772,6 @@ def snakemake(
archive=archive,
delete_all_output=delete_all_output,
delete_temp_output=delete_temp_output,
latency_wait=latency_wait,
wait_for_files=wait_for_files,
detailed_summary=detailed_summary,
nolock=not lock,
Expand Down
19 changes: 1 addition & 18 deletions snakemake/executors/__init__.py
Expand Up @@ -65,7 +65,6 @@ def __init__(
quiet=False,
printshellcmds=False,
printthreads=True,
latency_wait=3,
keepincomplete=False,
keepmetadata=True,
):
Expand All @@ -75,7 +74,7 @@ def __init__(
self.printreason = printreason
self.printshellcmds = printshellcmds
self.printthreads = printthreads
self.latency_wait = latency_wait
self.latency_wait = workflow.latency_wait
self.keepincomplete = keepincomplete
self.keepmetadata = keepmetadata

Expand Down Expand Up @@ -221,7 +220,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
assume_shared_fs=True,
keepincomplete=False,
keepmetadata=False,
Expand All @@ -232,7 +230,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -424,7 +421,6 @@ def __init__(
quiet=False,
printshellcmds=False,
use_threads=False,
latency_wait=3,
cores=1,
keepincomplete=False,
keepmetadata=True,
Expand All @@ -435,7 +431,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -659,7 +654,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand All @@ -680,7 +674,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -957,7 +950,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -989,7 +981,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1322,7 +1313,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
keepincomplete=False,
Expand All @@ -1336,7 +1326,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1432,7 +1421,6 @@ def __init__(
printshellcmds=False,
drmaa_args="",
drmaa_log_dir=None,
latency_wait=3,
cluster_config=None,
restart_times=0,
assume_shared_fs=True,
Expand All @@ -1448,7 +1436,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1635,7 +1622,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -1663,7 +1649,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down Expand Up @@ -2092,7 +2077,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
local_input=None,
restart_times=None,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -2147,7 +2131,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
local_input=local_input,
restart_times=restart_times,
exec_job=exec_job,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/ga4gh_tes.py
Expand Up @@ -28,7 +28,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -83,7 +82,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/google_lifesciences.py
Expand Up @@ -51,7 +51,6 @@ def __init__(
regions=None,
location=None,
cache=False,
latency_wait=3,
local_input=None,
restart_times=None,
exec_job=None,
Expand Down Expand Up @@ -127,7 +126,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
restart_times=restart_times,
exec_job=exec_job,
assume_shared_fs=False,
Expand Down
4 changes: 4 additions & 0 deletions snakemake/jobs.py
Expand Up @@ -21,6 +21,7 @@
_IOFile,
is_flagged,
get_flag_value,
wait_for_files,
)
from snakemake.utils import format, listfiles
from snakemake.exceptions import RuleException, ProtectedOutputException, WorkflowError
Expand Down Expand Up @@ -764,6 +765,9 @@ def prepare(self):
if self.benchmark:
self.benchmark.prepare()

# wait for input files
wait_for_files(self.input, latency_wait=self.dag.workflow.latency_wait)

if not self.is_shadow:
return

Expand Down
15 changes: 0 additions & 15 deletions snakemake/scheduler.py
Expand Up @@ -94,7 +94,6 @@ def __init__(
keepgoing=False,
max_jobs_per_second=None,
max_status_checks_per_second=100,
latency_wait=3,
greediness=1.0,
force_use_threads=False,
assume_shared_fs=True,
Expand Down Expand Up @@ -169,7 +168,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif touch:
self._executor = TouchExecutor(
Expand All @@ -178,7 +176,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif cluster or cluster_sync or (drmaa is not None):
if not workflow.immediate_submit:
Expand All @@ -191,7 +188,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -219,7 +215,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -240,7 +235,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
assume_shared_fs=assume_shared_fs,
max_status_checks_per_second=max_status_checks_per_second,
Expand All @@ -255,7 +249,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -269,7 +262,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -283,7 +275,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -300,7 +291,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand All @@ -312,7 +302,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
)

Expand All @@ -327,7 +316,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
preemption_default=preemption_default,
preemptible_rules=preemptible_rules,
)
Expand All @@ -339,7 +327,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
)
Expand All @@ -351,7 +338,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
tes_url=tes,
container_image=container_image,
)
Expand All @@ -365,7 +351,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down
8 changes: 5 additions & 3 deletions snakemake/workflow.py
Expand Up @@ -149,6 +149,7 @@ def __init__(
max_threads=None,
all_temp=False,
local_groupid="local",
latency_wait=3,
):
"""
Create the controller.
Expand Down Expand Up @@ -234,6 +235,7 @@ def __init__(
self.all_temp = all_temp
self.scheduler = None
self.local_groupid = local_groupid
self.latency_wait = latency_wait

_globals = globals()
_globals["workflow"] = self
Expand Down Expand Up @@ -599,7 +601,6 @@ def execute(
delete_all_output=False,
delete_temp_output=False,
detailed_summary=False,
latency_wait=3,
wait_for_files=None,
nolock=False,
unlock=False,
Expand Down Expand Up @@ -701,7 +702,9 @@ def files(items):

if wait_for_files is not None:
try:
snakemake.io.wait_for_files(wait_for_files, latency_wait=latency_wait)
snakemake.io.wait_for_files(
wait_for_files, latency_wait=self.latency_wait
)
except IOError as e:
logger.error(str(e))
return False
Expand Down Expand Up @@ -1024,7 +1027,6 @@ def files(items):
container_image=container_image,
printreason=printreason,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
greediness=greediness,
force_use_threads=force_use_threads,
assume_shared_fs=self.assume_shared_fs,
Expand Down
2 changes: 1 addition & 1 deletion test-environment.yml
Expand Up @@ -38,7 +38,7 @@ dependencies:
- ratelimiter
- configargparse
- appdirs
- python-irodsclient
- python-irodsclient <1.1.2 # bug in 1.1.2 leads to KeyError
- cwltool
- jsonschema
- pandas
Expand Down