Skip to content

Commit

Permalink
fix: retry ConnectionError and similar errors that occur mid-download (
Browse files Browse the repository at this point in the history
…#251)

* Wrap Download business logic in a single retry block

* apply single retry block to download consume operations

* Make upload retries match downloads and remove old code

* fix coverage and refactor helper method

Co-authored-by: Cathy Ouyang <cathyo@google.com>
  • Loading branch information
andrewsg and cojenco committed Aug 19, 2021
1 parent 0248317 commit bb3ec13
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 203 deletions.
26 changes: 17 additions & 9 deletions google/resumable_media/_helpers.py
Expand Up @@ -140,6 +140,10 @@ def wait_and_retry(func, get_status_code, retry_strategy):
Expects ``func`` to return an HTTP response and uses ``get_status_code``
to check if the response is retry-able.
``func`` is expected to raise a failure status code as a
common.InvalidResponse, at which point this method will check the code
against the common.RETRIABLE list of retriable status codes.
Will retry until :meth:`~.RetryStrategy.retry_allowed` (on the current
``retry_strategy``) returns :data:`False`. Uses
:func:`calculate_retry_wait` to double the wait time (with jitter) after
Expand Down Expand Up @@ -176,18 +180,22 @@ def wait_and_retry(func, get_status_code, retry_strategy):
try:
response = func()
except connection_error_exceptions as e:
error = e
error = e # Fall through to retry, if there are retries left.
except common.InvalidResponse as e:
# An InvalidResponse is only retriable if its status code matches.
# The `process_response()` method on a Download or Upload method
# will convert the status code into an exception.
if get_status_code(e.response) in common.RETRYABLE:
error = e # Fall through to retry, if there are retries left.
else:
raise # If the status code is not retriable, raise w/o retry.
else:
if get_status_code(response) not in common.RETRYABLE:
return response
return response

if not retry_strategy.retry_allowed(total_sleep, num_retries):
# Retries are exhausted and no acceptable response was received. Raise the
# retriable_error or return the unacceptable response.
if error:
raise error

return response
# Retries are exhausted and no acceptable response was received.
# Raise the retriable_error.
raise error

base_wait, wait_time = calculate_retry_wait(
base_wait, retry_strategy.max_sleep, retry_strategy.multiplier
Expand Down
41 changes: 0 additions & 41 deletions google/resumable_media/requests/_request_helpers.py
Expand Up @@ -18,9 +18,6 @@
"""


import functools

from google.resumable_media import _helpers
from google.resumable_media import common


Expand Down Expand Up @@ -96,41 +93,3 @@ def _get_body(response):
)
response._content_consumed = True
return response._content


def http_request(
transport,
method,
url,
data=None,
headers=None,
retry_strategy=_DEFAULT_RETRY_STRATEGY,
**transport_kwargs
):
"""Make an HTTP request.
Args:
transport (~requests.Session): A ``requests`` object which can make
authenticated requests via a ``request()`` method. This method
must accept an HTTP method, an upload URL, a ``data`` keyword
argument and a ``headers`` keyword argument.
method (str): The HTTP method for the request.
url (str): The URL for the request.
data (Optional[bytes]): The body of the request.
headers (Mapping[str, str]): The headers for the request (``transport``
may also add additional headers).
retry_strategy (~google.resumable_media.common.RetryStrategy): The
strategy to use if the request fails and must be retried.
transport_kwargs (Dict[str, str]): Extra keyword arguments to be
passed along to ``transport.request``.
Returns:
~requests.Response: The return value of ``transport.request()``.
"""
if "timeout" not in transport_kwargs:
transport_kwargs["timeout"] = (_DEFAULT_CONNECT_TIMEOUT, _DEFAULT_READ_TIMEOUT)

func = functools.partial(
transport.request, method, url, data=data, headers=headers, **transport_kwargs
)
return _helpers.wait_and_retry(func, RequestsMixin._get_status_code, retry_strategy)
108 changes: 64 additions & 44 deletions google/resumable_media/requests/download.py
Expand Up @@ -157,20 +157,25 @@ def consume(
request_kwargs = {
"data": payload,
"headers": headers,
"retry_strategy": self._retry_strategy,
"timeout": timeout,
}
if self._stream is not None:
request_kwargs["stream"] = True

result = _request_helpers.http_request(transport, method, url, **request_kwargs)
# Wrap the request business logic in a function to be retried.
def retriable_request():
result = transport.request(method, url, **request_kwargs)

self._process_response(result)
self._process_response(result)

if self._stream is not None:
self._write_to_stream(result)
if self._stream is not None:
self._write_to_stream(result)

return result

return result
return _helpers.wait_and_retry(
retriable_request, self._get_status_code, self._retry_strategy
)


class RawDownload(_request_helpers.RawRequestsMixin, _download.Download):
Expand Down Expand Up @@ -285,24 +290,29 @@ def consume(
finished.
"""
method, url, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
result = _request_helpers.http_request(
transport,
method,
url,
data=payload,
headers=headers,
retry_strategy=self._retry_strategy,
stream=True,
timeout=timeout,
)

self._process_response(result)
# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
stream=True,
timeout=timeout,
)

if self._stream is not None:
self._write_to_stream(result)
self._process_response(result)

if self._stream is not None:
self._write_to_stream(result)

return result

return result
return _helpers.wait_and_retry(
retriable_request, self._get_status_code, self._retry_strategy
)


class ChunkedDownload(_request_helpers.RequestsMixin, _download.ChunkedDownload):
Expand Down Expand Up @@ -361,18 +371,23 @@ def consume_next_chunk(
ValueError: If the current download has finished.
"""
method, url, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
result = _request_helpers.http_request(
transport,
method,
url,
data=payload,
headers=headers,
retry_strategy=self._retry_strategy,
timeout=timeout,

# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
timeout=timeout,
)
self._process_response(result)
return result

return _helpers.wait_and_retry(
retriable_request, self._get_status_code, self._retry_strategy
)
self._process_response(result)
return result


class RawChunkedDownload(_request_helpers.RawRequestsMixin, _download.ChunkedDownload):
Expand Down Expand Up @@ -431,19 +446,24 @@ def consume_next_chunk(
ValueError: If the current download has finished.
"""
method, url, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
result = _request_helpers.http_request(
transport,
method,
url,
data=payload,
headers=headers,
stream=True,
retry_strategy=self._retry_strategy,
timeout=timeout,

# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
stream=True,
timeout=timeout,
)
self._process_response(result)
return result

return _helpers.wait_and_retry(
retriable_request, self._get_status_code, self._retry_strategy
)
self._process_response(result)
return result


def _add_decoder(response_raw, checksum):
Expand Down

0 comments on commit bb3ec13

Please sign in to comment.