From 0dbbb8ac17a4b632707485ee6c7cc15e4670efaa Mon Sep 17 00:00:00 2001
From: Andrew Gorcester
Date: Fri, 11 Jun 2021 15:26:18 -0700
Subject: [PATCH] feat: media operation retries can be configured using the
 same interface as non-media operations (#447)

All media operation calls (downloads and uploads) can be configured with
Retry objects and ConditionalRetryPolicy objects, nearly identically to
non-media operations. This is accomplished by converting the Retry object to
a google-resumable-media-python library RetryStrategy object at the point of
entry to that library.

Custom predicates of Retry objects (for instance, set with
Retry(predicate=...)) are not supported for media operations; they will be
replaced with a media-operation-specific predicate.

This change is backwards-compatible for users of public methods that use
num_retries arguments to configure uploads; num_retries continues to be
supported, but the deprecation warning remains in effect. It will be fully
removed and replaced with Retry objects in the future.

With this change, the default parameters for media operation retries become
uniform with non-media operation retries. Specifically, the retry deadline
for media operations becomes 120 seconds unless otherwise configured.
---
 google/cloud/storage/_helpers.py |  41 ++++
 google/cloud/storage/blob.py     | 350 ++++++++++++++++++++++++++++---
 google/cloud/storage/client.py   |  39 ++++
 google/cloud/storage/fileio.py   | 103 +++++++--
 setup.py                         |   2 +-
 tests/unit/test__helpers.py      |  39 ++++
 tests/unit/test_blob.py          | 335 +++++++++++++++++++++++++----
 tests/unit/test_client.py        |  50 ++++-
 tests/unit/test_fileio.py        | 268 +++++++++++++++++++++--
 9 files changed, 1127 insertions(+), 100 deletions(-)

diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py
index 9e09fc9f2..04671035b 100644
--- a/google/cloud/storage/_helpers.py
+++ b/google/cloud/storage/_helpers.py
@@ -23,6 +23,7 @@
 import os

 from six.moves.urllib.parse import urlsplit

+from google import resumable_media
 from google.cloud.storage.constants import _DEFAULT_TIMEOUT
 from google.cloud.storage.retry import DEFAULT_RETRY
 from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
@@ -45,6 +46,12 @@
     ("if_source_metageneration_not_match", "ifSourceMetagenerationNotMatch"),
 )

+_NUM_RETRIES_MESSAGE = (
+    "`num_retries` has been deprecated and will be removed in a future "
+    "release. Use the `retry` argument with a Retry or ConditionalRetryPolicy "
+    "object, or None, instead."
+)
+

 def _get_storage_host():
     return os.environ.get(STORAGE_EMULATOR_ENV_VAR, _DEFAULT_STORAGE_HOST)
@@ -524,3 +531,37 @@ def _bucket_bound_hostname_url(host, scheme=None):
         return host

     return "{scheme}://{host}/".format(scheme=scheme, host=host)
+
+
+def _api_core_retry_to_resumable_media_retry(retry, num_retries=None):
+    """Convert google.api_core.retry.Retry to google.resumable_media.RetryStrategy.
+
+    Custom predicates are not translated.
+
+    :type retry: google.api_core.Retry
+    :param retry: (Optional) The google.api_core.Retry object to translate.
+
+    :type num_retries: int
+    :param num_retries: (Optional) The number of retries desired. This is
+        supported for backwards compatibility and is mutually exclusive with
+        `retry`.
+
+    :rtype: google.resumable_media.RetryStrategy
+    :returns: A RetryStrategy with all applicable attributes copied from input,
+        or a RetryStrategy with max_retries set to 0 if None was input.
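For illustration, here is a minimal sketch of the mapping this helper performs, using hypothetical Retry settings; the private `_maximum`, `_deadline`, `_initial`, and `_multiplier` attributes are the ones the helper reads:

```python
# Sketch only: the Retry -> RetryStrategy translation described above.
from google.api_core import retry as api_retry
from google import resumable_media

retry = api_retry.Retry(initial=1.0, maximum=64.0, multiplier=2.0, deadline=120.0)

# Equivalent to _api_core_retry_to_resumable_media_retry(retry):
strategy = resumable_media.RetryStrategy(
    max_sleep=retry._maximum,              # 64.0
    max_cumulative_retry=retry._deadline,  # 120.0
    initial_delay=retry._initial,          # 1.0
    multiplier=retry._multiplier,          # 2.0
)
```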
+ """ + + if retry is not None and num_retries is not None: + raise ValueError("num_retries and retry arguments are mutually exclusive") + + elif retry is not None: + return resumable_media.RetryStrategy( + max_sleep=retry._maximum, + max_cumulative_retry=retry._deadline, + initial_delay=retry._initial, + multiplier=retry._multiplier, + ) + elif num_retries is not None: + return resumable_media.RetryStrategy(max_retries=num_retries) + else: + return resumable_media.RetryStrategy(max_retries=0) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 3fb8a59b9..597e63ca4 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -65,8 +65,10 @@ from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud.storage._helpers import _raise_if_more_than_one_set +from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry from google.cloud.storage._signing import generate_signed_url_v2 from google.cloud.storage._signing import generate_signed_url_v4 +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE from google.cloud.storage.acl import ACL from google.cloud.storage.acl import ObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -76,9 +78,11 @@ from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS from google.cloud.storage.constants import STANDARD_STORAGE_CLASS +from google.cloud.storage.retry import ConditionalRetryPolicy from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED from google.cloud.storage.fileio import BlobReader from google.cloud.storage.fileio import BlobWriter @@ -105,17 +109,6 @@ "name", "storageClass", ) -_NUM_RETRIES_MESSAGE = ( - "`num_retries` has been deprecated and will be removed in a future " - "release. The default behavior (when `num_retries` is not specified) when " - "a transient error (e.g. 429 Too Many Requests or 500 Internal Server " - "Error) occurs will be as follows: upload requests will be automatically " - "retried if and only if `if_metageneration_match` is specified (thus " - "making the upload idempotent). Subsequent retries will be sent after " - "waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until 10 minutes " - "of wait time have elapsed. At that point, there will be no more attempts " - "to retry." -) _READ_LESS_THAN_SIZE = ( "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining." ) @@ -892,6 +885,7 @@ def _do_download( raw_download=False, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=None, ): """Perform a download without any error handling. @@ -938,7 +932,25 @@ def _do_download( downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. 
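As a usage sketch for the public download methods that feed into this helper (bucket and object names below are placeholders):

```python
# Hypothetical example: tuning download backoff with a custom Retry.
# Any custom predicate set on the Retry is ignored for media operations;
# the delay and deadline settings are respected.
from google.api_core.retry import Retry
from google.cloud import storage

client = storage.Client()
blob = client.bucket("my-bucket").blob("my-object")

slow_retry = Retry(initial=2.0, multiplier=2.0, maximum=60.0, deadline=300.0)

with open("my-object.bak", "wb") as file_obj:
    blob.download_to_file(file_obj, retry=slow_retry)
```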
+ + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. """ + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + if self.chunk_size is None: if raw_download: klass = RawDownload @@ -953,6 +965,7 @@ def _do_download( end=end, checksum=checksum, ) + download._retry_strategy = retry_strategy response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) else: @@ -975,6 +988,7 @@ def _do_download( end=end, ) + download._retry_strategy = retry_strategy while not download.finished: download.consume_next_chunk(transport, timeout=timeout) @@ -991,6 +1005,7 @@ def download_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """DEPRECATED. Download the contents of this blob into a file-like object. @@ -1077,6 +1092,28 @@ def download_to_file( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1093,6 +1130,7 @@ def download_to_file( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def download_to_filename( @@ -1108,6 +1146,7 @@ def download_to_filename( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob into a named file. @@ -1170,6 +1209,28 @@ def download_to_filename( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+ This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1187,6 +1248,7 @@ def download_to_filename( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.DataCorruption: # Delete the corrupt downloaded file. @@ -1213,6 +1275,7 @@ def download_as_bytes( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob as a bytes object. @@ -1272,6 +1335,28 @@ def download_as_bytes( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1291,6 +1376,7 @@ def download_as_bytes( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return string_buffer.getvalue() @@ -1305,6 +1391,7 @@ def download_as_string( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """(Deprecated) Download the contents of this blob as a bytes object. @@ -1356,6 +1443,28 @@ def download_as_string( (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+ This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1377,6 +1486,7 @@ def download_as_string( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) def download_as_text( @@ -1391,6 +1501,7 @@ def download_as_text( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """Download the contents of this blob as text (*not* bytes). @@ -1445,6 +1556,28 @@ def download_as_text( (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: text :returns: The data stored in this blob, decoded to text. """ @@ -1458,6 +1591,7 @@ def download_as_text( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) if encoding is not None: @@ -1570,6 +1704,7 @@ def _do_multipart_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a multipart upload. @@ -1645,6 +1780,21 @@ def _do_multipart_upload( manually-set checksum value. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). 
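A minimal sketch of that evaluation step, assuming hypothetical precondition values:

```python
# How a ConditionalRetryPolicy collapses to a plain Retry (or None) before
# the private media helpers run.
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED

policy = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED

# The precondition is present, so the wrapped DEFAULT_RETRY is returned:
retry = policy.get_retry_policy_if_conditions_met(
    query_params={"ifMetagenerationMatch": 5}
)

# Without the precondition, None is returned and retries are disabled:
no_retry = policy.get_retry_policy_if_conditions_met(query_params={})
```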
+ + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the multipart upload request. @@ -1706,10 +1856,9 @@ def _do_multipart_upload( upload_url = _add_query_parameters(base_url, name_value_pairs) upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) response = upload.transmit( transport, data, object_metadata, content_type, timeout=timeout @@ -1733,6 +1882,7 @@ def _initiate_resumable_upload( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Initiate a resumable upload. @@ -1823,6 +1973,21 @@ def _initiate_resumable_upload( delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: tuple :returns: Pair of @@ -1887,10 +2052,9 @@ def _initiate_resumable_upload( upload_url, chunk_size, headers=headers, checksum=checksum ) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) upload.initiate( transport, @@ -1918,6 +2082,7 @@ def _do_resumable_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a resumable upload. @@ -1998,6 +2163,21 @@ def _do_resumable_upload( delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the final chunk is uploaded. 
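For the public upload entry points that reach this method, a hedged usage sketch (bucket, object, and file names are placeholders; `with_deadline` is the standard api_core way to lengthen the new 120-second default deadline):

```python
# Hypothetical example: a large upload takes the resumable path; extend the
# default retry deadline to accommodate it.
from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY

client = storage.Client()
blob = client.bucket("my-bucket").blob("large-object")

blob.upload_from_filename(
    "./large-file.bin",
    if_generation_match=0,  # precondition: only create if absent (idempotent)
    retry=DEFAULT_RETRY.with_deadline(600.0),
)
```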
@@ -2015,6 +2195,7 @@ def _do_resumable_upload( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) while not upload.finished: @@ -2041,6 +2222,7 @@ def _do_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Determine an upload strategy and then perform the upload. @@ -2125,19 +2307,45 @@ def _do_upload( attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: dict :returns: The parsed JSON from the "200 OK" response. This will be the **only** response in the multipart case and it will be the **final** response in the resumable case. """ - if if_metageneration_match is None and num_retries is None: - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) if size is not None and size <= _MAX_MULTIPART_SIZE: response = self._do_multipart_upload( @@ -2153,6 +2361,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) else: response = self._do_resumable_upload( @@ -2168,6 +2377,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return response.json() @@ -2187,6 +2397,7 @@ def upload_from_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload the contents of this blob from a file-like object. @@ -2301,6 +2512,28 @@ def upload_from_file( attempting to delete the corrupted file. 
Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. @@ -2310,6 +2543,11 @@ def upload_from_file( """ if num_retries is not None: warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = None _maybe_rewind(file_obj, rewind=rewind) predefined_acl = ACL.validate_predefined(predefined_acl) @@ -2328,6 +2566,7 @@ def upload_from_file( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: @@ -2346,6 +2585,7 @@ def upload_from_filename( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload this blob's contents from the content of a named file. @@ -2434,6 +2674,28 @@ def upload_from_filename( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. 
Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ content_type = self._get_content_type(content_type, filename=filename) @@ -2452,6 +2714,7 @@ def upload_from_filename( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def upload_from_string( @@ -2467,6 +2730,7 @@ def upload_from_string( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload contents of this blob from the provided string. @@ -2551,6 +2815,28 @@ def upload_from_string( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) @@ -2567,6 +2853,7 @@ def upload_from_string( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def create_resumable_upload_session( @@ -3371,9 +3658,12 @@ def open( :param kwargs: Keyword arguments to pass to the underlying API calls. For both uploads and downloads, the following arguments are supported: "if_generation_match", "if_generation_not_match", - "if_metageneration_match", "if_metageneration_not_match", "timeout". - For uploads only, the following additional arguments are supported: - "content_type", "num_retries", "predefined_acl", "checksum". + "if_metageneration_match", "if_metageneration_not_match", "timeout", + "retry". For uploads only, the following additional arguments are + supported: "content_type", "num_retries", "predefined_acl", + "checksum". "num_retries" is supported for backwards-compatibility + reasons only; please use "retry" with a Retry object or + ConditionalRetryPolicy instead. 
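A short sketch of the file-like interface with the new keyword (bucket and object names are placeholders):

```python
# Hypothetical example: passing "retry" through blob.open() to the
# underlying BlobWriter and BlobReader.
from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY

client = storage.Client()
blob = client.bucket("my-bucket").blob("notes.txt")

with blob.open("wt", retry=DEFAULT_RETRY) as writer:
    writer.write(u"hello world")

with blob.open("rt", retry=DEFAULT_RETRY) as reader:
    print(reader.read())
```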
:returns: A 'BlobReader' or 'BlobWriter' from 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index a9a06746a..df42f0c11 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -53,6 +53,7 @@ from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import ConditionalRetryPolicy _marker = object() @@ -972,6 +973,7 @@ def download_blob_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of a blob object or blob URI into a file-like object. @@ -1021,6 +1023,27 @@ def download_blob_to_file( downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy) + (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. Examples: Download a blob using a blob resource. @@ -1046,6 +1069,19 @@ def download_blob_to_file( """ + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) + if not isinstance(blob_or_uri, Blob): blob_or_uri = Blob.from_string(blob_or_uri) download_url = blob_or_uri._get_download_url( @@ -1070,6 +1106,7 @@ def download_blob_to_file( raw_download, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) @@ -1222,6 +1259,8 @@ def list_blobs( max_results=max_results, extra_params=extra_params, page_start=_blobs_page_start, + timeout=timeout, + retry=retry, ) iterator.bucket = bucket iterator.prefixes = set() diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 53d3d14ab..e74b9ed4a 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -13,8 +13,14 @@ # limitations under the License. 
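To illustrate the client-side evaluation above (the precondition value is a placeholder; downloads are normally safe to retry unconditionally, so the conditional policy here is purely demonstrative):

```python
# Hypothetical example: a ConditionalRetryPolicy passed to
# client.download_blob_to_file() is resolved against a "fake" query_params
# dict built from the preconditions, as in the snippet above.
import io

from google.cloud import storage
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED

client = storage.Client()
blob = client.bucket("my-bucket").blob("my-object")
buffer = io.BytesIO()

# Retries activate only because if_metageneration_match is supplied.
client.download_blob_to_file(
    blob,
    buffer,
    if_metageneration_match=1,
    retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
```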
import io +import warnings from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE +from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED +from google.cloud.storage.retry import ConditionalRetryPolicy + # Resumable uploads require a chunk size of precisely a multiple of 256 KiB. CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB @@ -28,20 +34,22 @@ "if_metageneration_match", "if_metageneration_not_match", "timeout", + "retry", } # Valid keyword arguments for upload methods. # Note: Changes here need to be reflected in the blob.open() docstring. VALID_UPLOAD_KWARGS = { "content_type", - "num_retries", "predefined_acl", + "num_retries", "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout", "checksum", + "retry", } @@ -58,13 +66,35 @@ class BlobReader(io.BufferedIOBase): bytes than the chunk_size are requested, the remainder is buffered. The default is the chunk_size of the blob, or 40MiB. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :param download_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout". """ - def __init__(self, blob, chunk_size=None, **download_kwargs): + def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs): """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: if kwarg not in VALID_DOWNLOAD_KWARGS: @@ -76,6 +106,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs): self._pos = 0 self._buffer = io.BytesIO() self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + self._retry = retry self._download_kwargs = download_kwargs def read(self, size=-1): @@ -102,6 +133,7 @@ def read(self, size=-1): start=fetch_start, end=fetch_end, checksum=None, + retry=self._retry, **self._download_kwargs ) except RequestRangeNotSatisfiable: @@ -197,6 +229,28 @@ class BlobWriter(io.BufferedIOBase): changes the behavior of flush() to conform to TextIOWrapper's expectations. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. 
A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :param upload_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", @@ -204,7 +258,14 @@ class BlobWriter(io.BufferedIOBase): "num_retries", "predefined_acl", "checksum". """ - def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): + def __init__( + self, + blob, + chunk_size=None, + text_mode=False, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, + **upload_kwargs + ): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: raise ValueError( @@ -219,6 +280,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): # In text mode this class will be wrapped and TextIOWrapper requires a # different behavior of flush(). self._text_mode = text_mode + self._retry = retry self._upload_kwargs = upload_kwargs @property @@ -259,20 +321,32 @@ def write(self, b): return pos def _initiate_upload(self): + # num_retries is only supported for backwards-compatibility reasons. num_retries = self._upload_kwargs.pop("num_retries", None) + retry = self._retry content_type = self._upload_kwargs.pop("content_type", None) - if ( - self._upload_kwargs.get("if_metageneration_match") is None - and num_retries is None - ): - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + if num_retries is not None: + warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = None + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. 
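+        # For example (hypothetical values): if upload_kwargs contains
+        # if_metageneration_match=2, the dict built below becomes
+        # {"ifGenerationMatch": None, "ifMetagenerationMatch": 2}, which
+        # satisfies DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED and resolves
+        # it to the wrapped DEFAULT_RETRY.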
+ query_params = { + "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"), + "ifMetagenerationMatch": self._upload_kwargs.get( + "if_metageneration_match" + ), + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) self._upload_and_transport = self._blob._initiate_resumable_upload( self._blob.bucket.client, @@ -281,6 +355,7 @@ def _initiate_upload(self): None, num_retries, chunk_size=self._chunk_size, + retry=retry, **self._upload_kwargs ) diff --git a/setup.py b/setup.py index 55863aabb..6f6fa1f3d 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ dependencies = [ "google-auth >= 1.11.0, < 2.0dev", "google-cloud-core >= 1.4.1, < 2.0dev", - "google-resumable-media >= 1.2.0, < 2.0dev", + "google-resumable-media >= 1.3.0, < 2.0dev", "requests >= 2.18.0, < 3.0.0dev", "googleapis-common-protos < 1.53.0; python_version<'3.0'", ] diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 275d01c60..75a439cf1 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -593,6 +593,45 @@ def test_hostname_and_scheme(self): self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL) +class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase): + def test_conflict(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + with self.assertRaises(ValueError): + _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY, num_retries=2) + + def test_retry(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY) + self.assertEqual(retry_strategy.max_sleep, DEFAULT_RETRY._maximum) + self.assertEqual(retry_strategy.max_cumulative_retry, DEFAULT_RETRY._deadline) + self.assertEqual(retry_strategy.initial_delay, DEFAULT_RETRY._initial) + self.assertEqual(retry_strategy.multiplier, DEFAULT_RETRY._multiplier) + + def test_num_retries(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry( + retry=None, num_retries=2 + ) + self.assertEqual(retry_strategy.max_retries, 2) + + def test_none(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None) + self.assertEqual(retry_strategy.max_retries, 0) + + class _MD5Hash(object): def __init__(self, digest_val): self.digest_val = digest_val diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 3ec0db716..46a130dc8 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -29,6 +29,7 @@ from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED def _make_credentials(): @@ -1114,7 +1115,9 @@ def test__extract_headers_from_download_w_response_headers_not_match(self): self.assertIsNone(blob.md5_hash) self.assertIsNone(blob.crc32c) - def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): + def _do_download_helper_wo_chunks( + self, w_range, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = mock.Mock() bucket = _Bucket(client) @@ -1138,6 +1141,8 @@ def 
_do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + with patch as patched: if w_range: blob._do_download( @@ -1148,7 +1153,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): start=1, end=3, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) else: blob._do_download( @@ -1157,7 +1162,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): download_url, headers, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) if w_range: @@ -1183,9 +1188,21 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): transport, timeout=expected_timeout ) + retry_strategy = patched.return_value._retry_strategy + retry = extra_kwargs.get("retry", None) + if retry is None: + self.assertEqual(retry_strategy.max_retries, 0) + else: + self.assertEqual(retry_strategy.max_sleep, retry._maximum) + def test__do_download_wo_chunks_wo_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=False, raw_download=False) + def test__do_download_wo_chunks_wo_range_wo_raw_w_retry(self): + self._do_download_helper_wo_chunks( + w_range=False, raw_download=False, retry=DEFAULT_RETRY + ) + def test__do_download_wo_chunks_w_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=True, raw_download=False) @@ -1334,6 +1351,7 @@ def test_download_to_file_with_failure(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_wo_media_link(self): @@ -1361,6 +1379,7 @@ def test_download_to_file_wo_media_link(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_w_generation_match(self): @@ -1384,9 +1403,12 @@ def test_download_to_file_w_generation_match(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) - def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): + def _download_to_file_helper( + self, use_chunks, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1404,12 +1426,15 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + file_obj = io.BytesIO() if raw_download: - blob.download_to_file(file_obj, raw_download=True, **timeout_kwarg) + blob.download_to_file(file_obj, raw_download=True, **extra_kwargs) else: - blob.download_to_file(file_obj, **timeout_kwarg) + blob.download_to_file(file_obj, **extra_kwargs) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) client.download_blob_to_file.assert_called_once_with( blob, file_obj, @@ -1422,11 +1447,15 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) def test_download_to_file_wo_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=False, raw_download=False) + def test_download_to_file_wo_chunks_no_retry(self): + self._download_to_file_helper(use_chunks=False, raw_download=False, retry=None) + def test_download_to_file_w_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=True, raw_download=False) @@ -1441,7 +1470,9 @@ def test_download_to_file_w_custom_timeout(self): use_chunks=False, raw_download=False, 
timeout=9.58 ) - def _download_to_filename_helper(self, updated, raw_download, timeout=None): + def _download_to_filename_helper( + self, updated, raw_download, timeout=None, **extra_kwargs + ): import os from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud._testing import _NamedTemporaryFile @@ -1457,10 +1488,15 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): with _NamedTemporaryFile() as temp: if timeout is None: - blob.download_to_filename(temp.name, raw_download=raw_download) + blob.download_to_filename( + temp.name, raw_download=raw_download, **extra_kwargs + ) else: blob.download_to_filename( - temp.name, raw_download=raw_download, timeout=timeout, + temp.name, + raw_download=raw_download, + timeout=timeout, + **extra_kwargs ) if updated is None: @@ -1475,6 +1511,8 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): expected_timeout = self._get_default_timeout() if timeout is None else timeout + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + client.download_blob_to_file.assert_called_once_with( blob, mock.ANY, @@ -1487,6 +1525,7 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1495,6 +1534,12 @@ def test_download_to_filename_w_updated_wo_raw(self): updated = "2014-12-06T13:13:50.690Z" self._download_to_filename_helper(updated=updated, raw_download=False) + def test_download_to_filename_w_updated_no_retry(self): + updated = "2014-12-06T13:13:50.690Z" + self._download_to_filename_helper( + updated=updated, raw_download=False, retry=None + ) + def test_download_to_filename_wo_updated_wo_raw(self): self._download_to_filename_helper(updated=None, raw_download=False) @@ -1533,6 +1578,7 @@ def test_download_to_filename_w_generation_match(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1572,11 +1618,12 @@ def test_download_to_filename_corrupted(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, filename) - def _download_as_bytes_helper(self, raw_download, timeout=None): + def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1584,12 +1631,16 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): if timeout is None: expected_timeout = self._get_default_timeout() - fetched = blob.download_as_bytes(raw_download=raw_download) + fetched = blob.download_as_bytes(raw_download=raw_download, **extra_kwargs) else: expected_timeout = timeout - fetched = blob.download_as_bytes(raw_download=raw_download, timeout=timeout) + fetched = blob.download_as_bytes( + raw_download=raw_download, timeout=timeout, **extra_kwargs + ) self.assertEqual(fetched, b"") + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + client.download_blob_to_file.assert_called_once_with( blob, mock.ANY, @@ -1602,16 +1653,11 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = 
client.download_blob_to_file.mock_calls[0].args[1] self.assertIsInstance(stream, io.BytesIO) - def test_download_as_bytes_wo_raw(self): - self._download_as_bytes_helper(raw_download=False) - - def test_download_as_bytes_w_raw(self): - self._download_as_bytes_helper(raw_download=True) - def test_download_as_bytes_w_custom_timeout(self): self._download_as_bytes_helper(raw_download=False, timeout=9.58) @@ -1640,8 +1686,21 @@ def test_download_as_bytes_w_generation_match(self): if_metageneration_not_match=None, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) + def test_download_as_bytes_wo_raw(self): + self._download_as_bytes_helper(raw_download=False) + + def test_download_as_bytes_no_retry(self): + self._download_as_bytes_helper(raw_download=False, retry=None) + + def test_download_as_bytes_w_raw(self): + self._download_as_bytes_helper(raw_download=True) + + def test_download_as_byte_w_custom_timeout(self): + self._download_as_bytes_helper(raw_download=False, timeout=9.58) + def _download_as_text_helper( self, raw_download, @@ -1658,6 +1717,7 @@ def _download_as_text_helper( no_charset=False, expected_value=u"DEADBEEF", payload=None, + **extra_kwargs ): if payload is None: if encoding is not None: @@ -1709,10 +1769,14 @@ def _download_as_text_helper( else: kwargs["timeout"] = expected_timeout = timeout + kwargs.update(extra_kwargs) + fetched = blob.download_as_text(**kwargs) self.assertEqual(fetched, expected_value) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + blob.download_as_bytes.assert_called_once_with( client=client, start=start, @@ -1723,11 +1787,15 @@ def _download_as_text_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + retry=expected_retry, ) def test_download_as_text_wo_raw(self): self._download_as_text_helper(raw_download=False) + def test_download_as_text_w_no_retry(self): + self._download_as_text_helper(raw_download=False, retry=None) + def test_download_as_text_w_raw(self): self._download_as_text_helper(raw_download=True) @@ -1815,6 +1883,7 @@ def test_download_as_string(self, mock_warn): if_metageneration_not_match=None, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) mock_warn.assert_called_with( @@ -1824,6 +1893,33 @@ def test_download_as_string(self, mock_warn): stacklevel=1, ) + def test_download_as_string_no_retry(self): + MEDIA_LINK = "http://example.com/media/" + + client = self._make_client() + blob = self._make_one( + "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} + ) + client.download_blob_to_file = mock.Mock() + + fetched = blob.download_as_string(retry=None) + self.assertEqual(fetched, b"") + + client.download_blob_to_file.assert_called_once_with( + blob, + mock.ANY, + start=None, + end=None, + raw_download=False, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=self._get_default_timeout(), + checksum="md5", + retry=None, + ) + def test__get_content_type_explicit(self): blob = self._make_one(u"blob-name", bucket=None) @@ -1944,6 +2040,7 @@ def _do_multipart_success( timeout=None, metadata=None, mtls=False, + retry=None, ): from six.moves.urllib.parse import urlencode @@ -1992,6 +2089,7 @@ def _do_multipart_success( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2063,6 +2161,28 @@ def 
_do_multipart_success( def test__do_multipart_upload_no_size(self, mock_get_boundary): self._do_multipart_success(mock_get_boundary, predefined_acl="private") + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry(self, mock_get_boundary): + self._do_multipart_success( + mock_get_boundary, predefined_acl="private", retry=DEFAULT_RETRY + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_num_retries(self, mock_get_boundary): + self._do_multipart_success( + mock_get_boundary, predefined_acl="private", num_retries=2 + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry_conflict(self, mock_get_boundary): + with self.assertRaises(ValueError): + self._do_multipart_success( + mock_get_boundary, + predefined_acl="private", + num_retries=2, + retry=DEFAULT_RETRY, + ) + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_no_size_mtls(self, mock_get_boundary): self._do_multipart_success( @@ -2101,7 +2221,7 @@ def test__do_multipart_upload_with_kms_with_version(self, mock_get_boundary): @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_with_retry(self, mock_get_boundary): - self._do_multipart_success(mock_get_boundary, num_retries=8) + self._do_multipart_success(mock_get_boundary, retry=DEFAULT_RETRY) @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_with_generation_match(self, mock_get_boundary): @@ -2165,6 +2285,7 @@ def _initiate_resumable_helper( timeout=None, metadata=None, mtls=False, + retry=None, ): from six.moves.urllib.parse import urlencode from google.resumable_media.requests import ResumableUpload @@ -2235,6 +2356,7 @@ def _initiate_resumable_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2300,13 +2422,15 @@ def _initiate_resumable_helper( self.assertEqual(upload._content_type, content_type) self.assertEqual(upload.resumable_url, resumable_url) retry_strategy = upload._retry_strategy - self.assertEqual(retry_strategy.max_sleep, 64.0) - if num_retries is None: - self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) - self.assertIsNone(retry_strategy.max_retries) - else: - self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertFalse(num_retries is not None and retry is not None) + if num_retries is not None and retry is None: self.assertEqual(retry_strategy.max_retries, num_retries) + elif retry is None: + self.assertEqual(retry_strategy.max_retries, 0) + else: + self.assertEqual(retry_strategy.max_sleep, 60.0) + self.assertEqual(retry_strategy.max_cumulative_retry, 120.0) + self.assertIsNone(retry_strategy.max_retries) self.assertIs(client._http, transport) # Make sure we never read from the stream. 
self.assertEqual(stream.tell(), 0) @@ -2383,8 +2507,15 @@ def test__initiate_resumable_upload_with_extra_headers(self): self._initiate_resumable_helper(extra_headers=extra_headers) def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_helper(retry=DEFAULT_RETRY) + + def test__initiate_resumable_upload_with_num_retries(self): self._initiate_resumable_helper(num_retries=11) + def test__initiate_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._initiate_resumable_helper(retry=DEFAULT_RETRY, num_retries=2) + def test__initiate_resumable_upload_with_generation_match(self): self._initiate_resumable_helper( if_generation_match=4, if_metageneration_match=4 @@ -2536,6 +2667,7 @@ def _do_resumable_helper( if_metageneration_not_match=None, timeout=None, data_corruption=False, + retry=None, ): bucket = _Bucket(name="yesterday") blob = self._make_one(u"blob-name", bucket=bucket) @@ -2582,6 +2714,7 @@ def _do_resumable_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2639,7 +2772,14 @@ def test__do_resumable_upload_with_size(self): self._do_resumable_helper(use_size=True) def test__do_resumable_upload_with_retry(self): - self._do_resumable_helper(num_retries=6) + self._do_resumable_helper(retry=DEFAULT_RETRY) + + def test__do_resumable_upload_with_num_retries(self): + self._do_resumable_helper(num_retries=8) + + def test__do_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._do_resumable_helper(num_retries=9, retry=DEFAULT_RETRY) def test__do_resumable_upload_with_predefined_acl(self): self._do_resumable_helper(predefined_acl="private") @@ -2665,6 +2805,7 @@ def _do_upload_helper( if_metageneration_not_match=None, size=None, timeout=None, + retry=None, ): from google.cloud.storage.blob import _MAX_MULTIPART_SIZE @@ -2708,13 +2849,12 @@ def _do_upload_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) - # Adjust num_retries expectations to reflect the conditional default in - # _do_upload() - if num_retries is None and if_metageneration_match is None: - num_retries = 0 + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = DEFAULT_RETRY if if_metageneration_match else None self.assertIs(created_json, mock.sentinel.json) response.json.assert_called_once_with() @@ -2732,6 +2872,7 @@ def _do_upload_helper( if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) blob._do_resumable_upload.assert_not_called() else: @@ -2749,6 +2890,7 @@ def _do_upload_helper( if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) def test__do_upload_uses_multipart(self): @@ -2776,7 +2918,18 @@ def test__do_upload_uses_resumable_w_custom_timeout(self): ) def test__do_upload_with_retry(self): - self._do_upload_helper(num_retries=20) + self._do_upload_helper(retry=DEFAULT_RETRY) + + def test__do_upload_with_num_retries(self): + self._do_upload_helper(num_retries=2) + + def test__do_upload_with_conditional_retry_success(self): + self._do_upload_helper( + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, if_metageneration_match=1 + ) + + def test__do_upload_with_conditional_retry_failure(self): + self._do_upload_helper(retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) def _upload_from_file_helper(self, side_effect=None, **kwargs): from google.cloud._helpers import UTC @@ -2800,6 +2953,11 @@ def 
_upload_from_file_helper(self, side_effect=None, **kwargs): if_generation_not_match = kwargs.get("if_generation_not_match", None) if_metageneration_match = kwargs.get("if_metageneration_match", None) if_metageneration_not_match = kwargs.get("if_metageneration_not_match", None) + num_retries = kwargs.get("num_retries", None) + default_retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) + retry = kwargs.get("retry", default_retry) ret_val = blob.upload_from_file( stream, size=len(data), content_type=content_type, client=client, **kwargs ) @@ -2811,8 +2969,6 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): expected_timeout = kwargs.get("timeout", self._get_default_timeout()) - # Check the mock. - num_retries = kwargs.get("num_retries") blob._do_upload.assert_called_once_with( client, stream, @@ -2826,6 +2982,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) return stream @@ -2835,13 +2992,24 @@ def test_upload_from_file_success(self): @mock.patch("warnings.warn") def test_upload_from_file_with_retries(self, mock_warn): + self._upload_from_file_helper(retry=DEFAULT_RETRY) + + @mock.patch("warnings.warn") + def test_upload_from_file_with_num_retries(self, mock_warn): from google.cloud.storage import blob as blob_module - self._upload_from_file_helper(num_retries=20) + self._upload_from_file_helper(num_retries=2) mock_warn.assert_called_once_with( blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 ) + @mock.patch("warnings.warn") + def test_upload_from_file_with_retry_conflict(self, mock_warn): + # Special case here: in a conflict this method should NOT raise an error + # as that's handled further downstream. It should pass both options + # through. 
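+        # (The *_retry_conflict tests earlier in this file assert the
+        # ValueError raised downstream for this combination.)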
+ self._upload_from_file_helper(retry=DEFAULT_RETRY, num_retries=2) + def test_upload_from_file_with_rewind(self): stream = self._upload_from_file_helper(rewind=True) assert stream.tell() == 0 @@ -2868,7 +3036,14 @@ def test_upload_from_file_failure(self): self.assertEqual(exc_info.exception.errors, []) def _do_upload_mock_call_helper( - self, blob, client, content_type, size, timeout=None + self, + blob, + client, + content_type, + size, + timeout=None, + num_retries=None, + retry=None, ): self.assertEqual(blob._do_upload.call_count, 1) mock_call = blob._do_upload.mock_calls[0] @@ -2878,7 +3053,7 @@ def _do_upload_mock_call_helper( self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) - self.assertIsNone(pos_args[4]) # num_retries + self.assertEqual(pos_args[4], num_retries) # num_retries self.assertIsNone(pos_args[5]) # predefined_acl self.assertIsNone(pos_args[6]) # if_generation_match self.assertIsNone(pos_args[7]) # if_generation_not_match @@ -2886,7 +3061,13 @@ def _do_upload_mock_call_helper( self.assertIsNone(pos_args[9]) # if_metageneration_not_match expected_timeout = self._get_default_timeout() if timeout is None else timeout - self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None}) + if not retry: + retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) + self.assertEqual( + kwargs, {"timeout": expected_timeout, "checksum": None, "retry": retry} + ) return pos_args[1] @@ -2921,6 +3102,72 @@ def test_upload_from_filename(self): self.assertEqual(stream.mode, "rb") self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_with_retry(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, retry=DEFAULT_RETRY + ) + + # Check the response and side-effects. + self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), retry=DEFAULT_RETRY + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + + def test_upload_from_filename_with_num_retries(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. 
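+        # (A successful call populates it from the mocked _do_upload response.)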
+ self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, num_retries=2 + ) + + # Check the response and side-effects. + self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), num_retries=2 + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_w_custom_timeout(self): from google.cloud._testing import _NamedTemporaryFile @@ -2965,6 +3212,11 @@ def _upload_from_string_helper(self, data, **kwargs): self.assertIsNone(ret_val) self.assertEqual(blob.component_count, 5) + extra_kwargs = {} + if "retry" in kwargs: + extra_kwargs["retry"] = kwargs["retry"] + if "num_retries" in kwargs: + extra_kwargs["num_retries"] = kwargs["num_retries"] # Check the mock. payload = _to_bytes(data, encoding="utf-8") stream = self._do_upload_mock_call_helper( @@ -2973,6 +3225,7 @@ def _upload_from_string_helper(self, data, **kwargs): "text/plain", len(payload), kwargs.get("timeout", self._get_default_timeout()), + **extra_kwargs ) self.assertIsInstance(stream, io.BytesIO) self.assertEqual(stream.getvalue(), payload) @@ -2989,6 +3242,14 @@ def test_upload_from_string_w_text(self): data = u"\N{snowman} \N{sailboat}" self._upload_from_string_helper(data) + def test_upload_from_string_w_text_w_retry(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, retry=DEFAULT_RETRY) + + def test_upload_from_string_w_text_w_num_retries(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, num_retries=2) + def _create_resumable_upload_session_helper( self, origin=None, side_effect=None, timeout=None ): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4c99a3860..33ec331d6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -29,6 +29,7 @@ from . 
import _read_local_json from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED _SERVICE_ACCOUNT_JSON = _read_local_json("url_signer_v4_test_account.json") @@ -1402,6 +1403,7 @@ def test_download_blob_to_file_with_failure(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_uri(self): @@ -1432,18 +1434,42 @@ def test_download_blob_to_file_with_uri(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_invalid_uri(self): project = "PROJECT" - credentials = _make_credentials(project=project) + credentials = _make_credentials() client = self._make_one(project=project, credentials=credentials) file_obj = io.BytesIO() with pytest.raises(ValueError, match="URI scheme must be gs"): client.download_blob_to_file("http://bucket_name/path/to/object", file_obj) - def _download_blob_to_file_helper(self, use_chunks, raw_download): + def test_download_blob_to_file_w_no_retry(self): + self._download_blob_to_file_helper( + use_chunks=True, raw_download=True, retry=None + ) + + def test_download_blob_to_file_w_conditional_retry_pass(self): + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + if_generation_match=1, + ) + + def test_download_blob_to_file_w_conditional_retry_fail(self): + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + expect_condition_fail=True, + ) + + def _download_blob_to_file_helper( + self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs + ): from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -1460,9 +1486,20 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): file_obj = io.BytesIO() if raw_download: - client.download_blob_to_file(blob, file_obj, raw_download=True) + client.download_blob_to_file( + blob, file_obj, raw_download=True, **extra_kwargs + ) else: - client.download_blob_to_file(blob, file_obj) + client.download_blob_to_file(blob, file_obj, **extra_kwargs) + + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + if ( + expected_retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED + and not expect_condition_fail + ): + expected_retry = DEFAULT_RETRY + elif expect_condition_fail: + expected_retry = None headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( @@ -1475,6 +1512,7 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): raw_download, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=expected_retry, ) def test_download_blob_to_file_wo_chunks_wo_raw(self): @@ -1520,6 +1558,8 @@ def test_list_blobs_w_defaults_w_bucket_obj(self): max_results=expected_max_results, extra_params=expected_extra_params, page_start=expected_page_start, + timeout=self._get_default_timeout(), + retry=DEFAULT_RETRY, ) def test_list_blobs_w_explicit_w_user_project(self): @@ -1594,6 +1634,8 @@ def test_list_blobs_w_explicit_w_user_project(self): max_results=expected_max_results, extra_params=expected_extra_params, page_start=expected_page_start, + timeout=timeout, + retry=retry, ) def test_list_buckets_wo_project(self): diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 0ac16ab24..6ce9b4990 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -19,8 +19,10 @@ import io import 
string

+from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
 from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer
 from google.api_core.exceptions import RequestRangeNotSatisfiable
+from google.cloud.storage.retry import DEFAULT_RETRY

 TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n"
 TEST_BINARY_DATA = TEST_TEXT_DATA.encode("utf-8")
@@ -37,7 +39,15 @@ def test_attributes(self):
         self.assertTrue(reader.seekable())
         self.assertTrue(reader.readable())
         self.assertFalse(reader.writable())
-        self.assertEqual(256, reader._chunk_size)
+        self.assertEqual(reader._chunk_size, 256)
+        self.assertEqual(reader._retry, DEFAULT_RETRY)
+
+    def test_attributes_explicit(self):
+        blob = mock.Mock()
+        blob.chunk_size = 256
+        reader = BlobReader(blob, chunk_size=1024, retry=None)
+        self.assertEqual(reader._chunk_size, 1024)
+        self.assertIsNone(reader._retry)

     def test_read(self):
         blob = mock.Mock()
@@ -52,7 +62,7 @@ def read_from_fake_data(start=0, end=None, **_):
         # Read and trigger the first download of chunk_size.
         self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1])
         blob.download_as_bytes.assert_called_once_with(
-            start=0, end=8, checksum=None, **download_kwargs
+            start=0, end=8, checksum=None, retry=DEFAULT_RETRY, **download_kwargs
         )

         # Read from buffered data only.
@@ -64,7 +74,7 @@ def read_from_fake_data(start=0, end=None, **_):
         self.assertEqual(reader._pos, 12)
         self.assertEqual(blob.download_as_bytes.call_count, 2)
         blob.download_as_bytes.assert_called_with(
-            start=8, end=16, checksum=None, **download_kwargs
+            start=8, end=16, checksum=None, retry=DEFAULT_RETRY, **download_kwargs
         )

         # Read a larger amount, requiring a download larger than chunk_size.
@@ -72,14 +82,32 @@ def read_from_fake_data(start=0, end=None, **_):
         self.assertEqual(reader._pos, 28)
         self.assertEqual(blob.download_as_bytes.call_count, 3)
         blob.download_as_bytes.assert_called_with(
-            start=16, end=28, checksum=None, **download_kwargs
+            start=16, end=28, checksum=None, retry=DEFAULT_RETRY, **download_kwargs
         )

         # Read all remaining data.
         self.assertEqual(reader.read(), TEST_BINARY_DATA[28:])
         self.assertEqual(blob.download_as_bytes.call_count, 4)
         blob.download_as_bytes.assert_called_with(
-            start=28, end=None, checksum=None, **download_kwargs
+            start=28, end=None, checksum=None, retry=DEFAULT_RETRY, **download_kwargs
+        )
+
+        reader.close()
+
+    def test_retry_passed_through(self):
+        blob = mock.Mock()
+
+        def read_from_fake_data(start=0, end=None, **_):
+            return TEST_BINARY_DATA[start:end]
+
+        blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data)
+        download_kwargs = {"if_metageneration_match": 1}
+        reader = BlobReader(blob, chunk_size=8, retry=None, **download_kwargs)
+
+        # Read and trigger the first download of chunk_size.
+        self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1])
+        blob.download_as_bytes.assert_called_once_with(
+            start=0, end=8, checksum=None, retry=None, **download_kwargs
         )

         reader.close()
@@ -104,12 +132,16 @@ def read_from_fake_data(start=0, end=None, **_):

         # Read a line. With chunk_size=10, expect three chunks downloaded.
         self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27])
-        blob.download_as_bytes.assert_called_with(start=20, end=30, checksum=None)
+        blob.download_as_bytes.assert_called_with(
+            start=20, end=30, checksum=None, retry=DEFAULT_RETRY
+        )
         self.assertEqual(blob.download_as_bytes.call_count, 3)

         # Read another line.
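        # (Finishing it takes three more chunked downloads, six in total.)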
        self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:])
-        blob.download_as_bytes.assert_called_with(start=50, end=60, checksum=None)
+        blob.download_as_bytes.assert_called_with(
+            start=50, end=60, checksum=None, retry=DEFAULT_RETRY
+        )
         self.assertEqual(blob.download_as_bytes.call_count, 6)

         blob.size = len(TEST_BINARY_DATA)
@@ -118,7 +150,10 @@ def read_from_fake_data(start=0, end=None, **_):
         # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read.
         self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA)
         blob.download_as_bytes.assert_called_with(
-            start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10, checksum=None
+            start=len(TEST_BINARY_DATA),
+            end=len(TEST_BINARY_DATA) + 10,
+            checksum=None,
+            retry=DEFAULT_RETRY,
         )
         self.assertEqual(blob.download_as_bytes.call_count, 13)

@@ -209,7 +244,14 @@ def test_attributes(self):
         self.assertFalse(writer.seekable())
         self.assertFalse(writer.readable())
         self.assertTrue(writer.writable())
-        self.assertEqual(256 * 1024, writer._chunk_size)
+        self.assertEqual(writer._chunk_size, 256 * 1024)
+
+    def test_attributes_explicit(self):
+        blob = mock.Mock()
+        blob.chunk_size = 256 * 1024
+        writer = BlobWriter(blob, chunk_size=512 * 1024, retry=DEFAULT_RETRY)
+        self.assertEqual(writer._chunk_size, 512 * 1024)
+        self.assertEqual(writer._retry, DEFAULT_RETRY)

     def test_reject_wrong_chunk_size(self):
         blob = mock.Mock()
@@ -261,6 +303,7 @@ def test_write(self):
             None,
             NUM_RETRIES,
             chunk_size=chunk_size,
+            retry=None,
             **upload_kwargs
         )
         upload.transmit_next_chunk.assert_called_with(transport)
@@ -286,7 +329,56 @@ def test_seek_fails(self):
         with self.assertRaises(io.UnsupportedOperation):
             writer.seek()

-    def test_conditional_retries(self):
+    def test_conditional_retry_failure(self):
+        blob = mock.Mock()
+
+        upload = mock.Mock()
+        transport = mock.Mock()
+
+        blob._initiate_resumable_upload.return_value = (upload, transport)
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
+            writer = BlobWriter(
+                blob, chunk_size=chunk_size, content_type=PLAIN_CONTENT_TYPE,
+            )
+
+            # The transmit_next_chunk method must actually consume bytes from the
+            # sliding buffer for the flush() feature to work properly.
+            upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read(
+                chunk_size
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. This should result in upload initialization
+            # and multiple chunks uploaded.
+            # Due to the condition not being fulfilled, retry should be None.
+            writer.write(TEST_BINARY_DATA[4:32])
+            blob._initiate_resumable_upload.assert_called_once_with(
+                blob.bucket.client,
+                writer._buffer,
+                PLAIN_CONTENT_TYPE,
+                None,  # size
+                None,  # num_retries
+                chunk_size=chunk_size,
+                retry=None,
+            )
+            upload.transmit_next_chunk.assert_called_with(transport)
+            self.assertEqual(upload.transmit_next_chunk.call_count, 4)
+
+            # Write another byte, finalize and close.
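+            # (close() flushes the remaining buffered byte as a fifth chunk.)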
+            writer.write(TEST_BINARY_DATA[32:33])
+            writer.close()
+            self.assertEqual(upload.transmit_next_chunk.call_count, 5)
+
+    def test_conditional_retry_pass(self):
         blob = mock.Mock()

         upload = mock.Mock()
@@ -302,8 +394,8 @@ def test_conditional_retries(self):
             writer = BlobWriter(
                 blob,
                 chunk_size=chunk_size,
-                num_retries=None,
                 content_type=PLAIN_CONTENT_TYPE,
+                if_metageneration_match=1,
             )

             # The transmit_next_chunk method must actually consume bytes from the
@@ -319,15 +411,69 @@ def test_conditional_retries(self):

             # Write over chunk_size. This should result in upload initialization
             # and multiple chunks uploaded.
-            # Due to the condition not being fulfilled, num_retries should be 0.
+            # Due to the condition being fulfilled, retry should be DEFAULT_RETRY.
             writer.write(TEST_BINARY_DATA[4:32])
             blob._initiate_resumable_upload.assert_called_once_with(
                 blob.bucket.client,
                 writer._buffer,
                 PLAIN_CONTENT_TYPE,
-                None,
-                0,
+                None,  # size
+                None,  # num_retries
+                chunk_size=chunk_size,
+                retry=DEFAULT_RETRY,
+                if_metageneration_match=1,
+            )
+            upload.transmit_next_chunk.assert_called_with(transport)
+            self.assertEqual(upload.transmit_next_chunk.call_count, 4)
+
+            # Write another byte, finalize and close.
+            writer.write(TEST_BINARY_DATA[32:33])
+            writer.close()
+            self.assertEqual(upload.transmit_next_chunk.call_count, 5)
+
+    @mock.patch("warnings.warn")
+    def test_forced_default_retry(self, mock_warn):
+        blob = mock.Mock()
+
+        upload = mock.Mock()
+        transport = mock.Mock()
+
+        blob._initiate_resumable_upload.return_value = (upload, transport)
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
+            writer = BlobWriter(
+                blob,
+                chunk_size=chunk_size,
+                content_type=PLAIN_CONTENT_TYPE,
+                retry=DEFAULT_RETRY,
+            )
+
+            # The transmit_next_chunk method must actually consume bytes from the
+            # sliding buffer for the flush() feature to work properly.
+            upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read(
+                chunk_size
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. This should result in upload initialization
+            # and multiple chunks uploaded.
+            writer.write(TEST_BINARY_DATA[4:32])
+            blob._initiate_resumable_upload.assert_called_once_with(
+                blob.bucket.client,
+                writer._buffer,
+                PLAIN_CONTENT_TYPE,
+                None,  # size
+                None,  # num_retries
                 chunk_size=chunk_size,
+                retry=DEFAULT_RETRY,
             )
             upload.transmit_next_chunk.assert_called_with(transport)
             self.assertEqual(upload.transmit_next_chunk.call_count, 4)
@@ -337,6 +483,99 @@
             writer.close()
             self.assertEqual(upload.transmit_next_chunk.call_count, 5)

+    def test_num_retries_and_retry_conflict(self):
+        blob = mock.Mock()
+
+        blob._initiate_resumable_upload.side_effect = ValueError
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
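+            # (Accepted here because CHUNK_SIZE_MULTIPLE is patched to 1 above.)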
+            writer = BlobWriter(
+                blob,
+                chunk_size=chunk_size,
+                content_type=PLAIN_CONTENT_TYPE,
+                num_retries=2,
+                retry=DEFAULT_RETRY,
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. The mock will raise a ValueError, simulating
+            # actual behavior when num_retries and retry are both specified.
+            with self.assertRaises(ValueError):
+                writer.write(TEST_BINARY_DATA[4:32])
+
+            blob._initiate_resumable_upload.assert_called_once_with(
+                blob.bucket.client,
+                writer._buffer,
+                PLAIN_CONTENT_TYPE,
+                None,  # size
+                2,  # num_retries
+                chunk_size=chunk_size,
+                retry=DEFAULT_RETRY,
+            )
+
+    @mock.patch("warnings.warn")
+    def test_num_retries_only(self, mock_warn):
+        blob = mock.Mock()
+
+        upload = mock.Mock()
+        transport = mock.Mock()
+
+        blob._initiate_resumable_upload.return_value = (upload, transport)
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
+            writer = BlobWriter(
+                blob,
+                chunk_size=chunk_size,
+                content_type=PLAIN_CONTENT_TYPE,
+                num_retries=2,
+            )
+
+            # The transmit_next_chunk method must actually consume bytes from the
+            # sliding buffer for the flush() feature to work properly.
+            upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read(
+                chunk_size
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. This should result in upload initialization
+            # and multiple chunks uploaded.
+            writer.write(TEST_BINARY_DATA[4:32])
+            blob._initiate_resumable_upload.assert_called_once_with(
+                blob.bucket.client,
+                writer._buffer,
+                PLAIN_CONTENT_TYPE,
+                None,  # size
+                2,  # num_retries
+                chunk_size=chunk_size,
+                retry=None,
+            )
+            upload.transmit_next_chunk.assert_called_with(transport)
+            self.assertEqual(upload.transmit_next_chunk.call_count, 4)
+            mock_warn.assert_called_once_with(
+                _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2
+            )
+
+            # Write another byte, finalize and close.
+            writer.write(TEST_BINARY_DATA[32:33])
+            writer.close()
+            self.assertEqual(upload.transmit_next_chunk.call_count, 5)
+
     def test_rejects_invalid_kwargs(self):
         blob = mock.Mock()
         with self.assertRaises(ValueError):
@@ -606,5 +845,6 @@ def test_write(self):
             None,
             NUM_RETRIES,
             chunk_size=chunk_size,
+            retry=None,
         )
         upload.transmit_next_chunk.assert_called_with(transport)
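
A minimal usage sketch of the configuration surface these tests exercise
(bucket and object names are placeholders, not part of this patch):

    from google.cloud import storage
    from google.cloud.storage.retry import DEFAULT_RETRY

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("data.bin")

    # Media operations now take the same retry objects as non-media calls.
    payload = blob.download_as_bytes(retry=None)  # opt out of retries
    blob.upload_from_filename(
        "data.bin", retry=DEFAULT_RETRY.with_deadline(60.0)
    )  # retry transient errors for up to 60s instead of the default 120s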