Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make v4 signing formatting consistent w/ spec #56

Merged
merged 8 commits into from Feb 13, 2020
22 changes: 14 additions & 8 deletions google/cloud/storage/_signing.py
Expand Up @@ -18,7 +18,6 @@
import collections
import datetime
import hashlib
import re
import json

import six
Expand All @@ -31,8 +30,6 @@


NOW = datetime.datetime.utcnow # To be replaced by tests.
MULTIPLE_SPACES_RE = r"\s+"
MULTIPLE_SPACES = re.compile(MULTIPLE_SPACES_RE)

SERVICE_ACCOUNT_URL = (
"https://googleapis.dev/python/google-api-core/latest/"
Expand Down Expand Up @@ -192,7 +189,7 @@ def get_canonical_headers(headers):
normalized = collections.defaultdict(list)
for key, val in headers:
key = key.lower().strip()
val = MULTIPLE_SPACES.sub(" ", val.strip())
val = " ".join(val.split())
normalized[key].append(val)

ordered_headers = sorted((key, ",".join(val)) for key, val in normalized.items())
Expand All @@ -206,8 +203,8 @@ def get_canonical_headers(headers):
)


def canonicalize(method, resource, query_parameters, headers):
"""Canonicalize method, resource
def canonicalize_v2(method, resource, query_parameters, headers):
"""Canonicalize method, resource per the V2 spec.

:type method: str
:param method: The HTTP verb that will be used when requesting the URL.
Expand Down Expand Up @@ -301,6 +298,7 @@ def generate_signed_url_v2(
:type resource: str
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).
Caller should have already URL-encoded the value.

:type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
:param expiration: Point in time when the signed URL should expire.
Expand Down Expand Up @@ -368,7 +366,7 @@ def generate_signed_url_v2(
"""
expiration_stamp = get_expiration_seconds_v2(expiration)

canonical = canonicalize(method, resource, query_parameters, headers)
canonical = canonicalize_v2(method, resource, query_parameters, headers)

# Generate the string to sign.
elements_to_sign = [
Expand Down Expand Up @@ -462,6 +460,7 @@ def generate_signed_url_v4(
:type resource: str
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).
Caller should have already URL-encoded the value.

:type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
:param expiration: Point in time when the signed URL should expire.
Expand Down Expand Up @@ -589,13 +588,20 @@ def generate_signed_url_v4(
ordered_query_parameters = sorted(query_parameters.items())
canonical_query_string = six.moves.urllib.parse.urlencode(ordered_query_parameters)

lowercased_headers = dict(ordered_headers)

if "x-goog-content-sha256" in lowercased_headers:
payload = lowercased_headers["x-goog-content-sha256"]
else:
payload = "UNSIGNED-PAYLOAD"

canonical_elements = [
method,
resource,
canonical_query_string,
canonical_header_string,
signed_headers,
"UNSIGNED-PAYLOAD",
payload,
]
canonical_request = "\n".join(canonical_elements)

Expand Down
9 changes: 6 additions & 3 deletions tests/unit/test__signing.py
Expand Up @@ -309,12 +309,12 @@ def test_w_embedded_ws(self):
self.assertEqual(ordered, expected_ordered)


class Test_canonicalize(unittest.TestCase):
class Test_canonicalize_v2(unittest.TestCase):
@staticmethod
def _call_fut(*args, **kwargs):
from google.cloud.storage._signing import canonicalize
from google.cloud.storage._signing import canonicalize_v2

return canonicalize(*args, **kwargs)
return canonicalize_v2(*args, **kwargs)

def test_wo_headers_or_query_parameters(self):
method = "GET"
Expand Down Expand Up @@ -650,6 +650,9 @@ def test_w_custom_host_header(self):
def test_w_custom_headers(self):
self._generate_helper(headers={"x-goog-foo": "bar"})

def test_w_custom_payload_hash_goog(self):
self._generate_helper(headers={"x-goog-content-sha256": "DEADBEEF"})

def test_w_custom_query_parameters_w_string_value(self):
self._generate_helper(query_parameters={"bar": "/"})

Expand Down