Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Bucket.reload() and Bucket.update() wrappers to restrict generation match args #153

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 85 additions & 0 deletions google/cloud/storage/bucket.py
Expand Up @@ -783,6 +783,91 @@ def create(
timeout=timeout,
)

def update(
self,
client=None,
timeout=_DEFAULT_TIMEOUT,
if_metageneration_match=None,
if_metageneration_not_match=None,
):
"""Sends all properties in a PUT request.

Updates the ``_properties`` with the response from the backend.

If :attr:`user_project` is set, bills the API request to that project.

:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current object.

:type timeout: float or tuple
:param timeout: (Optional) The amount of time, in seconds, to wait
for the server response.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

:type if_metageneration_match: long
:param if_metageneration_match: (Optional) Make the operation conditional on whether the
blob's current metageneration matches the given value.

:type if_metageneration_not_match: long
:param if_metageneration_not_match: (Optional) Make the operation conditional on whether the
blob's current metageneration does not match the given value.
"""
super(Bucket, self).update(
client=client,
timeout=timeout,
if_metageneration_match=if_metageneration_match,
if_metageneration_not_match=if_metageneration_not_match,
)

def reload(
self,
client=None,
projection="noAcl",
timeout=_DEFAULT_TIMEOUT,
if_metageneration_match=None,
if_metageneration_not_match=None,
):
"""Reload properties from Cloud Storage.

If :attr:`user_project` is set, bills the API request to that project.

:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current object.

:type projection: str
:param projection: (Optional) If used, must be 'full' or 'noAcl'.
Defaults to ``'noAcl'``. Specifies the set of
properties to return.

:type timeout: float or tuple
:param timeout: (Optional) The amount of time, in seconds, to wait
for the server response.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

:type if_metageneration_match: long
:param if_metageneration_match: (Optional) Make the operation conditional on whether the
blob's current metageneration matches the given value.

:type if_metageneration_not_match: long
:param if_metageneration_not_match: (Optional) Make the operation conditional on whether the
blob's current metageneration does not match the given value.
"""
super(Bucket, self).reload(
client=client,
projection=projection,
timeout=timeout,
if_metageneration_match=if_metageneration_match,
if_metageneration_not_match=if_metageneration_not_match,
)

def patch(
self,
client=None,
Expand Down
56 changes: 56 additions & 0 deletions tests/unit/test_bucket.py
Expand Up @@ -1184,6 +1184,62 @@ def test_delete_blobs_miss_w_on_error(self):
self.assertEqual(kw[1]["path"], "/b/%s/o/%s" % (NAME, NONESUCH))
self.assertEqual(kw[1]["timeout"], self._get_default_timeout())

def test_reload_bucket_w_metageneration_match(self):
NAME = "name"
METAGENERATION_NUMBER = 9

connection = _Connection({})
client = _Client(connection)
bucket = self._make_one(client=client, name=NAME)

bucket.reload(if_metageneration_match=METAGENERATION_NUMBER)

self.assertEqual(len(connection._requested), 1)
req = connection._requested[0]
self.assertEqual(req["method"], "GET")
self.assertEqual(req["path"], "/b/%s" % NAME)
self.assertEqual(req["timeout"], self._get_default_timeout())
self.assertEqual(
req["query_params"],
{"projection": "noAcl", "ifMetagenerationMatch": METAGENERATION_NUMBER},
)

def test_reload_bucket_w_generation_match(self):
connection = _Connection({})
client = _Client(connection)
bucket = self._make_one(client=client, name="name")

with self.assertRaises(TypeError):
bucket.reload(if_generation_match=6)

def test_update_bucket_w_metageneration_match(self):
NAME = "name"
METAGENERATION_NUMBER = 9

connection = _Connection({})
client = _Client(connection)
bucket = self._make_one(client=client, name=NAME)

bucket.update(if_metageneration_match=METAGENERATION_NUMBER)

self.assertEqual(len(connection._requested), 1)
req = connection._requested[0]
self.assertEqual(req["method"], "PUT")
self.assertEqual(req["path"], "/b/%s" % NAME)
self.assertEqual(req["timeout"], self._get_default_timeout())
self.assertEqual(
req["query_params"],
{"projection": "full", "ifMetagenerationMatch": METAGENERATION_NUMBER},
)

def test_update_bucket_w_generation_match(self):
connection = _Connection({})
client = _Client(connection)
bucket = self._make_one(client=client, name="name")

with self.assertRaises(TypeError):
bucket.update(if_generation_match=6)

@staticmethod
def _make_blob(bucket_name, blob_name):
from google.cloud.storage.blob import Blob
Expand Down