diff --git a/setup.py b/setup.py index cbcd40e41..46ebfafbc 100644 --- a/setup.py +++ b/setup.py @@ -79,6 +79,7 @@ "tabulate", "yte >=1.0,<2.0", "jinja2 >=3.0,<4.0", + "retry", ], extras_require={ "reports": ["jinja2", "networkx", "pygments", "pygraphviz"], @@ -92,7 +93,7 @@ "pep": [ "peppy", "eido", - ] + ], }, classifiers=[ "Development Status :: 5 - Production/Stable", diff --git a/snakemake/dag.py b/snakemake/dag.py index 620772bf7..137c71d7c 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -2247,18 +2247,19 @@ def get_outputs_with_changes(self, change_type): changed.extend(list(job.outputs_older_than_script_or_notebook())) return changed - def warn_about_changes(self): - for change_type in ["code", "input", "params"]: - changed = self.get_outputs_with_changes(change_type) - if changed: - rerun_trigger = "" - if not ON_WINDOWS: - rerun_trigger = f"\n To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'." - logger.warning( - f"The {change_type} used to generate one or several output files has changed:\n" - f" To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'." - f"{rerun_trigger}" - ) + def warn_about_changes(self, quiet=False): + if not quiet: + for change_type in ["code", "input", "params"]: + changed = self.get_outputs_with_changes(change_type) + if changed: + rerun_trigger = "" + if not ON_WINDOWS: + rerun_trigger = f"\n To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'." + logger.warning( + f"The {change_type} used to generate one or several output files has changed:\n" + f" To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'." 
+ f"{rerun_trigger}" + ) def __str__(self): return self.dot() diff --git a/snakemake/remote/AzBlob.py b/snakemake/remote/AzBlob.py index c54565274..e43b7431a 100644 --- a/snakemake/remote/AzBlob.py +++ b/snakemake/remote/AzBlob.py @@ -15,7 +15,11 @@ # module specific from snakemake.exceptions import WorkflowError, AzureFileException -from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider +from snakemake.remote import ( + AbstractRemoteObject, + AbstractRemoteProvider, + AbstractRemoteRetryObject, +) # service provider support try: @@ -60,7 +64,7 @@ def available_protocols(self): return ["ab://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): def __init__(self, *args, keep_local=False, provider=None, **kwargs): super(RemoteObject, self).__init__( *args, keep_local=keep_local, provider=provider, **kwargs @@ -95,7 +99,7 @@ def size(self): return self._as.blob_size(self.container_name, self.blob_name) return self._iofile.size_local - def download(self): + def _download(self): if self.exists(): os.makedirs(os.path.dirname(self.local_file()), exist_ok=True) self._as.download_from_azure_storage( @@ -105,7 +109,7 @@ def download(self): return self.local_file() return None - def upload(self): + def _upload(self): self._as.upload_to_azure_storage( container_name=self.container_name, blob_name=self.blob_name, diff --git a/snakemake/remote/EGA.py b/snakemake/remote/EGA.py index 43e334305..c1b92c4cf 100644 --- a/snakemake/remote/EGA.py +++ b/snakemake/remote/EGA.py @@ -14,9 +14,14 @@ from requests.auth import HTTPBasicAuth -from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider +from snakemake.remote import ( + AbstractRemoteObject, + AbstractRemoteProvider, + check_deprecated_retry, +) from snakemake.exceptions import WorkflowError from snakemake.common import lazy_property +from snakemake.logging import logger EGAFileInfo = namedtuple("EGAFileInfo", ["size", "status", "id", "checksum"]) @@ -30,7 
+35,7 @@ def __init__( keep_local=False, stay_on_remote=False, is_default=False, - retry=5, + retry=None, **kwargs ): super().__init__( @@ -40,7 +45,7 @@ def __init__( is_default=is_default, **kwargs ) - self.retry = retry + check_deprecated_retry(retry) self._token = None self._expires = None self._file_cache = dict() @@ -193,7 +198,7 @@ def _credentials(cls, name): ) -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): # === Implementations of abstract class members === def _stats(self): return self.provider.get_files(self.parts.dataset)[self.parts.path] @@ -209,7 +214,7 @@ def mtime(self): # Hence, the files are always considered to be "ancient". return 0 - def download(self): + def _download(self): stats = self._stats() r = self.provider.api_request( diff --git a/snakemake/remote/FTP.py b/snakemake/remote/FTP.py index cf92a517b..fffa0c51a 100644 --- a/snakemake/remote/FTP.py +++ b/snakemake/remote/FTP.py @@ -179,7 +179,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, make_dest_dirs=True): with self.connection_pool.item() as ftpc: if self.exists(): # if the destination path does not exist @@ -197,7 +197,7 @@ def download(self, make_dest_dirs=True): "The file does not seem to exist remotely: %s" % self.local_file() ) - def upload(self): + def _upload(self): with self.connection_pool.item() as ftpc: ftpc.synchronize_times() ftpc.upload(source=self.local_path, target=self.remote_path) diff --git a/snakemake/remote/GS.py b/snakemake/remote/GS.py index 16213777e..47eac9743 100644 --- a/snakemake/remote/GS.py +++ b/snakemake/remote/GS.py @@ -220,7 +220,7 @@ def size(self): return self._iofile.size_local @retry.Retry(predicate=google_cloud_retry_predicate, deadline=600) - def download(self): + def _download(self): """Download with maximum retry duration of 600 seconds (10 minutes)""" if not self.exists(): return None @@ -251,7 +251,7 @@ def 
_download_directory(self): return self.local_file() @retry.Retry(predicate=google_cloud_retry_predicate) - def upload(self): + def _upload(self): try: if not self.bucket.exists(): self.bucket.create() diff --git a/snakemake/remote/HTTP.py b/snakemake/remote/HTTP.py index 5b398d058..6be5757f3 100644 --- a/snakemake/remote/HTTP.py +++ b/snakemake/remote/HTTP.py @@ -209,7 +209,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, make_dest_dirs=True): with self.httpr(stream=True) as httpr: if self.exists(): # Find out if the source file is gzip compressed in order to keep @@ -237,7 +237,7 @@ def download(self, make_dest_dirs=True): "The file does not seem to exist remotely: %s" % self.remote_file() ) - def upload(self): + def _upload(self): raise HTTPFileException( "Upload is not permitted for the HTTP remote provider. Is an output set to HTTP.remote()?" ) diff --git a/snakemake/remote/NCBI.py b/snakemake/remote/NCBI.py index bbc5f0c22..79d8a0518 100644 --- a/snakemake/remote/NCBI.py +++ b/snakemake/remote/NCBI.py @@ -139,7 +139,7 @@ def size(self): else: return self._iofile.size_local - def download(self): + def _download(self): if self.exists(): self._ncbi.fetch_from_ncbi( [self.accession], @@ -155,7 +155,7 @@ def download(self): "The record does not seem to exist remotely: %s" % self.accession ) - def upload(self): + def _upload(self): raise NCBIFileException( "Upload is not permitted for the NCBI remote provider. Is an output set to NCBI.RemoteProvider.remote()?" 
) diff --git a/snakemake/remote/S3.py b/snakemake/remote/S3.py index 5c32c9753..6ddd3d891 100644 --- a/snakemake/remote/S3.py +++ b/snakemake/remote/S3.py @@ -11,7 +11,11 @@ import concurrent.futures # module-specific -from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider +from snakemake.remote import ( + AbstractRemoteObject, + AbstractRemoteProvider, + AbstractRemoteRetryObject, +) from snakemake.exceptions import WorkflowError, S3FileException from snakemake.utils import os_sync @@ -59,7 +63,7 @@ def available_protocols(self): return ["s3://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): """This is a class to interact with the AWS S3 object store.""" def __init__(self, *args, keep_local=False, provider=None, **kwargs): @@ -97,11 +101,11 @@ def size(self): else: return self._iofile.size_local - def download(self): + def _download(self): self._s3c.download_from_s3(self.s3_bucket, self.s3_key, self.local_file()) os_sync() # ensure flush to disk - def upload(self): + def _upload(self): self._s3c.upload_to_s3( self.s3_bucket, self.local_file(), diff --git a/snakemake/remote/SFTP.py b/snakemake/remote/SFTP.py index ed4462ced..66fce04dc 100644 --- a/snakemake/remote/SFTP.py +++ b/snakemake/remote/SFTP.py @@ -120,7 +120,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, make_dest_dirs=True): with self.connection_pool.item() as sftpc: if self.exists(): # if the destination path does not exist @@ -156,7 +156,7 @@ def mkdir_remote_path(self): sftpc.mkdir(part) sftpc.chdir(part) - def upload(self): + def _upload(self): if self.provider.mkdir_remote: self.mkdir_remote_path() diff --git a/snakemake/remote/XRootD.py b/snakemake/remote/XRootD.py index 7b430058b..cc226ad83 100644 --- a/snakemake/remote/XRootD.py +++ b/snakemake/remote/XRootD.py @@ -8,7 +8,11 @@ import re from stat import S_ISREG -from snakemake.remote import 
AbstractRemoteObject, AbstractRemoteProvider +from snakemake.remote import ( + AbstractRemoteObject, + AbstractRemoteProvider, + AbstractRemoteRetryObject, +) from snakemake.exceptions import WorkflowError, XRootDFileException try: @@ -51,7 +55,7 @@ def available_protocols(self): return ["root://", "roots://", "rootk://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): """This is a class to interact with XRootD servers.""" def __init__( @@ -89,11 +93,11 @@ def size(self): else: return self._iofile.size_local - def download(self): + def _download(self): assert not self.stay_on_remote self._xrd.copy(self.remote_file(), self.file()) - def upload(self): + def _upload(self): assert not self.stay_on_remote self._xrd.copy(self.file(), self.remote_file()) diff --git a/snakemake/remote/__init__.py b/snakemake/remote/__init__.py index f3d2d0c1b..e8148c7f3 100644 --- a/snakemake/remote/__init__.py +++ b/snakemake/remote/__init__.py @@ -9,8 +9,11 @@ import re from functools import partial from abc import ABCMeta, abstractmethod -from wrapt import ObjectProxy from contextlib import contextmanager +import shutil + +from wrapt import ObjectProxy +from retry import retry try: from connection_pool import ConnectionPool @@ -22,6 +25,7 @@ import collections # module-specific +from snakemake.exceptions import WorkflowError import snakemake.io from snakemake.logging import logger from snakemake.common import parse_uri @@ -225,12 +229,30 @@ def mtime(self): def size(self): pass + def download(self): + try: + return self._download() + except Exception as e: + local_path = self.local_file() + if os.path.exists(local_path): + if os.path.isdir(local_path): + shutil.rmtree(local_path) + else: + os.remove(local_path) + raise WorkflowError(e) + + def upload(self): + try: + self._upload() + except Exception as e: + raise WorkflowError(e) + @abstractmethod - def download(self, *args, **kwargs): + def _download(self, *args, **kwargs): pass @abstractmethod - def 
upload(self, *args, **kwargs): + def _upload(self, *args, **kwargs): pass @abstractmethod @@ -253,7 +274,17 @@ def local_touch_or_create(self): self._iofile.touch_or_create() -class DomainObject(AbstractRemoteObject): +class AbstractRemoteRetryObject(AbstractRemoteObject): + @retry(tries=3, delay=3, backoff=2, logger=logger) + def download(self): + return super().download() + + @retry(tries=3, delay=3, backoff=2, logger=logger) + def upload(self): + super().upload() + + +class DomainObject(AbstractRemoteRetryObject): """This is a mixin related to parsing components out of a location path specified as (host|IP):port/remote/location @@ -474,3 +505,10 @@ def remote(self, value, *args, provider_kws=None, **kwargs): AUTO = AutoRemoteProvider() + + +def check_deprecated_retry(retry): + if retry: + logger.warning( + "Using deprecated retry argument. Snakemake now always uses 3 retries on download and upload." + ) diff --git a/snakemake/remote/dropbox.py b/snakemake/remote/dropbox.py index 3468ae67d..44d0a5d54 100644 --- a/snakemake/remote/dropbox.py +++ b/snakemake/remote/dropbox.py @@ -55,7 +55,7 @@ def available_protocols(self): return ["dropbox://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): """This is a class to interact with the Dropbox API.""" def __init__(self, *args, keep_local=False, provider=None, **kwargs): @@ -100,7 +100,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, make_dest_dirs=True): if self.exists(): # if the destination path does not exist, make it if make_dest_dirs: @@ -115,7 +115,7 @@ def download(self, make_dest_dirs=True): "The file does not seem to exist remotely: %s" % self.dropbox_file() ) - def upload(self, mode=dropbox.files.WriteMode("overwrite")): + def _upload(self, mode=dropbox.files.WriteMode("overwrite")): # Chunk file into 10MB slices because Dropbox does not accept more than 150MB chunks chunksize = 10000000 with 
open(self.local_file(), mode="rb") as f: diff --git a/snakemake/remote/gfal.py b/snakemake/remote/gfal.py index bfa3be924..605de209d 100644 --- a/snakemake/remote/gfal.py +++ b/snakemake/remote/gfal.py @@ -10,7 +10,12 @@ from datetime import datetime import time -from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider +from snakemake.remote import ( + AbstractRemoteObject, + AbstractRemoteProvider, + AbstractRemoteRetryObject, + check_deprecated_retry, +) from snakemake.exceptions import WorkflowError from snakemake.common import lazy_property from snakemake.logging import logger @@ -34,7 +39,7 @@ def __init__( keep_local=False, stay_on_remote=False, is_default=False, - retry=5, + retry=None, **kwargs ): super(RemoteProvider, self).__init__( @@ -44,7 +49,7 @@ def __init__( is_default=is_default, **kwargs ) - self.retry = retry + check_deprecated_retry(retry) @property def default_protocol(self): @@ -58,7 +63,7 @@ def available_protocols(self): return ["gsiftp://", "srm://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): mtime_re = re.compile(r"^\s*Modify: (.+)$", flags=re.MULTILINE) size_re = re.compile(r"^\s*Size: ([0-9]+).*$", flags=re.MULTILINE) @@ -68,35 +73,26 @@ def __init__(self, *args, keep_local=False, provider=None, **kwargs): ) def _gfal(self, cmd, *args, retry=None, raise_workflow_error=True): - if retry is None: - retry = self.provider.retry + check_deprecated_retry(retry) _cmd = ["gfal-" + cmd] + list(args) - for i in range(retry + 1): - try: - logger.debug(_cmd) - return sp.run( - _cmd, check=True, stderr=sp.PIPE, stdout=sp.PIPE - ).stdout.decode() - except sp.CalledProcessError as e: - if i == retry: - if raise_workflow_error: - raise WorkflowError( - "Error calling gfal-{}:\n{}".format(cmd, e.stderr.decode()) - ) - else: - raise e - else: - # try again after some seconds - time.sleep(1) - continue + try: + logger.debug(_cmd) + return sp.run( + _cmd, check=True, stderr=sp.PIPE, 
stdout=sp.PIPE + ).stdout.decode() + except sp.CalledProcessError as e: + if raise_workflow_error: + raise WorkflowError( + "Error calling gfal-{}:\n{}".format(cmd, e.stderr.decode()) + ) + else: + raise e # === Implementations of abstract class members === def exists(self): try: - self._gfal( - "ls", "-a", self.remote_file(), retry=0, raise_workflow_error=False - ) + self._gfal("ls", "-a", self.remote_file(), raise_workflow_error=False) except sp.CalledProcessError as e: if e.returncode == 2: # exit code 2 means no such file or directory @@ -125,7 +121,7 @@ def size(self): size = self.size_re.search(stat).group(1) return int(size) - def download(self): + def _download(self): if self.exists(): if self.size() == 0: # Globus erroneously thinks that a transfer is incomplete if a @@ -145,7 +141,7 @@ def download(self): return self.local_file() return None - def upload(self): + def _upload(self): target = self.remote_file() source = "file://" + os.path.abspath(self.local_file()) # disable all timeouts (file transfers can take a long time) diff --git a/snakemake/remote/gridftp.py b/snakemake/remote/gridftp.py index e45168788..068969d56 100644 --- a/snakemake/remote/gridftp.py +++ b/snakemake/remote/gridftp.py @@ -34,27 +34,18 @@ class RemoteProvider(gfal.RemoteProvider): class RemoteObject(gfal.RemoteObject): def _globus(self, *args): - retry = self.provider.retry cmd = ["globus-url-copy"] + list(args) - for i in range(retry + 1): - try: - logger.debug(" ".join(cmd)) - return sp.run( - cmd, check=True, stderr=sp.PIPE, stdout=sp.PIPE - ).stdout.decode() - except sp.CalledProcessError as e: - if i == retry: - raise WorkflowError( - "Error calling globus-url-copy:\n{}".format( - cmd, e.stderr.decode() - ) - ) - else: - # try again after some seconds - time.sleep(1) - continue - - def download(self): + try: + logger.debug(" ".join(cmd)) + return sp.run( + cmd, check=True, stderr=sp.PIPE, stdout=sp.PIPE + ).stdout.decode() + except sp.CalledProcessError as e: + raise 
WorkflowError( + "Error calling globus-url-copy:\n{}".format(cmd, e.stderr.decode()) + ) + + def _download(self): if self.exists(): if self.size() == 0: # Globus erroneously thinks that a transfer is incomplete if a @@ -73,7 +64,7 @@ def download(self): return self.local_file() return None - def upload(self): + def _upload(self): target = self.remote_file() source = "file://" + os.path.abspath(self.local_file()) diff --git a/snakemake/remote/iRODS.py b/snakemake/remote/iRODS.py index e433321ae..703c2a58e 100644 --- a/snakemake/remote/iRODS.py +++ b/snakemake/remote/iRODS.py @@ -11,7 +11,11 @@ from pytz import timezone # module-specific -from snakemake.remote import AbstractRemoteProvider, AbstractRemoteObject +from snakemake.remote import ( + AbstractRemoteProvider, + AbstractRemoteObject, + AbstractRemoteRetryObject, +) from snakemake.exceptions import WorkflowError from snakemake.utils import os_sync @@ -87,7 +91,7 @@ def available_protocols(self): return ["irods://"] -class RemoteObject(AbstractRemoteObject): +class RemoteObject(AbstractRemoteRetryObject): """This is a class to interact with an iRODS server.""" def __init__(self, *args, keep_local=False, provider=None, **kwargs): @@ -169,7 +173,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, make_dest_dirs=True): if self.exists(): if make_dest_dirs: os.makedirs(os.path.dirname(self.local_path), exist_ok=True) @@ -190,7 +194,7 @@ def download(self, make_dest_dirs=True): "The file does not seem to exist remotely: %s" % self.local_file() ) - def upload(self): + def _upload(self): # get current local timestamp stat = os.stat(self.local_path) diff --git a/snakemake/remote/webdav.py b/snakemake/remote/webdav.py index 060c96dae..60fd63c8e 100644 --- a/snakemake/remote/webdav.py +++ b/snakemake/remote/webdav.py @@ -141,7 +141,7 @@ def size(self): else: return self._iofile.size_local - def download(self, make_dest_dirs=True): + def _download(self, 
make_dest_dirs=True): if self.exists(): # if the destination path does not exist, make it if make_dest_dirs: @@ -156,7 +156,7 @@ def download(self, make_dest_dirs=True): "The file does not seem to exist remotely: %s" % self.webdav_file ) - def upload(self): + def _upload(self): # make containing folder with self.webdavc() as webdavc: self.loop.run_until_complete( diff --git a/snakemake/workflow.py b/snakemake/workflow.py index e9b2f6fca..250dfedbe 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -1027,7 +1027,7 @@ def files(items): ) if not dryrun: - dag.warn_about_changes() + dag.warn_about_changes(quiet) if len(dag): shell_exec = shell.get_executable() if shell_exec is not None: @@ -1073,7 +1073,7 @@ def files(items): logger.info(NOTHING_TO_BE_DONE_MSG) else: # the dryrun case - dag.warn_about_changes() + dag.warn_about_changes(quiet) if len(dag): logger.run_info("\n".join(dag.stats())) else: @@ -1099,12 +1099,12 @@ def files(items): "This was a dry-run (flag -n). The order of jobs " "does not reflect the order of execution." ) - dag.warn_about_changes() + dag.warn_about_changes(quiet) logger.remove_logfile() else: if stats: self.scheduler.stats.to_json(stats) - dag.warn_about_changes() + dag.warn_about_changes(quiet) logger.logfile_hint() if not dryrun and not no_hooks: self._onsuccess(logger.get_logfile()) @@ -1112,7 +1112,7 @@ def files(items): else: if not dryrun and not no_hooks: self._onerror(logger.get_logfile()) - dag.warn_about_changes() + dag.warn_about_changes(quiet) logger.logfile_hint() return False diff --git a/test-environment.yml b/test-environment.yml index df82929e8..d1fcf797e 100644 --- a/test-environment.yml +++ b/test-environment.yml @@ -63,3 +63,4 @@ dependencies: - smart_open - filelock - tabulate + - retry