Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry API calls with exponential backoff #287

Merged
merged 15 commits into from Oct 16, 2020
6 changes: 6 additions & 0 deletions google/cloud/storage/_helpers.py
Expand Up @@ -24,6 +24,8 @@

from six.moves.urllib.parse import urlsplit
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED


STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST"
Expand Down Expand Up @@ -205,6 +207,7 @@ def reload(
headers=self._encryption_headers(),
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -306,6 +309,7 @@ def patch(
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -368,13 +372,15 @@ def update(
if_metageneration_match=if_metageneration_match,
if_metageneration_not_match=if_metageneration_not_match,
)

api_response = client._connection.api_request(
method="PUT",
path=self.path,
data=self._properties,
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down
14 changes: 14 additions & 0 deletions google/cloud/storage/_http.py
Expand Up @@ -14,6 +14,8 @@

"""Create / interact with Google Cloud Storage connections."""

import functools

from google.cloud import _http

from google.cloud.storage import __version__
Expand Down Expand Up @@ -46,3 +48,15 @@ def __init__(self, client, client_info=None, api_endpoint=DEFAULT_API_ENDPOINT):

API_URL_TEMPLATE = "{api_base_url}/storage/{api_version}{path}"
"""A template for the URL of a particular API call."""

def api_request(self, *args, retry=None, **kwargs):
call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
if retry:
# If this is a ConditionalRetryPolicy, check conditions.
try:
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
except AttributeError:
andrewsg marked this conversation as resolved.
Show resolved Hide resolved
pass
if retry:
call = retry(call)
return call()
3 changes: 3 additions & 0 deletions google/cloud/storage/blob.py
Expand Up @@ -74,6 +74,7 @@
from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS
from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED


_API_ACCESS_ENDPOINT = "https://storage.googleapis.com"
Expand Down Expand Up @@ -2856,6 +2857,7 @@ def compose(
data=request,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -3000,6 +3002,7 @@ def rewrite(
headers=headers,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
rewritten = int(api_response["totalBytesRewritten"])
size = int(api_response["objectSize"])
Expand Down
26 changes: 22 additions & 4 deletions google/cloud/storage/bucket.py
Expand Up @@ -57,7 +57,11 @@
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT

from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import (
DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED_OR_ETAG_IN_JSON,
)

_UBLA_BPO_ENABLED_MESSAGE = (
"Pass only one of 'uniform_bucket_level_access_enabled' / "
Expand Down Expand Up @@ -1244,7 +1248,9 @@ def list_blobs(

client = self._require_client(client)
path = self.path + "/o"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1283,7 +1289,9 @@ def list_notifications(self, client=None, timeout=_DEFAULT_TIMEOUT):
"""
client = self._require_client(client)
path = self.path + "/notificationConfigs"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1424,6 +1432,7 @@ def delete(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)

def delete_blob(
Expand Down Expand Up @@ -1521,6 +1530,7 @@ def delete_blob(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

def delete_blobs(
Expand Down Expand Up @@ -1795,6 +1805,7 @@ def copy_blob(
query_params=query_params,
_target_object=new_blob,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

if not preserve_acl:
Expand Down Expand Up @@ -2644,6 +2655,7 @@ def get_iam_policy(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2689,6 +2701,7 @@ def set_iam_policy(self, policy, client=None, timeout=_DEFAULT_TIMEOUT):
data=resource,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED_OR_ETAG_IN_JSON,
andrewsg marked this conversation as resolved.
Show resolved Hide resolved
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2727,7 +2740,11 @@ def test_iam_permissions(self, permissions, client=None, timeout=_DEFAULT_TIMEOU

path = "%s/iam/testPermissions" % (self.path,)
resp = client._connection.api_request(
method="GET", path=path, query_params=query_params, timeout=timeout
method="GET",
path=path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return resp.get("permissions", [])

Expand Down Expand Up @@ -2967,6 +2984,7 @@ def lock_retention_policy(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down
18 changes: 14 additions & 4 deletions google/cloud/storage/client.py
Expand Up @@ -45,6 +45,7 @@
from google.cloud.storage.acl import BucketACL
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


_marker = object()
Expand Down Expand Up @@ -255,7 +256,7 @@ def get_service_account_email(self, project=None, timeout=_DEFAULT_TIMEOUT):
project = self.project
path = "/projects/%s/serviceAccount" % (project,)
api_response = self._base_connection.api_request(
method="GET", path=path, timeout=timeout
method="GET", path=path, timeout=timeout, retry=DEFAULT_RETRY,
)
return api_response["email_address"]

Expand Down Expand Up @@ -531,6 +532,7 @@ def create_bucket(
data=properties,
_target_object=bucket,
timeout=timeout,
retry=DEFAULT_RETRY,
)

bucket._set_properties(api_response)
Expand Down Expand Up @@ -777,7 +779,9 @@ def list_buckets(
if fields is not None:
extra_params["fields"] = fields

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, retry=DEFAULT_RETRY, timeout=timeout
)

return page_iterator.HTTPIterator(
client=self,
Expand Down Expand Up @@ -829,7 +833,11 @@ def create_hmac_key(
qs_params["userProject"] = user_project

api_response = self._connection.api_request(
method="POST", path=path, query_params=qs_params, timeout=timeout
method="POST",
path=path,
query_params=qs_params,
timeout=timeout,
retry=None,
)
metadata = HMACKeyMetadata(self)
metadata._properties = api_response["metadata"]
Expand Down Expand Up @@ -893,7 +901,9 @@ def list_hmac_keys(
if user_project is not None:
extra_params["userProject"] = user_project

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)

return page_iterator.HTTPIterator(
client=self,
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/storage/hmac_key.py
Expand Up @@ -16,6 +16,8 @@
from google.cloud._helpers import _rfc3339_to_datetime

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED


class HMACKeyMetadata(object):
Expand Down Expand Up @@ -260,6 +262,7 @@ def update(self, timeout=_DEFAULT_TIMEOUT):
data=payload,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
andrewsg marked this conversation as resolved.
Show resolved Hide resolved
)

def delete(self, timeout=_DEFAULT_TIMEOUT):
Expand All @@ -283,5 +286,9 @@ def delete(self, timeout=_DEFAULT_TIMEOUT):
qs_params["userProject"] = self.user_project

self._client._connection.api_request(
method="DELETE", path=self.path, query_params=qs_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
14 changes: 12 additions & 2 deletions google/cloud/storage/notification.py
Expand Up @@ -19,6 +19,7 @@
from google.api_core.exceptions import NotFound

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE"
Expand Down Expand Up @@ -271,6 +272,7 @@ def create(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
data=properties,
timeout=timeout,
retry=None,
)

def exists(self, client=None, timeout=_DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -347,7 +349,11 @@ def reload(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

response = client._connection.api_request(
method="GET", path=self.path, query_params=query_params, timeout=timeout
method="GET",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(response)

Expand Down Expand Up @@ -385,7 +391,11 @@ def delete(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

client._connection.api_request(
method="DELETE", path=self.path, query_params=query_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)


Expand Down