diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 9e09fc9f2..04671035b 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -23,6 +23,7 @@ import os from six.moves.urllib.parse import urlsplit +from google import resumable_media from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED @@ -45,6 +46,12 @@ ("if_source_metageneration_not_match", "ifSourceMetagenerationNotMatch"), ) +_NUM_RETRIES_MESSAGE = ( + "`num_retries` has been deprecated and will be removed in a future " + "release. Use the `retry` argument with a Retry or ConditionalRetryPolicy " + "object, or None, instead." +) + def _get_storage_host(): return os.environ.get(STORAGE_EMULATOR_ENV_VAR, _DEFAULT_STORAGE_HOST) @@ -524,3 +531,37 @@ def _bucket_bound_hostname_url(host, scheme=None): return host return "{scheme}://{host}/".format(scheme=scheme, host=host) + + +def _api_core_retry_to_resumable_media_retry(retry, num_retries=None): + """Convert google.api_core.retry.Retry to google.resumable_media.RetryStrategy. + + Custom predicates are not translated. + + :type retry: google.api_core.retry.Retry + :param retry: (Optional) The google.api_core.retry.Retry object to translate. + + :type num_retries: int + :param num_retries: (Optional) The number of retries desired. This is + supported for backwards compatibility and is mutually exclusive with + `retry`. + + :rtype: google.resumable_media.RetryStrategy + :returns: A RetryStrategy with all applicable attributes copied from input, + or a RetryStrategy with max_retries set to 0 if both arguments are None. + """ + + if retry is not None and num_retries is not None: + raise ValueError("num_retries and retry arguments are mutually exclusive") + + elif retry is not None: + return resumable_media.RetryStrategy( + max_sleep=retry._maximum, + max_cumulative_retry=retry._deadline, + initial_delay=retry._initial, + multiplier=retry._multiplier, + ) + elif num_retries is not None: + return resumable_media.RetryStrategy(max_retries=num_retries) + else: + return resumable_media.RetryStrategy(max_retries=0) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 3fb8a59b9..597e63ca4 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -65,8 +65,10 @@ from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud.storage._helpers import _raise_if_more_than_one_set +from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry from google.cloud.storage._signing import generate_signed_url_v2 from google.cloud.storage._signing import generate_signed_url_v4 +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE from google.cloud.storage.acl import ACL from google.cloud.storage.acl import ObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -76,9 +78,11 @@ from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS from google.cloud.storage.constants import STANDARD_STORAGE_CLASS +from google.cloud.storage.retry import ConditionalRetryPolicy from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from 
google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED from google.cloud.storage.fileio import BlobReader from google.cloud.storage.fileio import BlobWriter @@ -105,17 +109,6 @@ "name", "storageClass", ) -_NUM_RETRIES_MESSAGE = ( - "`num_retries` has been deprecated and will be removed in a future " - "release. The default behavior (when `num_retries` is not specified) when " - "a transient error (e.g. 429 Too Many Requests or 500 Internal Server " - "Error) occurs will be as follows: upload requests will be automatically " - "retried if and only if `if_metageneration_match` is specified (thus " - "making the upload idempotent). Subsequent retries will be sent after " - "waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until 10 minutes " - "of wait time have elapsed. At that point, there will be no more attempts " - "to retry." -) _READ_LESS_THAN_SIZE = ( "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining." ) @@ -892,6 +885,7 @@ def _do_download( raw_download=False, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=None, ): """Perform a download without any error handling. @@ -938,7 +932,25 @@ def _do_download( downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. """ + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + if self.chunk_size is None: if raw_download: klass = RawDownload @@ -953,6 +965,7 @@ def _do_download( end=end, checksum=checksum, ) + download._retry_strategy = retry_strategy response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) else: @@ -975,6 +988,7 @@ def _do_download( end=end, ) + download._retry_strategy = retry_strategy while not download.finished: download.consume_next_chunk(transport, timeout=timeout) @@ -991,6 +1005,7 @@ def download_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """DEPRECATED. Download the contents of this blob into a file-like object. @@ -1077,6 +1092,28 @@ def download_to_file( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+ This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1093,6 +1130,7 @@ def download_to_file( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def download_to_filename( @@ -1108,6 +1146,7 @@ def download_to_filename( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob into a named file. @@ -1170,6 +1209,28 @@ def download_to_filename( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1187,6 +1248,7 @@ def download_to_filename( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.DataCorruption: # Delete the corrupt downloaded file. @@ -1213,6 +1275,7 @@ def download_as_bytes( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob as a bytes object. @@ -1272,6 +1335,28 @@ def download_as_bytes( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+ This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1291,6 +1376,7 @@ def download_as_bytes( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return string_buffer.getvalue() @@ -1305,6 +1391,7 @@ def download_as_string( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """(Deprecated) Download the contents of this blob as a bytes object. @@ -1356,6 +1443,28 @@ def download_as_string( (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1377,6 +1486,7 @@ def download_as_string( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) def download_as_text( @@ -1391,6 +1501,7 @@ def download_as_text( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """Download the contents of this blob as text (*not* bytes). @@ -1445,6 +1556,28 @@ def download_as_text( (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+ This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: text :returns: The data stored in this blob, decoded to text. """ @@ -1458,6 +1591,7 @@ def download_as_text( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) if encoding is not None: @@ -1570,6 +1704,7 @@ def _do_multipart_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a multipart upload. @@ -1645,6 +1780,21 @@ manually-set checksum value. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in blob._do_upload(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the multipart upload request. @@ -1706,10 +1856,9 @@ upload_url = _add_query_parameters(base_url, name_value_pairs) upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) response = upload.transmit( transport, data, object_metadata, content_type, timeout=timeout @@ -1733,6 +1882,7 @@ def _initiate_resumable_upload( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Initiate a resumable upload. @@ -1823,6 +1973,21 @@ delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in blob._do_upload(). 
+ + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: tuple :returns: Pair of @@ -1887,10 +2052,9 @@ upload_url, chunk_size, headers=headers, checksum=checksum ) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) upload.initiate( transport, @@ -1918,6 +2082,7 @@ def _do_resumable_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a resumable upload. @@ -1998,6 +2163,21 @@ delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. + + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in blob._do_upload(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the final chunk is uploaded. @@ -2015,6 +2195,7 @@ if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) while not upload.finished: @@ -2041,6 +2222,7 @@ def _do_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Determine an upload strategy and then perform the upload. @@ -2125,19 +2307,45 @@ attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: dict :returns: The parsed JSON from the "200 OK" response. This will be the **only** response in the multipart case and it will be the **final** response in the resumable case. 
""" - if if_metageneration_match is None and num_retries is None: - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) if size is not None and size <= _MAX_MULTIPART_SIZE: response = self._do_multipart_upload( @@ -2153,6 +2361,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) else: response = self._do_resumable_upload( @@ -2168,6 +2377,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return response.json() @@ -2187,6 +2397,7 @@ def upload_from_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload the contents of this blob from a file-like object. @@ -2301,6 +2512,28 @@ def upload_from_file( attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. @@ -2310,6 +2543,11 @@ def upload_from_file( """ if num_retries is not None: warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. 
+ if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = None _maybe_rewind(file_obj, rewind=rewind) predefined_acl = ACL.validate_predefined(predefined_acl) @@ -2328,6 +2566,7 @@ def upload_from_file( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: @@ -2346,6 +2585,7 @@ def upload_from_filename( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload this blob's contents from the content of a named file. @@ -2434,6 +2674,28 @@ def upload_from_filename( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ content_type = self._get_content_type(content_type, filename=filename) @@ -2452,6 +2714,7 @@ def upload_from_filename( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def upload_from_string( @@ -2467,6 +2730,7 @@ def upload_from_string( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Upload contents of this blob from the provided string. @@ -2551,6 +2815,28 @@ def upload_from_string( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. 
+ + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) @@ -2567,6 +2853,7 @@ def upload_from_string( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def create_resumable_upload_session( @@ -3371,9 +3658,12 @@ def open( :param kwargs: Keyword arguments to pass to the underlying API calls. For both uploads and downloads, the following arguments are supported: "if_generation_match", "if_generation_not_match", - "if_metageneration_match", "if_metageneration_not_match", "timeout". - For uploads only, the following additional arguments are supported: - "content_type", "num_retries", "predefined_acl", "checksum". + "if_metageneration_match", "if_metageneration_not_match", "timeout", + "retry". For uploads only, the following additional arguments are + supported: "content_type", "num_retries", "predefined_acl", + "checksum". "num_retries" is supported for backwards-compatibility + reasons only; please use "retry" with a Retry object or + ConditionalRetryPolicy instead. :returns: A 'BlobReader' or 'BlobWriter' from 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index a9a06746a..df42f0c11 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -53,6 +53,7 @@ from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import ConditionalRetryPolicy _marker = object() @@ -972,6 +973,7 @@ def download_blob_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of a blob object or blob URI into a file-like object. @@ -1021,6 +1023,27 @@ def download_blob_to_file( downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy) + (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. 
Examples: Download a blob using a blob resource. @@ -1046,6 +1069,19 @@ def download_blob_to_file( """ + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) + if not isinstance(blob_or_uri, Blob): blob_or_uri = Blob.from_string(blob_or_uri) download_url = blob_or_uri._get_download_url( @@ -1070,6 +1106,7 @@ def download_blob_to_file( raw_download, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) @@ -1222,6 +1259,8 @@ def list_blobs( max_results=max_results, extra_params=extra_params, page_start=_blobs_page_start, + timeout=timeout, + retry=retry, ) iterator.bucket = bucket iterator.prefixes = set() diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 53d3d14ab..e74b9ed4a 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -13,8 +13,14 @@ # limitations under the License. import io +import warnings from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE +from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED +from google.cloud.storage.retry import ConditionalRetryPolicy + # Resumable uploads require a chunk size of precisely a multiple of 256 KiB. CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB @@ -28,20 +34,22 @@ "if_metageneration_match", "if_metageneration_not_match", "timeout", + "retry", } # Valid keyword arguments for upload methods. # Note: Changes here need to be reflected in the blob.open() docstring. VALID_UPLOAD_KWARGS = { "content_type", - "num_retries", "predefined_acl", + "num_retries", "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout", "checksum", + "retry", } @@ -58,13 +66,35 @@ class BlobReader(io.BufferedIOBase): bytes than the chunk_size are requested, the remainder is buffered. The default is the chunk_size of the blob, or 40MiB. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. 
Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :param download_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout". """ - def __init__(self, blob, chunk_size=None, **download_kwargs): + def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs): """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: if kwarg not in VALID_DOWNLOAD_KWARGS: @@ -76,6 +106,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs): self._pos = 0 self._buffer = io.BytesIO() self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + self._retry = retry self._download_kwargs = download_kwargs def read(self, size=-1): @@ -102,6 +133,7 @@ def read(self, size=-1): start=fetch_start, end=fetch_end, checksum=None, + retry=self._retry, **self._download_kwargs ) except RequestRangeNotSatisfiable: @@ -197,6 +229,28 @@ class BlobWriter(io.BufferedIOBase): changes the behavior of flush() to conform to TextIOWrapper's expectations. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :param upload_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", @@ -204,7 +258,14 @@ class BlobWriter(io.BufferedIOBase): "num_retries", "predefined_acl", "checksum". """ - def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): + def __init__( + self, + blob, + chunk_size=None, + text_mode=False, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, + **upload_kwargs + ): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: raise ValueError( @@ -219,6 +280,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): # In text mode this class will be wrapped and TextIOWrapper requires a # different behavior of flush(). self._text_mode = text_mode + self._retry = retry self._upload_kwargs = upload_kwargs @property @@ -259,20 +321,32 @@ def write(self, b): return pos def _initiate_upload(self): + # num_retries is only supported for backwards-compatibility reasons. 
num_retries = self._upload_kwargs.pop("num_retries", None) + retry = self._retry content_type = self._upload_kwargs.pop("content_type", None) - if ( - self._upload_kwargs.get("if_metageneration_match") is None - and num_retries is None - ): - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + if num_retries is not None: + warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = None + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = { + "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"), + "ifMetagenerationMatch": self._upload_kwargs.get( + "if_metageneration_match" + ), + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) self._upload_and_transport = self._blob._initiate_resumable_upload( self._blob.bucket.client, @@ -281,6 +355,7 @@ def _initiate_upload(self): None, num_retries, chunk_size=self._chunk_size, + retry=retry, **self._upload_kwargs ) diff --git a/setup.py b/setup.py index 55863aabb..6f6fa1f3d 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ dependencies = [ "google-auth >= 1.11.0, < 2.0dev", "google-cloud-core >= 1.4.1, < 2.0dev", - "google-resumable-media >= 1.2.0, < 2.0dev", + "google-resumable-media >= 1.3.0, < 2.0dev", "requests >= 2.18.0, < 3.0.0dev", "googleapis-common-protos < 1.53.0; python_version<'3.0'", ] diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 275d01c60..75a439cf1 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -593,6 +593,45 @@ def test_hostname_and_scheme(self): self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL) +class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase): + def test_conflict(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + with self.assertRaises(ValueError): + _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY, num_retries=2) + + def test_retry(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY) + self.assertEqual(retry_strategy.max_sleep, DEFAULT_RETRY._maximum) + self.assertEqual(retry_strategy.max_cumulative_retry, DEFAULT_RETRY._deadline) + self.assertEqual(retry_strategy.initial_delay, DEFAULT_RETRY._initial) + self.assertEqual(retry_strategy.multiplier, DEFAULT_RETRY._multiplier) + + def test_num_retries(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry( + retry=None, num_retries=2 + ) + self.assertEqual(retry_strategy.max_retries, 2) + 
+ def test_none(self): + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None) + self.assertEqual(retry_strategy.max_retries, 0) + + class _MD5Hash(object): def __init__(self, digest_val): self.digest_val = digest_val diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 3ec0db716..46a130dc8 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -29,6 +29,7 @@ from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED def _make_credentials(): @@ -1114,7 +1115,9 @@ def test__extract_headers_from_download_w_response_headers_not_match(self): self.assertIsNone(blob.md5_hash) self.assertIsNone(blob.crc32c) - def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): + def _do_download_helper_wo_chunks( + self, w_range, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = mock.Mock() bucket = _Bucket(client) @@ -1138,6 +1141,8 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + with patch as patched: if w_range: blob._do_download( @@ -1148,7 +1153,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): start=1, end=3, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) else: blob._do_download( @@ -1157,7 +1162,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): download_url, headers, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) if w_range: @@ -1183,9 +1188,21 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): transport, timeout=expected_timeout ) + retry_strategy = patched.return_value._retry_strategy + retry = extra_kwargs.get("retry", None) + if retry is None: + self.assertEqual(retry_strategy.max_retries, 0) + else: + self.assertEqual(retry_strategy.max_sleep, retry._maximum) + def test__do_download_wo_chunks_wo_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=False, raw_download=False) + def test__do_download_wo_chunks_wo_range_wo_raw_w_retry(self): + self._do_download_helper_wo_chunks( + w_range=False, raw_download=False, retry=DEFAULT_RETRY + ) + def test__do_download_wo_chunks_w_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=True, raw_download=False) @@ -1334,6 +1351,7 @@ def test_download_to_file_with_failure(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_wo_media_link(self): @@ -1361,6 +1379,7 @@ def test_download_to_file_wo_media_link(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_w_generation_match(self): @@ -1384,9 +1403,12 @@ def test_download_to_file_w_generation_match(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) - def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): + def _download_to_file_helper( + self, use_chunks, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1404,12 +1426,15 @@ def 
_download_to_file_helper(self, use_chunks, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + file_obj = io.BytesIO() if raw_download: - blob.download_to_file(file_obj, raw_download=True, **timeout_kwarg) + blob.download_to_file(file_obj, raw_download=True, **extra_kwargs) else: - blob.download_to_file(file_obj, **timeout_kwarg) + blob.download_to_file(file_obj, **extra_kwargs) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) client.download_blob_to_file.assert_called_once_with( blob, file_obj, @@ -1422,11 +1447,15 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) def test_download_to_file_wo_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=False, raw_download=False) + def test_download_to_file_wo_chunks_no_retry(self): + self._download_to_file_helper(use_chunks=False, raw_download=False, retry=None) + def test_download_to_file_w_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=True, raw_download=False) @@ -1441,7 +1470,9 @@ def test_download_to_file_w_custom_timeout(self): use_chunks=False, raw_download=False, timeout=9.58 ) - def _download_to_filename_helper(self, updated, raw_download, timeout=None): + def _download_to_filename_helper( + self, updated, raw_download, timeout=None, **extra_kwargs + ): import os from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud._testing import _NamedTemporaryFile @@ -1457,10 +1488,15 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): with _NamedTemporaryFile() as temp: if timeout is None: - blob.download_to_filename(temp.name, raw_download=raw_download) + blob.download_to_filename( + temp.name, raw_download=raw_download, **extra_kwargs + ) else: blob.download_to_filename( - temp.name, raw_download=raw_download, timeout=timeout, + temp.name, + raw_download=raw_download, + timeout=timeout, + **extra_kwargs ) if updated is None: @@ -1475,6 +1511,8 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): expected_timeout = self._get_default_timeout() if timeout is None else timeout + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + client.download_blob_to_file.assert_called_once_with( blob, mock.ANY, @@ -1487,6 +1525,7 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1495,6 +1534,12 @@ def test_download_to_filename_w_updated_wo_raw(self): updated = "2014-12-06T13:13:50.690Z" self._download_to_filename_helper(updated=updated, raw_download=False) + def test_download_to_filename_w_updated_no_retry(self): + updated = "2014-12-06T13:13:50.690Z" + self._download_to_filename_helper( + updated=updated, raw_download=False, retry=None + ) + def test_download_to_filename_wo_updated_wo_raw(self): self._download_to_filename_helper(updated=None, raw_download=False) @@ -1533,6 +1578,7 @@ def test_download_to_filename_w_generation_match(self): raw_download=False, timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1572,11 +1618,12 @@ def test_download_to_filename_corrupted(self): raw_download=False, 
timeout=expected_timeout, checksum="md5", + retry=DEFAULT_RETRY, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertEqual(stream.name, filename) - def _download_as_bytes_helper(self, raw_download, timeout=None): + def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1584,12 +1631,16 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): if timeout is None: expected_timeout = self._get_default_timeout() - fetched = blob.download_as_bytes(raw_download=raw_download) + fetched = blob.download_as_bytes(raw_download=raw_download, **extra_kwargs) else: expected_timeout = timeout - fetched = blob.download_as_bytes(raw_download=raw_download, timeout=timeout) + fetched = blob.download_as_bytes( + raw_download=raw_download, timeout=timeout, **extra_kwargs + ) self.assertEqual(fetched, b"") + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + client.download_blob_to_file.assert_called_once_with( blob, mock.ANY, @@ -1602,16 +1653,11 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): raw_download=raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = client.download_blob_to_file.mock_calls[0].args[1] self.assertIsInstance(stream, io.BytesIO) - def test_download_as_bytes_wo_raw(self): - self._download_as_bytes_helper(raw_download=False) - - def test_download_as_bytes_w_raw(self): - self._download_as_bytes_helper(raw_download=True) - def test_download_as_bytes_w_custom_timeout(self): self._download_as_bytes_helper(raw_download=False, timeout=9.58) @@ -1640,8 +1686,21 @@ def test_download_as_bytes_w_generation_match(self): if_metageneration_not_match=None, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) + def test_download_as_bytes_wo_raw(self): + self._download_as_bytes_helper(raw_download=False) + + def test_download_as_bytes_no_retry(self): + self._download_as_bytes_helper(raw_download=False, retry=None) + + def test_download_as_bytes_w_raw(self): + self._download_as_bytes_helper(raw_download=True) + + def test_download_as_byte_w_custom_timeout(self): + self._download_as_bytes_helper(raw_download=False, timeout=9.58) + def _download_as_text_helper( self, raw_download, @@ -1658,6 +1717,7 @@ def _download_as_text_helper( no_charset=False, expected_value=u"DEADBEEF", payload=None, + **extra_kwargs ): if payload is None: if encoding is not None: @@ -1709,10 +1769,14 @@ def _download_as_text_helper( else: kwargs["timeout"] = expected_timeout = timeout + kwargs.update(extra_kwargs) + fetched = blob.download_as_text(**kwargs) self.assertEqual(fetched, expected_value) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + blob.download_as_bytes.assert_called_once_with( client=client, start=start, @@ -1723,11 +1787,15 @@ def _download_as_text_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + retry=expected_retry, ) def test_download_as_text_wo_raw(self): self._download_as_text_helper(raw_download=False) + def test_download_as_text_w_no_retry(self): + self._download_as_text_helper(raw_download=False, retry=None) + def test_download_as_text_w_raw(self): self._download_as_text_helper(raw_download=True) @@ -1815,6 +1883,7 @@ def test_download_as_string(self, mock_warn): if_metageneration_not_match=None, timeout=self._get_default_timeout(), checksum="md5", + 
retry=DEFAULT_RETRY, ) mock_warn.assert_called_with( @@ -1824,6 +1893,33 @@ def test_download_as_string(self, mock_warn): stacklevel=1, ) + def test_download_as_string_no_retry(self): + MEDIA_LINK = "http://example.com/media/" + + client = self._make_client() + blob = self._make_one( + "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} + ) + client.download_blob_to_file = mock.Mock() + + fetched = blob.download_as_string(retry=None) + self.assertEqual(fetched, b"") + + client.download_blob_to_file.assert_called_once_with( + blob, + mock.ANY, + start=None, + end=None, + raw_download=False, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=self._get_default_timeout(), + checksum="md5", + retry=None, + ) + def test__get_content_type_explicit(self): blob = self._make_one(u"blob-name", bucket=None) @@ -1944,6 +2040,7 @@ def _do_multipart_success( timeout=None, metadata=None, mtls=False, + retry=None, ): from six.moves.urllib.parse import urlencode @@ -1992,6 +2089,7 @@ def _do_multipart_success( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2063,6 +2161,28 @@ def _do_multipart_success( def test__do_multipart_upload_no_size(self, mock_get_boundary): self._do_multipart_success(mock_get_boundary, predefined_acl="private") + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry(self, mock_get_boundary): + self._do_multipart_success( + mock_get_boundary, predefined_acl="private", retry=DEFAULT_RETRY + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_num_retries(self, mock_get_boundary): + self._do_multipart_success( + mock_get_boundary, predefined_acl="private", num_retries=2 + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry_conflict(self, mock_get_boundary): + with self.assertRaises(ValueError): + self._do_multipart_success( + mock_get_boundary, + predefined_acl="private", + num_retries=2, + retry=DEFAULT_RETRY, + ) + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_no_size_mtls(self, mock_get_boundary): self._do_multipart_success( @@ -2101,7 +2221,7 @@ def test__do_multipart_upload_with_kms_with_version(self, mock_get_boundary): @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_with_retry(self, mock_get_boundary): - self._do_multipart_success(mock_get_boundary, num_retries=8) + self._do_multipart_success(mock_get_boundary, retry=DEFAULT_RETRY) @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_with_generation_match(self, mock_get_boundary): @@ -2165,6 +2285,7 @@ def _initiate_resumable_helper( timeout=None, metadata=None, mtls=False, + retry=None, ): from six.moves.urllib.parse import urlencode from google.resumable_media.requests import ResumableUpload @@ -2235,6 +2356,7 @@ def _initiate_resumable_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2300,13 +2422,15 @@ def _initiate_resumable_helper( self.assertEqual(upload._content_type, content_type) 
self.assertEqual(upload.resumable_url, resumable_url) retry_strategy = upload._retry_strategy - self.assertEqual(retry_strategy.max_sleep, 64.0) - if num_retries is None: - self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) - self.assertIsNone(retry_strategy.max_retries) - else: - self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertFalse(num_retries is not None and retry is not None) + if num_retries is not None and retry is None: self.assertEqual(retry_strategy.max_retries, num_retries) + elif retry is None: + self.assertEqual(retry_strategy.max_retries, 0) + else: + self.assertEqual(retry_strategy.max_sleep, 60.0) + self.assertEqual(retry_strategy.max_cumulative_retry, 120.0) + self.assertIsNone(retry_strategy.max_retries) self.assertIs(client._http, transport) # Make sure we never read from the stream. self.assertEqual(stream.tell(), 0) @@ -2383,8 +2507,15 @@ def test__initiate_resumable_upload_with_extra_headers(self): self._initiate_resumable_helper(extra_headers=extra_headers) def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_helper(retry=DEFAULT_RETRY) + + def test__initiate_resumable_upload_with_num_retries(self): self._initiate_resumable_helper(num_retries=11) + def test__initiate_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._initiate_resumable_helper(retry=DEFAULT_RETRY, num_retries=2) + def test__initiate_resumable_upload_with_generation_match(self): self._initiate_resumable_helper( if_generation_match=4, if_metageneration_match=4 @@ -2536,6 +2667,7 @@ def _do_resumable_helper( if_metageneration_not_match=None, timeout=None, data_corruption=False, + retry=None, ): bucket = _Bucket(name="yesterday") blob = self._make_one(u"blob-name", bucket=bucket) @@ -2582,6 +2714,7 @@ def _do_resumable_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2639,7 +2772,14 @@ def test__do_resumable_upload_with_size(self): self._do_resumable_helper(use_size=True) def test__do_resumable_upload_with_retry(self): - self._do_resumable_helper(num_retries=6) + self._do_resumable_helper(retry=DEFAULT_RETRY) + + def test__do_resumable_upload_with_num_retries(self): + self._do_resumable_helper(num_retries=8) + + def test__do_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._do_resumable_helper(num_retries=9, retry=DEFAULT_RETRY) def test__do_resumable_upload_with_predefined_acl(self): self._do_resumable_helper(predefined_acl="private") @@ -2665,6 +2805,7 @@ def _do_upload_helper( if_metageneration_not_match=None, size=None, timeout=None, + retry=None, ): from google.cloud.storage.blob import _MAX_MULTIPART_SIZE @@ -2708,13 +2849,12 @@ def _do_upload_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) - # Adjust num_retries expectations to reflect the conditional default in - # _do_upload() - if num_retries is None and if_metageneration_match is None: - num_retries = 0 + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = DEFAULT_RETRY if if_metageneration_match else None self.assertIs(created_json, mock.sentinel.json) response.json.assert_called_once_with() @@ -2732,6 +2872,7 @@ def _do_upload_helper( if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) blob._do_resumable_upload.assert_not_called() else: @@ -2749,6 +2890,7 @@ def _do_upload_helper( if_metageneration_not_match, 
timeout=expected_timeout, checksum=None, + retry=retry, ) def test__do_upload_uses_multipart(self): @@ -2776,7 +2918,18 @@ def test__do_upload_uses_resumable_w_custom_timeout(self): ) def test__do_upload_with_retry(self): - self._do_upload_helper(num_retries=20) + self._do_upload_helper(retry=DEFAULT_RETRY) + + def test__do_upload_with_num_retries(self): + self._do_upload_helper(num_retries=2) + + def test__do_upload_with_conditional_retry_success(self): + self._do_upload_helper( + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, if_metageneration_match=1 + ) + + def test__do_upload_with_conditional_retry_failure(self): + self._do_upload_helper(retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) def _upload_from_file_helper(self, side_effect=None, **kwargs): from google.cloud._helpers import UTC @@ -2800,6 +2953,11 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_generation_not_match = kwargs.get("if_generation_not_match", None) if_metageneration_match = kwargs.get("if_metageneration_match", None) if_metageneration_not_match = kwargs.get("if_metageneration_not_match", None) + num_retries = kwargs.get("num_retries", None) + default_retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) + retry = kwargs.get("retry", default_retry) ret_val = blob.upload_from_file( stream, size=len(data), content_type=content_type, client=client, **kwargs ) @@ -2811,8 +2969,6 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): expected_timeout = kwargs.get("timeout", self._get_default_timeout()) - # Check the mock. - num_retries = kwargs.get("num_retries") blob._do_upload.assert_called_once_with( client, stream, @@ -2826,6 +2982,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) return stream @@ -2835,13 +2992,24 @@ def test_upload_from_file_success(self): @mock.patch("warnings.warn") def test_upload_from_file_with_retries(self, mock_warn): + self._upload_from_file_helper(retry=DEFAULT_RETRY) + + @mock.patch("warnings.warn") + def test_upload_from_file_with_num_retries(self, mock_warn): from google.cloud.storage import blob as blob_module - self._upload_from_file_helper(num_retries=20) + self._upload_from_file_helper(num_retries=2) mock_warn.assert_called_once_with( blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 ) + @mock.patch("warnings.warn") + def test_upload_from_file_with_retry_conflict(self, mock_warn): + # Special case here: in a conflict this method should NOT raise an error + # as that's handled further downstream. It should pass both options + # through. 
+ self._upload_from_file_helper(retry=DEFAULT_RETRY, num_retries=2) + def test_upload_from_file_with_rewind(self): stream = self._upload_from_file_helper(rewind=True) assert stream.tell() == 0 @@ -2868,7 +3036,14 @@ def test_upload_from_file_failure(self): self.assertEqual(exc_info.exception.errors, []) def _do_upload_mock_call_helper( - self, blob, client, content_type, size, timeout=None + self, + blob, + client, + content_type, + size, + timeout=None, + num_retries=None, + retry=None, ): self.assertEqual(blob._do_upload.call_count, 1) mock_call = blob._do_upload.mock_calls[0] @@ -2878,7 +3053,7 @@ def _do_upload_mock_call_helper( self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) - self.assertIsNone(pos_args[4]) # num_retries + self.assertEqual(pos_args[4], num_retries) # num_retries self.assertIsNone(pos_args[5]) # predefined_acl self.assertIsNone(pos_args[6]) # if_generation_match self.assertIsNone(pos_args[7]) # if_generation_not_match @@ -2886,7 +3061,13 @@ def _do_upload_mock_call_helper( self.assertIsNone(pos_args[9]) # if_metageneration_not_match expected_timeout = self._get_default_timeout() if timeout is None else timeout - self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None}) + if not retry: + retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) + self.assertEqual( + kwargs, {"timeout": expected_timeout, "checksum": None, "retry": retry} + ) return pos_args[1] @@ -2921,6 +3102,72 @@ def test_upload_from_filename(self): self.assertEqual(stream.mode, "rb") self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_with_retry(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, retry=DEFAULT_RETRY + ) + + # Check the response and side-effects. + self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), retry=DEFAULT_RETRY + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + + def test_upload_from_filename_with_num_retries(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. 
+ self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, num_retries=2 + ) + + # Check the response and side-effects. + self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), num_retries=2 + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_w_custom_timeout(self): from google.cloud._testing import _NamedTemporaryFile @@ -2965,6 +3212,11 @@ def _upload_from_string_helper(self, data, **kwargs): self.assertIsNone(ret_val) self.assertEqual(blob.component_count, 5) + extra_kwargs = {} + if "retry" in kwargs: + extra_kwargs["retry"] = kwargs["retry"] + if "num_retries" in kwargs: + extra_kwargs["num_retries"] = kwargs["num_retries"] # Check the mock. payload = _to_bytes(data, encoding="utf-8") stream = self._do_upload_mock_call_helper( @@ -2973,6 +3225,7 @@ def _upload_from_string_helper(self, data, **kwargs): "text/plain", len(payload), kwargs.get("timeout", self._get_default_timeout()), + **extra_kwargs ) self.assertIsInstance(stream, io.BytesIO) self.assertEqual(stream.getvalue(), payload) @@ -2989,6 +3242,14 @@ def test_upload_from_string_w_text(self): data = u"\N{snowman} \N{sailboat}" self._upload_from_string_helper(data) + def test_upload_from_string_w_text_w_retry(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, retry=DEFAULT_RETRY) + + def test_upload_from_string_w_text_w_num_retries(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, num_retries=2) + def _create_resumable_upload_session_helper( self, origin=None, side_effect=None, timeout=None ): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4c99a3860..33ec331d6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -29,6 +29,7 @@ from . 
import _read_local_json from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED _SERVICE_ACCOUNT_JSON = _read_local_json("url_signer_v4_test_account.json") @@ -1402,6 +1403,7 @@ def test_download_blob_to_file_with_failure(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_uri(self): @@ -1432,18 +1434,42 @@ def test_download_blob_to_file_with_uri(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_invalid_uri(self): project = "PROJECT" - credentials = _make_credentials(project=project) + credentials = _make_credentials() client = self._make_one(project=project, credentials=credentials) file_obj = io.BytesIO() with pytest.raises(ValueError, match="URI scheme must be gs"): client.download_blob_to_file("http://bucket_name/path/to/object", file_obj) - def _download_blob_to_file_helper(self, use_chunks, raw_download): + def test_download_blob_to_file_w_no_retry(self): + self._download_blob_to_file_helper( + use_chunks=True, raw_download=True, retry=None + ) + + def test_download_blob_to_file_w_conditional_retry_pass(self): + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + if_generation_match=1, + ) + + def test_download_blob_to_file_w_conditional_retry_fail(self): + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + expect_condition_fail=True, + ) + + def _download_blob_to_file_helper( + self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs + ): from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -1460,9 +1486,20 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): file_obj = io.BytesIO() if raw_download: - client.download_blob_to_file(blob, file_obj, raw_download=True) + client.download_blob_to_file( + blob, file_obj, raw_download=True, **extra_kwargs + ) else: - client.download_blob_to_file(blob, file_obj) + client.download_blob_to_file(blob, file_obj, **extra_kwargs) + + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + if ( + expected_retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED + and not expect_condition_fail + ): + expected_retry = DEFAULT_RETRY + elif expect_condition_fail: + expected_retry = None headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( @@ -1475,6 +1512,7 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): raw_download, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=expected_retry, ) def test_download_blob_to_file_wo_chunks_wo_raw(self): @@ -1520,6 +1558,8 @@ def test_list_blobs_w_defaults_w_bucket_obj(self): max_results=expected_max_results, extra_params=expected_extra_params, page_start=expected_page_start, + timeout=self._get_default_timeout(), + retry=DEFAULT_RETRY, ) def test_list_blobs_w_explicit_w_user_project(self): @@ -1594,6 +1634,8 @@ def test_list_blobs_w_explicit_w_user_project(self): max_results=expected_max_results, extra_params=expected_extra_params, page_start=expected_page_start, + timeout=timeout, + retry=retry, ) def test_list_buckets_wo_project(self): diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 0ac16ab24..6ce9b4990 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -19,8 +19,10 @@ import io import 
string +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage.retry import DEFAULT_RETRY TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n" TEST_BINARY_DATA = TEST_TEXT_DATA.encode("utf-8") @@ -37,7 +39,15 @@ def test_attributes(self): self.assertTrue(reader.seekable()) self.assertTrue(reader.readable()) self.assertFalse(reader.writable()) - self.assertEqual(256, reader._chunk_size) + self.assertEqual(reader._chunk_size, 256) + self.assertEqual(reader._retry, DEFAULT_RETRY) + + def test_attributes_explicit(self): + blob = mock.Mock() + blob.chunk_size = 256 + reader = BlobReader(blob, chunk_size=1024, retry=None) + self.assertEqual(reader._chunk_size, 1024) + self.assertIsNone(reader._retry) def test_read(self): blob = mock.Mock() @@ -52,7 +62,7 @@ def read_from_fake_data(start=0, end=None, **_): # Read and trigger the first download of chunk_size. self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1]) blob.download_as_bytes.assert_called_once_with( - start=0, end=8, checksum=None, **download_kwargs + start=0, end=8, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read from buffered data only. @@ -64,7 +74,7 @@ def read_from_fake_data(start=0, end=None, **_): self.assertEqual(reader._pos, 12) self.assertEqual(blob.download_as_bytes.call_count, 2) blob.download_as_bytes.assert_called_with( - start=8, end=16, checksum=None, **download_kwargs + start=8, end=16, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read a larger amount, requiring a download larger than chunk_size. @@ -72,14 +82,32 @@ def read_from_fake_data(start=0, end=None, **_): self.assertEqual(reader._pos, 28) self.assertEqual(blob.download_as_bytes.call_count, 3) blob.download_as_bytes.assert_called_with( - start=16, end=28, checksum=None, **download_kwargs + start=16, end=28, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read all remaining data. self.assertEqual(reader.read(), TEST_BINARY_DATA[28:]) self.assertEqual(blob.download_as_bytes.call_count, 4) blob.download_as_bytes.assert_called_with( - start=28, end=None, checksum=None, **download_kwargs + start=28, end=None, checksum=None, retry=DEFAULT_RETRY, **download_kwargs + ) + + reader.close() + + def test_retry_passed_through(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + download_kwargs = {"if_metageneration_match": 1} + reader = BlobReader(blob, chunk_size=8, retry=None, **download_kwargs) + + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1]) + blob.download_as_bytes.assert_called_once_with( + start=0, end=8, checksum=None, retry=None, **download_kwargs ) reader.close() @@ -104,12 +132,16 @@ def read_from_fake_data(start=0, end=None, **_): # Read a line. With chunk_size=10, expect three chunks downloaded. self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27]) - blob.download_as_bytes.assert_called_with(start=20, end=30, checksum=None) + blob.download_as_bytes.assert_called_with( + start=20, end=30, checksum=None, retry=DEFAULT_RETRY + ) self.assertEqual(blob.download_as_bytes.call_count, 3) # Read another line.
self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:]) - blob.download_as_bytes.assert_called_with(start=50, end=60, checksum=None) + blob.download_as_bytes.assert_called_with( + start=50, end=60, checksum=None, retry=DEFAULT_RETRY + ) self.assertEqual(blob.download_as_bytes.call_count, 6) blob.size = len(TEST_BINARY_DATA) @@ -118,7 +150,10 @@ def read_from_fake_data(start=0, end=None, **_): # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read. self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA) blob.download_as_bytes.assert_called_with( - start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10, checksum=None + start=len(TEST_BINARY_DATA), + end=len(TEST_BINARY_DATA) + 10, + checksum=None, + retry=DEFAULT_RETRY, ) self.assertEqual(blob.download_as_bytes.call_count, 13) @@ -209,7 +244,14 @@ def test_attributes(self): self.assertFalse(writer.seekable()) self.assertFalse(writer.readable()) self.assertTrue(writer.writable()) - self.assertEqual(256 * 1024, writer._chunk_size) + self.assertEqual(writer._chunk_size, 256 * 1024) + + def test_attributes_explicit(self): + blob = mock.Mock() + blob.chunk_size = 256 * 1024 + writer = BlobWriter(blob, chunk_size=512 * 1024, retry=DEFAULT_RETRY) + self.assertEqual(writer._chunk_size, 512 * 1024) + self.assertEqual(writer._retry, DEFAULT_RETRY) def test_reject_wrong_chunk_size(self): blob = mock.Mock() @@ -261,6 +303,7 @@ def test_write(self): None, NUM_RETRIES, chunk_size=chunk_size, + retry=None, **upload_kwargs ) upload.transmit_next_chunk.assert_called_with(transport) @@ -286,7 +329,56 @@ def test_seek_fails(self): with self.assertRaises(io.UnsupportedOperation): writer.seek() - def test_conditional_retries(self): + def test_conditional_retry_failure(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, chunk_size=chunk_size, content_type=PLAIN_CONTENT_TYPE, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + # Due to the condition not being fulfilled, retry should be None. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + None, # num_retries + chunk_size=chunk_size, + retry=None, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. 
+ writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_conditional_retry_pass(self): blob = mock.Mock() upload = mock.Mock() @@ -302,8 +394,8 @@ def test_conditional_retries(self): writer = BlobWriter( blob, chunk_size=chunk_size, - num_retries=None, content_type=PLAIN_CONTENT_TYPE, + if_metageneration_match=1, ) # The transmit_next_chunk method must actually consume bytes from the @@ -319,15 +411,69 @@ def test_conditional_retries(self): # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. - # Due to the condition not being fulfilled, num_retries should be 0. + # Due to the condition being fulfilled, retry should be DEFAULT_RETRY. writer.write(TEST_BINARY_DATA[4:32]) blob._initiate_resumable_upload.assert_called_once_with( blob.bucket.client, writer._buffer, PLAIN_CONTENT_TYPE, - None, - 0, + None, # size + None, # num_retries + chunk_size=chunk_size, + retry=DEFAULT_RETRY, + if_metageneration_match=1, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + @mock.patch("warnings.warn") + def test_forced_default_retry(self, mock_warn): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + content_type=PLAIN_CONTENT_TYPE, + retry=DEFAULT_RETRY, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + None, # num_retries chunk_size=chunk_size, + retry=DEFAULT_RETRY, ) upload.transmit_next_chunk.assert_called_with(transport) self.assertEqual(upload.transmit_next_chunk.call_count, 4) @@ -337,6 +483,99 @@ def test_conditional_retries(self): writer.close() self.assertEqual(upload.transmit_next_chunk.call_count, 5) + def test_num_retries_and_retry_conflict(self): + blob = mock.Mock() + + blob._initiate_resumable_upload.side_effect = ValueError + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. 
+ writer = BlobWriter( + blob, + chunk_size=chunk_size, + content_type=PLAIN_CONTENT_TYPE, + num_retries=2, + retry=DEFAULT_RETRY, + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. The mock will raise a ValueError, simulating + # actual behavior when num_retries and retry are both specified. + with self.assertRaises(ValueError): + writer.write(TEST_BINARY_DATA[4:32]) + + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + 2, # num_retries + chunk_size=chunk_size, + retry=DEFAULT_RETRY, + ) + + @mock.patch("warnings.warn") + def test_num_retries_only(self, mock_warn): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + content_type=PLAIN_CONTENT_TYPE, + num_retries=2, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + 2, # num_retries + chunk_size=chunk_size, + retry=None, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + mock_warn.assert_called_once_with( + _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 + ) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + def test_rejects_invalid_kwargs(self): blob = mock.Mock() with self.assertRaises(ValueError): @@ -606,5 +845,6 @@ def test_write(self): None, NUM_RETRIES, chunk_size=chunk_size, + retry=None, ) upload.transmit_next_chunk.assert_called_with(transport)
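A note on the resumable-upload assertions in _initiate_resumable_helper above: the expected max_sleep of 60.0 and max_cumulative_retry of 120.0 are simply DEFAULT_RETRY's google.api_core defaults carried over into the media layer. The sketch below is illustrative only, assuming DEFAULT_RETRY is an otherwise-unmodified api_core Retry (initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0); it mirrors the field mapping the tests observe rather than being patch code.

# Illustrative sketch (not part of the patch): how an api_core Retry's
# private fields line up with the resumable_media.RetryStrategy attributes
# asserted on in _initiate_resumable_helper. Assumes api_core's stock defaults.
from google.api_core import retry as api_retry
from google import resumable_media

retry = api_retry.Retry()  # initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0

strategy = resumable_media.RetryStrategy(
    max_sleep=retry._maximum,              # becomes retry_strategy.max_sleep == 60.0
    max_cumulative_retry=retry._deadline,  # becomes retry_strategy.max_cumulative_retry == 120.0
    initial_delay=retry._initial,
    multiplier=retry._multiplier,
)

assert strategy.max_sleep == 60.0
assert strategy.max_cumulative_retry == 120.0
assert strategy.max_retries is None  # bounded by cumulative time, not attempt count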
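The various *_retry_conflict tests depend on the translation helper this patch adds in _helpers.py rejecting the combination of `retry` and the deprecated `num_retries`, and on the conversion rules that the resumable-upload helper asserts on (a bounded attempt count for `num_retries`, zero media-layer retries when neither argument is given). A minimal sketch of that contract, runnable against this branch:

# Sketch of the conversion/conflict contract the *_retry_conflict tests pin
# down, calling the translation helper added in _helpers.py.
from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry
from google.cloud.storage.retry import DEFAULT_RETRY

# Legacy path: num_retries alone maps to a bounded-attempt strategy.
assert _api_core_retry_to_resumable_media_retry(None, num_retries=2).max_retries == 2

# Neither argument: retries are fully disabled at the media layer.
assert _api_core_retry_to_resumable_media_retry(None).max_retries == 0

# Conflict: both arguments together raise ValueError, which the multipart and
# resumable upload conflict tests above expect to bubble up.
try:
    _api_core_retry_to_resumable_media_retry(DEFAULT_RETRY, num_retries=2)
except ValueError:
    pass
else:
    raise AssertionError("expected ValueError for retry + num_retries")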
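The conditional-retry cases (_do_upload_helper, _download_blob_to_file_helper, and the BlobWriter pass/failure pair) all encode the same resolution rule: a ConditionalRetryPolicy degrades to its wrapped retry when the request carries the relevant precondition, and to None otherwise. A hedged sketch of that rule, assuming the policy's get_retry_policy_if_conditions_met() API and that the predicates key off the JSON API query parameters ifMetagenerationMatch / ifGenerationMatch:

# Sketch only: the query-parameter key names are assumptions based on the
# JSON API field names; the tests above observe the resolved values indirectly.
from google.cloud.storage.retry import (
    DEFAULT_RETRY,
    DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
    DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)

# Upload path: a metageneration precondition makes the request idempotent,
# so the wrapped DEFAULT_RETRY applies; without it the policy yields None.
policy = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
assert (
    policy.get_retry_policy_if_conditions_met(
        query_params={"ifMetagenerationMatch": 1}
    )
    is DEFAULT_RETRY
)
assert policy.get_retry_policy_if_conditions_met(query_params={}) is None

# Download path: the same rule keyed on the generation precondition, matching
# the expected_retry bookkeeping in _download_blob_to_file_helper.
policy = DEFAULT_RETRY_IF_GENERATION_SPECIFIED
assert (
    policy.get_retry_policy_if_conditions_met(query_params={"ifGenerationMatch": 1})
    is DEFAULT_RETRY
)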
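For fileio, the user-facing surface these tests lock in is small: BlobReader and BlobWriter accept a retry kwarg, the reader defaults it to DEFAULT_RETRY, and the value is forwarded to the underlying blob calls. A minimal sketch using a Mock blob exactly as the tests do (and assuming, as test_attributes implies, that both classes fall back to blob.chunk_size when no chunk_size is given); no real requests are issued:

# Minimal sketch of the reader/writer surface exercised above, using a Mock
# blob just like the tests; no network traffic is involved.
import mock

from google.cloud.storage.fileio import BlobReader, BlobWriter
from google.cloud.storage.retry import DEFAULT_RETRY

blob = mock.Mock()
blob.chunk_size = 256 * 1024  # a valid 256 KiB multiple for the writer

reader = BlobReader(blob)              # default: downloads retried with DEFAULT_RETRY
assert reader._retry == DEFAULT_RETRY

reader = BlobReader(blob, retry=None)  # opt out of download retries entirely
assert reader._retry is None

writer = BlobWriter(blob, retry=DEFAULT_RETRY)  # retry even without preconditions
assert writer._retry == DEFAULT_RETRY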