diff --git a/google/cloud/storage/_signing.py b/google/cloud/storage/_signing.py index f5e74e260..38610b6ff 100644 --- a/google/cloud/storage/_signing.py +++ b/google/cloud/storage/_signing.py @@ -531,9 +531,7 @@ def generate_signed_url_v4( expiration_seconds = get_expiration_seconds_v4(expiration) if _request_timestamp is None: - now = NOW() - request_timestamp = now.strftime("%Y%m%dT%H%M%SZ") - datestamp = now.date().strftime("%Y%m%d") + request_timestamp, datestamp = get_v4_now_dtstamps() else: request_timestamp = _request_timestamp datestamp = _request_timestamp[:8] @@ -629,6 +627,18 @@ def generate_signed_url_v4( ) +def get_v4_now_dtstamps(): + """Get current timestamp and datestamp in V4 valid format. + + :rtype: str, str + :returns: Current timestamp, datestamp. + """ + now = NOW() + timestamp = now.strftime("%Y%m%dT%H%M%SZ") + datestamp = now.date().strftime("%Y%m%d") + return timestamp, datestamp + + def _sign_message(message, access_token, service_account_email): """Signs a message. diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index ad5b39f21..6d25f9e43 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -14,18 +14,28 @@ """Client for interacting with the Google Cloud Storage API.""" -import warnings +import base64 +import binascii +import datetime import functools +import json +import warnings import google.api_core.client_options from google.auth.credentials import AnonymousCredentials from google.api_core import page_iterator -from google.cloud._helpers import _LocalStack +from google.cloud._helpers import _LocalStack, _NOW from google.cloud.client import ClientWithProject from google.cloud.exceptions import NotFound from google.cloud.storage._helpers import _get_storage_host from google.cloud.storage._http import Connection +from google.cloud.storage._signing import ( + get_expiration_seconds_v4, + get_v4_now_dtstamps, + ensure_signed_credentials, + _sign_message, +) from google.cloud.storage.batch import Batch from google.cloud.storage.bucket import Bucket from google.cloud.storage.blob import Blob @@ -836,6 +846,174 @@ def get_hmac_key_metadata( metadata.reload(timeout=timeout) # raises NotFound for missing key return metadata + def generate_signed_post_policy_v4( + self, + bucket_name, + blob_name, + expiration, + conditions=None, + fields=None, + credentials=None, + virtual_hosted_style=False, + bucket_bound_hostname=None, + scheme=None, + service_account_email=None, + access_token=None, + ): + """Generate a V4 signed policy object. + + .. note:: + + Assumes ``credentials`` implements the + :class:`google.auth.credentials.Signing` interface. Also assumes + ``credentials`` has a ``service_account_email`` property which + identifies the credentials. + + Generated policy object allows user to upload objects with a POST request. + + :type bucket_name: str + :param bucket_name: Bucket name. + + :type blob_name: str + :param blob_name: Object name. + + :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] + :param expiration: Policy expiration time. + + :type conditions: list + :param conditions: (Optional) List of POST policy conditions, which are + used to restrict what is allowed in the request. + + :type fields: dict + :param fields: (Optional) Additional elements to include into request. + + :type credentials: :class:`google.auth.credentials.Signing` + :param credentials: (Optional) Credentials object with an associated private + key to sign text. + + :type virtual_hosted_style: bool + :param virtual_hosted_style: (Optional) If True, construct the URL relative to the bucket + virtual hostname, e.g., '.storage.googleapis.com'. + + :type bucket_bound_hostname: str + :param bucket_bound_hostname: + (Optional) If passed, construct the URL relative to the bucket-bound hostname. + Value can be bare or with a scheme, e.g., 'example.com' or 'http://example.com'. + See: https://cloud.google.com/storage/docs/request-endpoints#cname + + :type scheme: str + :param scheme: + (Optional) If ``bucket_bound_hostname`` is passed as a bare hostname, use + this value as a scheme. ``https`` will work only when using a CDN. + Defaults to ``"http"``. + + :type service_account_email: str + :param service_account_email: (Optional) E-mail address of the service account. + + :type access_token: str + :param access_token: (Optional) Access token for a service account. + + :rtype: dict + :returns: Signed POST policy. + + Example: + Generate signed POST policy and upload a file. + + >>> from google.cloud import storage + >>> client = storage.Client() + >>> policy = client.generate_signed_post_policy_v4( + "bucket-name", + "blob-name", + expiration=datetime.datetime(2020, 3, 17), + conditions=[ + ["content-length-range", 0, 255] + ], + fields=[ + "x-goog-meta-hello" => "world" + ], + ) + >>> with open("bucket-name", "rb") as f: + files = {"file": ("bucket-name", f)} + requests.post(policy["url"], data=policy["fields"], files=files) + """ + credentials = self._credentials if credentials is None else credentials + ensure_signed_credentials(credentials) + + # prepare policy conditions and fields + timestamp, datestamp = get_v4_now_dtstamps() + + x_goog_credential = "{email}/{datestamp}/auto/storage/goog4_request".format( + email=credentials.signer_email, datestamp=datestamp + ) + required_conditions = [ + {"key": blob_name}, + {"x-goog-date": timestamp}, + {"x-goog-credential": x_goog_credential}, + {"x-goog-algorithm": "GOOG4-RSA-SHA256"}, + ] + + conditions = conditions or [] + policy_fields = {} + for key, value in sorted((fields or {}).items()): + if not key.startswith("x-ignore-"): + policy_fields[key] = value + conditions.append({key: value}) + + conditions += required_conditions + + # calculate policy expiration time + now = _NOW() + if expiration is None: + expiration = now + datetime.timedelta(hours=1) + + policy_expires = now + datetime.timedelta( + seconds=get_expiration_seconds_v4(expiration) + ) + + # encode policy for signing + policy = json.dumps( + {"conditions": conditions, "expiration": policy_expires.isoformat() + "Z"}, + separators=(",", ":"), + ) + str_to_sign = base64.b64encode(policy.encode("utf-8")) + + # sign the policy and get its cryptographic signature + if access_token and service_account_email: + signature = _sign_message(str_to_sign, access_token, service_account_email) + signature_bytes = base64.b64decode(signature) + else: + signature_bytes = credentials.sign_bytes(str_to_sign) + + # get hexadecimal representation of the signature + signature = binascii.hexlify(signature_bytes).decode("utf-8") + + policy_fields.update( + { + "key": blob_name, + "x-goog-algorithm": "GOOG4-RSA-SHA256", + "x-goog-credential": x_goog_credential, + "x-goog-date": timestamp, + "x-goog-signature": signature, + "policy": str_to_sign, + } + ) + # designate URL + if virtual_hosted_style: + url = "https://{}.storage.googleapis.com/".format(bucket_name) + + elif bucket_bound_hostname: + if ":" in bucket_bound_hostname: # URL includes scheme + url = bucket_bound_hostname + + else: # scheme is given separately + url = "{scheme}://{host}/".format( + scheme=scheme, host=bucket_bound_hostname + ) + else: + url = "https://storage.googleapis.com/{}/".format(bucket_name) + + return {"url": url, "fields": policy_fields} + def _item_to_bucket(iterator, item): """Convert a JSON bucket to the native object. diff --git a/tests/system/test_system.py b/tests/system/test_system.py index b33d3590a..2cb3dc0ab 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1955,3 +1955,67 @@ def test_ubla_set_unset_preserves_acls(self): self.assertEqual(bucket_acl_before, bucket_acl_after) self.assertEqual(blob_acl_before, blob_acl_after) + + +class TestV4POSTPolicies(unittest.TestCase): + def setUp(self): + self.case_buckets_to_delete = [] + + def tearDown(self): + for bucket_name in self.case_buckets_to_delete: + bucket = Config.CLIENT.bucket(bucket_name) + retry_429_harder(bucket.delete)(force=True) + + def test_get_signed_policy_v4(self): + bucket_name = "post_policy" + unique_resource_id("-") + self.assertRaises(exceptions.NotFound, Config.CLIENT.get_bucket, bucket_name) + retry_429_503(Config.CLIENT.create_bucket)(bucket_name) + self.case_buckets_to_delete.append(bucket_name) + + blob_name = "post_policy_obj.txt" + with open(blob_name, "w") as f: + f.write("DEADBEEF") + + policy = Config.CLIENT.generate_signed_post_policy_v4( + bucket_name, + blob_name, + conditions=[ + {"bucket": bucket_name}, + ["starts-with", "$Content-Type", "text/pla"], + ], + expiration=datetime.datetime.now() + datetime.timedelta(hours=1), + fields={"content-type": "text/plain"}, + ) + with open(blob_name, "r") as f: + files = {"file": (blob_name, f)} + response = requests.post(policy["url"], data=policy["fields"], files=files) + + os.remove(blob_name) + self.assertEqual(response.status_code, 204) + + def test_get_signed_policy_v4_invalid_field(self): + bucket_name = "post_policy" + unique_resource_id("-") + self.assertRaises(exceptions.NotFound, Config.CLIENT.get_bucket, bucket_name) + retry_429_503(Config.CLIENT.create_bucket)(bucket_name) + self.case_buckets_to_delete.append(bucket_name) + + blob_name = "post_policy_obj.txt" + with open(blob_name, "w") as f: + f.write("DEADBEEF") + + policy = Config.CLIENT.generate_signed_post_policy_v4( + bucket_name, + blob_name, + conditions=[ + {"bucket": bucket_name}, + ["starts-with", "$Content-Type", "text/pla"], + ], + expiration=datetime.datetime.now() + datetime.timedelta(hours=1), + fields={"x-goog-random": "invalid_field", "content-type": "text/plain"}, + ) + with open(blob_name, "r") as f: + files = {"file": (blob_name, f)} + response = requests.post(policy["url"], data=policy["fields"], files=files) + + os.remove(blob_name) + self.assertEqual(response.status_code, 400) diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py index df379f1e9..a864e9eae 100644 --- a/tests/unit/__init__.py +++ b/tests/unit/__init__.py @@ -11,3 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import io +import json +import os + + +def _read_local_json(json_file): + here = os.path.dirname(__file__) + json_path = os.path.abspath(os.path.join(here, json_file)) + with io.open(json_path, "r", encoding="utf-8-sig") as fileobj: + return json.load(fileobj) diff --git a/tests/unit/test__signing.py b/tests/unit/test__signing.py index 86d702cc7..d1d2224e3 100644 --- a/tests/unit/test__signing.py +++ b/tests/unit/test__signing.py @@ -18,9 +18,7 @@ import binascii import calendar import datetime -import io import json -import os import time import unittest @@ -29,12 +27,7 @@ import six from six.moves import urllib_parse - -def _read_local_json(json_file): - here = os.path.dirname(__file__) - json_path = os.path.abspath(os.path.join(here, json_file)) - with io.open(json_path, "r", encoding="utf-8-sig") as fileobj: - return json.load(fileobj) +from . import _read_local_json _SERVICE_ACCOUNT_JSON = _read_local_json("url_signer_v4_test_account.json") @@ -762,6 +755,22 @@ def test_bytes(self): self.assertEqual(encoded_param, "bytes") +class TestV4Stamps(unittest.TestCase): + def test_get_v4_now_dtstamps(self): + import datetime + from google.cloud.storage._signing import get_v4_now_dtstamps + + with mock.patch( + "google.cloud.storage._signing.NOW", + return_value=datetime.datetime(2020, 3, 12, 13, 14, 15), + ) as now_mock: + timestamp, datestamp = get_v4_now_dtstamps() + now_mock.assert_called_once() + + self.assertEqual(timestamp, "20200312T131415Z") + self.assertEqual(datestamp, "20200312") + + _DUMMY_SERVICE_ACCOUNT = None diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4ba98f82e..4d1149382 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -12,15 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import io import json -import unittest - import mock import pytest +import re import requests +import unittest from six.moves import http_client +from google.oauth2.service_account import Credentials +from . import _read_local_json + +_SERVICE_ACCOUNT_JSON = _read_local_json("url_signer_v4_test_account.json") +_CONFORMANCE_TESTS = _read_local_json("url_signer_v4_test_data.json") +_POST_POLICY_TESTS = [test for test in _CONFORMANCE_TESTS if "policyInput" in test] +_DUMMY_CREDENTIALS = Credentials.from_service_account_info(_SERVICE_ACCOUNT_JSON) + def _make_credentials(): import google.auth.credentials @@ -28,6 +37,20 @@ def _make_credentials(): return mock.Mock(spec=google.auth.credentials.Credentials) +def _create_signing_credentials(): + import google.auth.credentials + + class _SigningCredentials( + google.auth.credentials.Credentials, google.auth.credentials.Signing + ): + pass + + credentials = mock.Mock(spec=_SigningCredentials) + credentials.sign_bytes = mock.Mock(return_value=b"Signature_bytes") + credentials.signer_email = "test@mail.com" + return credentials + + def _make_connection(*responses): import google.cloud.storage._http from google.cloud.exceptions import NotFound @@ -1471,3 +1494,322 @@ def test_get_hmac_key_metadata_w_project(self): headers=mock.ANY, timeout=self._get_default_timeout(), ) + + def test_get_signed_policy_v4(self): + import datetime + + BUCKET_NAME = "bucket-name" + BLOB_NAME = "object-name" + EXPECTED_SIGN = "5369676e61747572655f6279746573" + EXPECTED_POLICY = b"eyJjb25kaXRpb25zIjpbeyJidWNrZXQiOiJidWNrZXQtbmFtZSJ9LHsiYWNsIjoicHJpdmF0ZSJ9LFsic3RhcnRzLXdpdGgiLCIkQ29udGVudC1UeXBlIiwidGV4dC9wbGFpbiJdLHsia2V5Ijoib2JqZWN0LW5hbWUifSx7IngtZ29vZy1kYXRlIjoiMjAyMDAzMTJUMTE0NzE2WiJ9LHsieC1nb29nLWNyZWRlbnRpYWwiOiJ0ZXN0QG1haWwuY29tLzIwMjAwMzEyL2F1dG8vc3RvcmFnZS9nb29nNF9yZXF1ZXN0In0seyJ4LWdvb2ctYWxnb3JpdGhtIjoiR09PRzQtUlNBLVNIQTI1NiJ9XSwiZXhwaXJhdGlvbiI6IjIwMjAtMDMtMjZUMDA6MDA6MTBaIn0=" + + client = self._make_one(project="PROJECT") + + dtstamps_patch, now_patch, expire_secs_patch = _time_functions_patches() + with dtstamps_patch, now_patch, expire_secs_patch: + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + BLOB_NAME, + expiration=datetime.datetime(2020, 3, 12), + conditions=[ + {"bucket": BUCKET_NAME}, + {"acl": "private"}, + ["starts-with", "$Content-Type", "text/plain"], + ], + credentials=_create_signing_credentials(), + ) + self.assertEqual( + policy["url"], "https://storage.googleapis.com/" + BUCKET_NAME + "/" + ) + fields = policy["fields"] + + self.assertEqual(fields["key"], BLOB_NAME) + self.assertEqual(fields["x-goog-algorithm"], "GOOG4-RSA-SHA256") + self.assertEqual(fields["x-goog-date"], "20200312T114716Z") + self.assertEqual( + fields["x-goog-credential"], + "test@mail.com/20200312/auto/storage/goog4_request", + ) + self.assertEqual(fields["x-goog-signature"], EXPECTED_SIGN) + self.assertEqual(fields["policy"], EXPECTED_POLICY) + + def test_get_signed_policy_v4_without_credentials(self): + import datetime + + BUCKET_NAME = "bucket-name" + BLOB_NAME = "object-name" + EXPECTED_SIGN = "5369676e61747572655f6279746573" + EXPECTED_POLICY = b"eyJjb25kaXRpb25zIjpbeyJidWNrZXQiOiJidWNrZXQtbmFtZSJ9LHsiYWNsIjoicHJpdmF0ZSJ9LFsic3RhcnRzLXdpdGgiLCIkQ29udGVudC1UeXBlIiwidGV4dC9wbGFpbiJdLHsia2V5Ijoib2JqZWN0LW5hbWUifSx7IngtZ29vZy1kYXRlIjoiMjAyMDAzMTJUMTE0NzE2WiJ9LHsieC1nb29nLWNyZWRlbnRpYWwiOiJ0ZXN0QG1haWwuY29tLzIwMjAwMzEyL2F1dG8vc3RvcmFnZS9nb29nNF9yZXF1ZXN0In0seyJ4LWdvb2ctYWxnb3JpdGhtIjoiR09PRzQtUlNBLVNIQTI1NiJ9XSwiZXhwaXJhdGlvbiI6IjIwMjAtMDMtMjZUMDA6MDA6MTBaIn0=" + + client = self._make_one( + project="PROJECT", credentials=_create_signing_credentials() + ) + + dtstamps_patch, now_patch, expire_secs_patch = _time_functions_patches() + with dtstamps_patch, now_patch, expire_secs_patch: + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + BLOB_NAME, + expiration=datetime.datetime(2020, 3, 12), + conditions=[ + {"bucket": BUCKET_NAME}, + {"acl": "private"}, + ["starts-with", "$Content-Type", "text/plain"], + ], + ) + self.assertEqual( + policy["url"], "https://storage.googleapis.com/" + BUCKET_NAME + "/" + ) + fields = policy["fields"] + + self.assertEqual(fields["key"], BLOB_NAME) + self.assertEqual(fields["x-goog-algorithm"], "GOOG4-RSA-SHA256") + self.assertEqual(fields["x-goog-date"], "20200312T114716Z") + self.assertEqual( + fields["x-goog-credential"], + "test@mail.com/20200312/auto/storage/goog4_request", + ) + self.assertEqual(fields["x-goog-signature"], EXPECTED_SIGN) + self.assertEqual(fields["policy"], EXPECTED_POLICY) + + def test_get_signed_policy_v4_with_fields(self): + import datetime + + BUCKET_NAME = "bucket-name" + BLOB_NAME = "object-name" + FIELD1_VALUE = "Value1" + EXPECTED_SIGN = "5369676e61747572655f6279746573" + EXPECTED_POLICY = b"eyJjb25kaXRpb25zIjpbeyJidWNrZXQiOiJidWNrZXQtbmFtZSJ9LHsiYWNsIjoicHJpdmF0ZSJ9LFsic3RhcnRzLXdpdGgiLCIkQ29udGVudC1UeXBlIiwidGV4dC9wbGFpbiJdLHsiZmllbGQxIjoiVmFsdWUxIn0seyJrZXkiOiJvYmplY3QtbmFtZSJ9LHsieC1nb29nLWRhdGUiOiIyMDIwMDMxMlQxMTQ3MTZaIn0seyJ4LWdvb2ctY3JlZGVudGlhbCI6InRlc3RAbWFpbC5jb20vMjAyMDAzMTIvYXV0by9zdG9yYWdlL2dvb2c0X3JlcXVlc3QifSx7IngtZ29vZy1hbGdvcml0aG0iOiJHT09HNC1SU0EtU0hBMjU2In1dLCJleHBpcmF0aW9uIjoiMjAyMC0wMy0yNlQwMDowMDoxMFoifQ==" + + client = self._make_one(project="PROJECT") + + dtstamps_patch, now_patch, expire_secs_patch = _time_functions_patches() + with dtstamps_patch, now_patch, expire_secs_patch: + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + BLOB_NAME, + expiration=datetime.datetime(2020, 3, 12), + conditions=[ + {"bucket": BUCKET_NAME}, + {"acl": "private"}, + ["starts-with", "$Content-Type", "text/plain"], + ], + fields={"field1": FIELD1_VALUE, "x-ignore-field": "Ignored_value"}, + credentials=_create_signing_credentials(), + ) + self.assertEqual( + policy["url"], "https://storage.googleapis.com/" + BUCKET_NAME + "/" + ) + fields = policy["fields"] + + self.assertEqual(fields["key"], BLOB_NAME) + self.assertEqual(fields["x-goog-algorithm"], "GOOG4-RSA-SHA256") + self.assertEqual(fields["x-goog-date"], "20200312T114716Z") + self.assertEqual(fields["field1"], FIELD1_VALUE) + self.assertNotIn("x-ignore-field", fields.keys()) + self.assertEqual( + fields["x-goog-credential"], + "test@mail.com/20200312/auto/storage/goog4_request", + ) + self.assertEqual(fields["x-goog-signature"], EXPECTED_SIGN) + self.assertEqual(fields["policy"], EXPECTED_POLICY) + + def test_get_signed_policy_v4_virtual_hosted_style(self): + import datetime + + BUCKET_NAME = "bucket-name" + + client = self._make_one(project="PROJECT") + + dtstamps_patch, _, _ = _time_functions_patches() + with dtstamps_patch: + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + "object-name", + expiration=datetime.datetime(2020, 3, 12), + virtual_hosted_style=True, + credentials=_create_signing_credentials(), + ) + self.assertEqual( + policy["url"], "https://{}.storage.googleapis.com/".format(BUCKET_NAME) + ) + + def test_get_signed_policy_v4_bucket_bound_hostname(self): + import datetime + + client = self._make_one(project="PROJECT") + + dtstamps_patch, _, _ = _time_functions_patches() + with dtstamps_patch: + policy = client.generate_signed_post_policy_v4( + "bucket-name", + "object-name", + expiration=datetime.datetime(2020, 3, 12), + bucket_bound_hostname="https://bucket.bound_hostname", + credentials=_create_signing_credentials(), + ) + self.assertEqual(policy["url"], "https://bucket.bound_hostname") + + def test_get_signed_policy_v4_bucket_bound_hostname_with_scheme(self): + import datetime + + client = self._make_one(project="PROJECT") + + dtstamps_patch, _, _ = _time_functions_patches() + with dtstamps_patch: + policy = client.generate_signed_post_policy_v4( + "bucket-name", + "object-name", + expiration=datetime.datetime(2020, 3, 12), + bucket_bound_hostname="bucket.bound_hostname", + scheme="http", + credentials=_create_signing_credentials(), + ) + self.assertEqual(policy["url"], "http://bucket.bound_hostname/") + + def test_get_signed_policy_v4_no_expiration(self): + BUCKET_NAME = "bucket-name" + EXPECTED_POLICY = b"eyJjb25kaXRpb25zIjpbeyJrZXkiOiJvYmplY3QtbmFtZSJ9LHsieC1nb29nLWRhdGUiOiIyMDIwMDMxMlQxMTQ3MTZaIn0seyJ4LWdvb2ctY3JlZGVudGlhbCI6InRlc3RAbWFpbC5jb20vMjAyMDAzMTIvYXV0by9zdG9yYWdlL2dvb2c0X3JlcXVlc3QifSx7IngtZ29vZy1hbGdvcml0aG0iOiJHT09HNC1SU0EtU0hBMjU2In1dLCJleHBpcmF0aW9uIjoiMjAyMC0wMy0yNlQwMDowMDoxMFoifQ==" + + client = self._make_one(project="PROJECT") + + dtstamps_patch, now_patch, expire_secs_patch = _time_functions_patches() + with dtstamps_patch, now_patch, expire_secs_patch: + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + "object-name", + expiration=None, + credentials=_create_signing_credentials(), + ) + + self.assertEqual( + policy["url"], "https://storage.googleapis.com/" + BUCKET_NAME + "/" + ) + self.assertEqual(policy["fields"]["policy"], EXPECTED_POLICY) + + def test_get_signed_policy_v4_with_access_token(self): + import datetime + + BUCKET_NAME = "bucket-name" + BLOB_NAME = "object-name" + EXPECTED_SIGN = "0c4003044105" + EXPECTED_POLICY = b"eyJjb25kaXRpb25zIjpbeyJidWNrZXQiOiJidWNrZXQtbmFtZSJ9LHsiYWNsIjoicHJpdmF0ZSJ9LFsic3RhcnRzLXdpdGgiLCIkQ29udGVudC1UeXBlIiwidGV4dC9wbGFpbiJdLHsia2V5Ijoib2JqZWN0LW5hbWUifSx7IngtZ29vZy1kYXRlIjoiMjAyMDAzMTJUMTE0NzE2WiJ9LHsieC1nb29nLWNyZWRlbnRpYWwiOiJ0ZXN0QG1haWwuY29tLzIwMjAwMzEyL2F1dG8vc3RvcmFnZS9nb29nNF9yZXF1ZXN0In0seyJ4LWdvb2ctYWxnb3JpdGhtIjoiR09PRzQtUlNBLVNIQTI1NiJ9XSwiZXhwaXJhdGlvbiI6IjIwMjAtMDMtMjZUMDA6MDA6MTBaIn0=" + + client = self._make_one(project="PROJECT") + + dtstamps_patch, now_patch, expire_secs_patch = _time_functions_patches() + with dtstamps_patch, now_patch, expire_secs_patch: + with mock.patch( + "google.cloud.storage.client._sign_message", return_value=b"DEADBEEF" + ): + policy = client.generate_signed_post_policy_v4( + BUCKET_NAME, + BLOB_NAME, + expiration=datetime.datetime(2020, 3, 12), + conditions=[ + {"bucket": BUCKET_NAME}, + {"acl": "private"}, + ["starts-with", "$Content-Type", "text/plain"], + ], + credentials=_create_signing_credentials(), + service_account_email="test@mail.com", + access_token="token", + ) + self.assertEqual( + policy["url"], "https://storage.googleapis.com/" + BUCKET_NAME + "/" + ) + fields = policy["fields"] + + self.assertEqual(fields["key"], BLOB_NAME) + self.assertEqual(fields["x-goog-algorithm"], "GOOG4-RSA-SHA256") + self.assertEqual(fields["x-goog-date"], "20200312T114716Z") + self.assertEqual( + fields["x-goog-credential"], + "test@mail.com/20200312/auto/storage/goog4_request", + ) + self.assertEqual(fields["x-goog-signature"], EXPECTED_SIGN) + self.assertEqual(fields["policy"], EXPECTED_POLICY) + + +@pytest.mark.parametrize("test_data", _POST_POLICY_TESTS) +def test_conformance_post_policy(test_data): + import datetime + from google.cloud.storage.client import Client + + in_data = test_data["policyInput"] + timestamp = datetime.datetime.strptime(in_data["timestamp"], "%Y-%m-%dT%H:%M:%SZ") + + client = Client(credentials=_DUMMY_CREDENTIALS) + + # mocking time functions + with mock.patch("google.cloud.storage._signing.NOW", return_value=timestamp): + with mock.patch( + "google.cloud.storage.client.get_expiration_seconds_v4", + return_value=in_data["expiration"], + ): + with mock.patch("google.cloud.storage.client._NOW", return_value=timestamp): + + policy = client.generate_signed_post_policy_v4( + bucket_name=in_data["bucket"], + blob_name=in_data["object"], + conditions=_prepare_conditions(in_data), + fields=in_data.get("fields"), + credentials=_DUMMY_CREDENTIALS, + expiration=in_data["expiration"], + virtual_hosted_style=in_data.get("urlStyle") + == "VIRTUAL_HOSTED_STYLE", + bucket_bound_hostname=in_data.get("bucketBoundHostname"), + scheme=in_data.get("scheme"), + ) + fields = policy["fields"] + + for field in ( + "x-goog-algorithm", + "x-goog-credential", + "x-goog-date", + "x-goog-signature", + ): + assert fields[field] == test_data["policyOutput"]["fields"][field] + + out_data = test_data["policyOutput"] + decoded_policy = base64.b64decode(fields["policy"]).decode("unicode_escape") + assert decoded_policy == out_data["expectedDecodedPolicy"] + assert policy["url"] == out_data["url"] + + +def _prepare_conditions(in_data): + """Helper for V4 POST policy generation conformance tests. + + Convert conformance test data conditions dict into list. + + Args: + in_data (dict): conditions arg from conformance test data. + + Returns: + list: conditions arg to pass into generate_signed_post_policy_v4(). + """ + if "conditions" in in_data: + conditions = [] + for key, value in in_data["conditions"].items(): + # camel case to snake case with "-" separator + field = re.sub(r"(?