Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: retry API calls with exponential backoff (#287)
Retries errors for idempotent API calls by default. Some API calls are conditionally idempotent (only idempotent if etag, generation, if_generation_match, if_metageneration_match are specified); in those cases, retries are also conditional on the inclusion of that data in the call.
  • Loading branch information
andrewsg committed Oct 16, 2020
1 parent 6f865d9 commit fbe5d9c
Show file tree
Hide file tree
Showing 16 changed files with 521 additions and 13 deletions.
6 changes: 6 additions & 0 deletions google/cloud/storage/_helpers.py
Expand Up @@ -24,6 +24,8 @@

from six.moves.urllib.parse import urlsplit
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED


STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST"
Expand Down Expand Up @@ -205,6 +207,7 @@ def reload(
headers=self._encryption_headers(),
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -306,6 +309,7 @@ def patch(
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -368,13 +372,15 @@ def update(
if_metageneration_match=if_metageneration_match,
if_metageneration_not_match=if_metageneration_not_match,
)

api_response = client._connection.api_request(
method="PUT",
path=self.path,
data=self._properties,
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down
15 changes: 15 additions & 0 deletions google/cloud/storage/_http.py
Expand Up @@ -14,6 +14,8 @@

"""Create / interact with Google Cloud Storage connections."""

import functools

from google.cloud import _http

from google.cloud.storage import __version__
Expand Down Expand Up @@ -46,3 +48,16 @@ def __init__(self, client, client_info=None, api_endpoint=DEFAULT_API_ENDPOINT):

API_URL_TEMPLATE = "{api_base_url}/storage/{api_version}{path}"
"""A template for the URL of a particular API call."""

def api_request(self, *args, **kwargs):
retry = kwargs.pop("retry", None)
call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
if retry:
# If this is a ConditionalRetryPolicy, check conditions.
try:
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
except AttributeError: # This is not a ConditionalRetryPolicy.
pass
if retry:
call = retry(call)
return call()
3 changes: 3 additions & 0 deletions google/cloud/storage/blob.py
Expand Up @@ -74,6 +74,7 @@
from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS
from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED


_API_ACCESS_ENDPOINT = "https://storage.googleapis.com"
Expand Down Expand Up @@ -2856,6 +2857,7 @@ def compose(
data=request,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -3000,6 +3002,7 @@ def rewrite(
headers=headers,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
rewritten = int(api_response["totalBytesRewritten"])
size = int(api_response["objectSize"])
Expand Down
24 changes: 20 additions & 4 deletions google/cloud/storage/bucket.py
Expand Up @@ -57,7 +57,9 @@
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT

from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON

_UBLA_BPO_ENABLED_MESSAGE = (
"Pass only one of 'uniform_bucket_level_access_enabled' / "
Expand Down Expand Up @@ -1244,7 +1246,9 @@ def list_blobs(

client = self._require_client(client)
path = self.path + "/o"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1283,7 +1287,9 @@ def list_notifications(self, client=None, timeout=_DEFAULT_TIMEOUT):
"""
client = self._require_client(client)
path = self.path + "/notificationConfigs"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1424,6 +1430,7 @@ def delete(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)

def delete_blob(
Expand Down Expand Up @@ -1521,6 +1528,7 @@ def delete_blob(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

def delete_blobs(
Expand Down Expand Up @@ -1795,6 +1803,7 @@ def copy_blob(
query_params=query_params,
_target_object=new_blob,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

if not preserve_acl:
Expand Down Expand Up @@ -2644,6 +2653,7 @@ def get_iam_policy(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2689,6 +2699,7 @@ def set_iam_policy(self, policy, client=None, timeout=_DEFAULT_TIMEOUT):
data=resource,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_ETAG_IN_JSON,
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2727,7 +2738,11 @@ def test_iam_permissions(self, permissions, client=None, timeout=_DEFAULT_TIMEOU

path = "%s/iam/testPermissions" % (self.path,)
resp = client._connection.api_request(
method="GET", path=path, query_params=query_params, timeout=timeout
method="GET",
path=path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return resp.get("permissions", [])

Expand Down Expand Up @@ -2967,6 +2982,7 @@ def lock_retention_policy(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down
18 changes: 14 additions & 4 deletions google/cloud/storage/client.py
Expand Up @@ -45,6 +45,7 @@
from google.cloud.storage.acl import BucketACL
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


_marker = object()
Expand Down Expand Up @@ -255,7 +256,7 @@ def get_service_account_email(self, project=None, timeout=_DEFAULT_TIMEOUT):
project = self.project
path = "/projects/%s/serviceAccount" % (project,)
api_response = self._base_connection.api_request(
method="GET", path=path, timeout=timeout
method="GET", path=path, timeout=timeout, retry=DEFAULT_RETRY,
)
return api_response["email_address"]

Expand Down Expand Up @@ -531,6 +532,7 @@ def create_bucket(
data=properties,
_target_object=bucket,
timeout=timeout,
retry=DEFAULT_RETRY,
)

bucket._set_properties(api_response)
Expand Down Expand Up @@ -777,7 +779,9 @@ def list_buckets(
if fields is not None:
extra_params["fields"] = fields

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, retry=DEFAULT_RETRY, timeout=timeout
)

return page_iterator.HTTPIterator(
client=self,
Expand Down Expand Up @@ -829,7 +833,11 @@ def create_hmac_key(
qs_params["userProject"] = user_project

api_response = self._connection.api_request(
method="POST", path=path, query_params=qs_params, timeout=timeout
method="POST",
path=path,
query_params=qs_params,
timeout=timeout,
retry=None,
)
metadata = HMACKeyMetadata(self)
metadata._properties = api_response["metadata"]
Expand Down Expand Up @@ -893,7 +901,9 @@ def list_hmac_keys(
if user_project is not None:
extra_params["userProject"] = user_project

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)

return page_iterator.HTTPIterator(
client=self,
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/storage/hmac_key.py
Expand Up @@ -16,6 +16,8 @@
from google.cloud._helpers import _rfc3339_to_datetime

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON


class HMACKeyMetadata(object):
Expand Down Expand Up @@ -260,6 +262,7 @@ def update(self, timeout=_DEFAULT_TIMEOUT):
data=payload,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY_IF_ETAG_IN_JSON,
)

def delete(self, timeout=_DEFAULT_TIMEOUT):
Expand All @@ -283,5 +286,9 @@ def delete(self, timeout=_DEFAULT_TIMEOUT):
qs_params["userProject"] = self.user_project

self._client._connection.api_request(
method="DELETE", path=self.path, query_params=qs_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
14 changes: 12 additions & 2 deletions google/cloud/storage/notification.py
Expand Up @@ -19,6 +19,7 @@
from google.api_core.exceptions import NotFound

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE"
Expand Down Expand Up @@ -271,6 +272,7 @@ def create(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
data=properties,
timeout=timeout,
retry=None,
)

def exists(self, client=None, timeout=_DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -347,7 +349,11 @@ def reload(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

response = client._connection.api_request(
method="GET", path=self.path, query_params=query_params, timeout=timeout
method="GET",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(response)

Expand Down Expand Up @@ -385,7 +391,11 @@ def delete(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

client._connection.api_request(
method="DELETE", path=self.path, query_params=query_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)


Expand Down

0 comments on commit fbe5d9c

Please sign in to comment.