Skip to content

Commit

Permalink
feat: add public access prevention to bucket IAM configuration (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
shaffeeullah committed Jun 30, 2021
1 parent b20ca20 commit e3e57a9
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
31 changes: 30 additions & 1 deletion google/cloud/storage/bucket.py
Expand Up @@ -50,6 +50,7 @@
from google.cloud.storage.constants import MULTI_REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import MULTI_REGION_LOCATION_TYPE
from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS
from google.cloud.storage.constants import PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import REGION_LOCATION_TYPE
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
Expand Down Expand Up @@ -383,6 +384,12 @@ class IAMConfiguration(dict):
:type bucket: :class:`Bucket`
:params bucket: Bucket for which this instance is the policy.
:type public_access_prevention: str
:params public_access_prevention:
(Optional) Whether the public access prevention policy is 'unspecified' (default) or 'enforced'
See: https://cloud.google.com/storage/docs/public-access-prevention
See: https://cloud.google.com/storage/docs/public-access-prevention
:type uniform_bucket_level_access_enabled: bool
:params bucket_policy_only_enabled:
(Optional) Whether the IAM-only policy is enabled for the bucket.
Expand All @@ -404,6 +411,7 @@ class IAMConfiguration(dict):
def __init__(
self,
bucket,
public_access_prevention=_default,
uniform_bucket_level_access_enabled=_default,
uniform_bucket_level_access_locked_time=_default,
bucket_policy_only_enabled=_default,
Expand All @@ -428,8 +436,14 @@ def __init__(
if uniform_bucket_level_access_enabled is _default:
uniform_bucket_level_access_enabled = False

if public_access_prevention is _default:
public_access_prevention = PUBLIC_ACCESS_PREVENTION_UNSPECIFIED

data = {
"uniformBucketLevelAccess": {"enabled": uniform_bucket_level_access_enabled}
"uniformBucketLevelAccess": {
"enabled": uniform_bucket_level_access_enabled
},
"publicAccessPrevention": public_access_prevention,
}
if uniform_bucket_level_access_locked_time is not _default:
data["uniformBucketLevelAccess"]["lockedTime"] = _datetime_to_rfc3339(
Expand Down Expand Up @@ -464,6 +478,21 @@ def bucket(self):
"""
return self._bucket

@property
def public_access_prevention(self):
"""Setting for public access prevention policy. Options are 'unspecified' (default) or 'enforced'.
More information can be found at https://cloud.google.com/storage/docs/public-access-prevention
:rtype: string
:returns: the public access prevention status, either 'enforced' or 'unspecified'.
"""
return self["publicAccessPrevention"]

@public_access_prevention.setter
def public_access_prevention(self, value):
self["publicAccessPrevention"] = value
self.bucket._patch_property("iamConfiguration", self)

@property
def uniform_bucket_level_access_enabled(self):
"""If set, access checks only use bucket-level IAM policies or above.
Expand Down
13 changes: 13 additions & 0 deletions google/cloud/storage/constants.py
Expand Up @@ -96,3 +96,16 @@
_DEFAULT_TIMEOUT = 60 # in seconds
"""The default request timeout in seconds if a timeout is not explicitly given.
"""

# Public Access Prevention
PUBLIC_ACCESS_PREVENTION_ENFORCED = "enforced"
"""Enforced public access prevention value.
See: https://cloud.google.com/storage/docs/public-access-prevention
"""

PUBLIC_ACCESS_PREVENTION_UNSPECIFIED = "unspecified"
"""Unspecified public access prevention value.
See: https://cloud.google.com/storage/docs/public-access-prevention
"""
79 changes: 79 additions & 0 deletions tests/system/test_bucket.py
Expand Up @@ -766,3 +766,82 @@ def test_ubla_set_unset_preserves_acls(

assert bucket_acl_before == bucket_acl_after
assert blob_acl_before == blob_acl_after


def test_new_bucket_created_w_unspecified_pap(
storage_client, buckets_to_delete, blobs_to_delete,
):
from google.cloud.storage import constants

bucket_name = _helpers.unique_name("new-w-pap-unspecified")
bucket = storage_client.bucket(bucket_name)
bucket.iam_configuration.uniform_bucket_level_access_enabled = True
bucket.create()
buckets_to_delete.append(bucket)

assert (
bucket.iam_configuration.public_access_prevention
== constants.PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
)

bucket.iam_configuration.public_access_prevention = (
constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
)
bucket.patch()
assert (
bucket.iam_configuration.public_access_prevention
== constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
)
assert bucket.iam_configuration.uniform_bucket_level_access_enabled

bucket.iam_configuration.uniform_bucket_level_access_enabled = False
bucket.patch()
assert (
bucket.iam_configuration.public_access_prevention
== constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
)

with pytest.raises(exceptions.BadRequest):
bucket.iam_configuration.public_access_prevention = "unexpected value"
bucket.patch()

with pytest.raises(exceptions.PreconditionFailed):
bucket.make_public()

blob_name = "my-blob.txt"
blob = bucket.blob(blob_name)
payload = b"DEADBEEF"
blob.upload_from_string(payload)

with pytest.raises(exceptions.PreconditionFailed):
blob.make_public()


def test_new_bucket_created_w_enforced_pap(
storage_client, buckets_to_delete, blobs_to_delete,
):
from google.cloud.storage import constants

bucket_name = _helpers.unique_name("new-w-pap-enforced")
bucket = storage_client.bucket(bucket_name)
bucket.iam_configuration.public_access_prevention = (
constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
)
bucket.create()
buckets_to_delete.append(bucket)

assert (
bucket.iam_configuration.public_access_prevention
== constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
)

bucket.iam_configuration.public_access_prevention = (
constants.PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
)
bucket.patch()

assert (
bucket.iam_configuration.public_access_prevention
== constants.PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
)
assert not bucket.iam_configuration.uniform_bucket_level_access_enabled
23 changes: 23 additions & 0 deletions tests/unit/test_bucket.py
Expand Up @@ -22,6 +22,8 @@
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
from google.cloud.storage.constants import PUBLIC_ACCESS_PREVENTION_ENFORCED
from google.cloud.storage.constants import PUBLIC_ACCESS_PREVENTION_UNSPECIFIED


def _create_signing_credentials():
Expand Down Expand Up @@ -356,6 +358,9 @@ def test_ctor_defaults(self):
self.assertIs(config.bucket, bucket)
self.assertFalse(config.uniform_bucket_level_access_enabled)
self.assertIsNone(config.uniform_bucket_level_access_locked_time)
self.assertEqual(
config.public_access_prevention, PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
)
self.assertFalse(config.bucket_policy_only_enabled)
self.assertIsNone(config.bucket_policy_only_locked_time)

Expand All @@ -378,6 +383,24 @@ def test_ctor_explicit_ubla(self):
self.assertTrue(config.bucket_policy_only_enabled)
self.assertEqual(config.bucket_policy_only_locked_time, now)

def test_ctor_explicit_pap(self):
bucket = self._make_bucket()

config = self._make_one(
bucket, public_access_prevention=PUBLIC_ACCESS_PREVENTION_ENFORCED,
)

self.assertIs(config.bucket, bucket)
self.assertFalse(config.uniform_bucket_level_access_enabled)
self.assertEqual(
config.public_access_prevention, PUBLIC_ACCESS_PREVENTION_ENFORCED
)

config.public_access_prevention = PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
self.assertEqual(
config.public_access_prevention, PUBLIC_ACCESS_PREVENTION_UNSPECIFIED
)

def test_ctor_explicit_bpo(self):
import datetime
import pytz
Expand Down

0 comments on commit e3e57a9

Please sign in to comment.