Skip to content

Commit

Permalink
feat: add configurable checksum support for uploads (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsg committed Jul 23, 2020
1 parent 343baa7 commit 68264f8
Show file tree
Hide file tree
Showing 11 changed files with 1,007 additions and 444 deletions.
197 changes: 196 additions & 1 deletion google/resumable_media/_helpers.py
Expand Up @@ -14,9 +14,12 @@

"""Shared utilities used by both downloads and uploads."""


import base64
import hashlib
import logging
import random
import time
import warnings

from six.moves import http_client

Expand All @@ -33,6 +36,18 @@
http_client.GATEWAY_TIMEOUT,
)

# Warning text emitted when crcmod is present but only its slow pure-python
# implementation is available (see _is_fast_crcmod()).
_SLOW_CRC32C_WARNING = (
    "Currently using crcmod in pure python form. This is a slow "
    "implementation. Python 3 has a faster implementation, `google-crc32c`, "
    "which will be used if it is installed."
)
# Response header that carries server-side checksums, e.g. "crc32c=...,md5=...".
_HASH_HEADER = u"x-goog-hash"
# Info-log template used when the service returned no checksum of the requested
# type; formatted with the media URL (positional) and the checksum type name
# (keyword ``checksum_type``).
_MISSING_CHECKSUM = u"""\
No {checksum_type} checksum was returned from the service while downloading {}
(which happens for composite objects), so client-side content integrity
checking is not being performed."""
_LOGGER = logging.getLogger(__name__)


def do_nothing():
"""Simple default callback."""
Expand Down Expand Up @@ -164,3 +179,183 @@ def wait_and_retry(func, get_status_code, retry_strategy):
return response

return response


def _get_crc32c_object():
    """Return a new CRC32C checksum object.

    Prefers the ``google-crc32c`` package.  If it is not installed, falls
    back to ``crcmod`` (emitting a warning when only crcmod's slow
    pure-python implementation is available).

    Raises:
        ImportError: If neither ``google-crc32c`` nor ``crcmod`` can be
            imported.
    """
    try:
        import crc32c

        return crc32c.Checksum()
    except ImportError:
        try:
            import crcmod

            checksum = crcmod.predefined.Crc("crc-32c")
            # Warn (but still proceed) if crcmod lacks its C extension.
            _is_fast_crcmod()
            return checksum
        except ImportError:
            raise ImportError("Failed to import either `google-crc32c` or `crcmod`")


def _is_fast_crcmod():
    """Return True if crcmod is backed by its C extension; warn otherwise."""
    # Reach into the nested crcmod.crcmod module to read its private flag
    # indicating whether the compiled extension is in use.
    crcmod_inner = __import__(
        "crcmod.crcmod", globals(), locals(), ["_usingExtension"], 0,
    )
    using_extension = getattr(crcmod_inner, "_usingExtension", False)
    if not using_extension:
        warnings.warn(_SLOW_CRC32C_WARNING, RuntimeWarning, stacklevel=2)
    return using_extension


def _get_metadata_key(checksum_type):
if checksum_type == "md5":
return "md5Hash"
else:
return checksum_type


def prepare_checksum_digest(digest_bytestring):
    """Convert a checksum digest into a string encoded for an HTTP header.

    Args:
        digest_bytestring (bytes): A checksum digest bytestring.

    Returns:
        str: A base64 string representation of the input.
    """
    encoded_digest = base64.b64encode(digest_bytestring)
    # NOTE: ``b64encode`` returns ``bytes``, but HTTP headers expect ``str``.
    return encoded_digest.decode(u"utf-8")


def _get_expected_checksum(response, get_headers, media_url, checksum_type):
    """Get the expected checksum and checksum object for the download response.

    Args:
        response (~requests.Response): The HTTP response object.
        get_headers (callable: response->dict): returns response headers.
        media_url (str): The URL containing the media to be downloaded.
        checksum_type Optional(str): The checksum type to read from the headers,
            exactly as it will appear in the headers (case-sensitive). Must be
            "md5", "crc32c" or None.

    Returns:
        Tuple (Optional[str], object): The expected checksum of the response,
        if it can be detected from the ``X-Goog-Hash`` header, and the
        appropriate checksum object for the expected checksum.

    Raises:
        ValueError: If ``checksum_type`` is not one of the supported values.
    """
    if checksum_type not in ["md5", "crc32c", None]:
        raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")
    if checksum_type is None:
        # Checksum validation is disabled: nothing expected, nothing hashed.
        return (None, _DoNothingHash())

    headers = get_headers(response)
    expected_checksum = _parse_checksum_header(
        headers.get(_HASH_HEADER), response, checksum_label=checksum_type
    )

    if expected_checksum is None:
        # The service returned no checksum of this type (e.g. for composite
        # objects); log it and skip client-side integrity checking.
        msg = _MISSING_CHECKSUM.format(media_url, checksum_type=checksum_type.upper())
        _LOGGER.info(msg)
        return (None, _DoNothingHash())

    # Delegate to the shared factory so the set of supported checksum
    # algorithms is defined in exactly one place (_get_checksum_object).
    return (expected_checksum, _get_checksum_object(checksum_type))


def _parse_checksum_header(header_value, response, checksum_label):
"""Parses the checksum header from an ``X-Goog-Hash`` value.
.. _header reference: https://cloud.google.com/storage/docs/\
xml-api/reference-headers#xgooghash
Expects ``header_value`` (if not :data:`None`) to be in one of the three
following formats:
* ``crc32c=n03x6A==``
* ``md5=Ojk9c3dhfxgoKVVHYwFbHQ==``
* ``crc32c=n03x6A==,md5=Ojk9c3dhfxgoKVVHYwFbHQ==``
See the `header reference`_ for more information.
Args:
header_value (Optional[str]): The ``X-Goog-Hash`` header from
a download response.
response (~requests.Response): The HTTP response object.
checksum_label (str): The label of the header value to read, as in the
examples above. Typically "md5" or "crc32c"
Returns:
Optional[str]: The expected checksum of the response, if it
can be detected from the ``X-Goog-Hash`` header; otherwise, None.
Raises:
~google.resumable_media.common.InvalidResponse: If there are
multiple checksums of the requested type in ``header_value``.
"""
if header_value is None:
return None

matches = []
for checksum in header_value.split(u","):
name, value = checksum.split(u"=", 1)
if name == checksum_label:
matches.append(value)

if len(matches) == 0:
return None
elif len(matches) == 1:
return matches[0]
else:
raise common.InvalidResponse(
response,
u"X-Goog-Hash header had multiple ``{}`` values.".format(checksum_label),
header_value,
matches,
)


def _get_checksum_object(checksum_type):
"""Respond with a checksum object for a supported type, if not None.
Raises ValueError if checksum_type is unsupported.
"""
if checksum_type == "md5":
return hashlib.md5()
elif checksum_type == "crc32c":
return _get_crc32c_object()
elif checksum_type is None:
return None
else:
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")


class _DoNothingHash(object):
"""Do-nothing hash object.
Intended as a stand-in for ``hashlib.md5`` or a crc32c checksum
implementation in cases where it isn't necessary to compute the hash.
"""

def update(self, unused_chunk):
"""Do-nothing ``update`` method.
Intended to match the interface of ``hashlib.md5`` and other checksums.
Args:
unused_chunk (bytes): A chunk of data.
"""
96 changes: 95 additions & 1 deletion google/resumable_media/_upload.py
Expand Up @@ -58,6 +58,13 @@
)
_POST = u"POST"
_PUT = u"PUT"
# Message used in the DataCorruption raised when the locally computed checksum
# does not match the server-reported one; formatted with the checksum type,
# the local digest, and the remote digest.
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
# Message used in the InvalidResponse raised when the upload response metadata
# lacks the expected checksum field; formatted with the metadata key name.
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)


class UploadBase(object):
Expand Down Expand Up @@ -231,11 +238,20 @@ class MultipartUpload(UploadBase):
upload_url (str): The URL where the content will be uploaded.
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with the request, e.g. headers for encrypted data.
checksum Optional([str]): The type of checksum to compute to verify
the integrity of the object. The request metadata will be amended
to include the computed value. Using this option will override a
manually-set checksum value. Supported values are "md5", "crc32c"
and None. The default is None.
Attributes:
upload_url (str): The URL where the content will be uploaded.
"""

    def __init__(self, upload_url, headers=None, checksum=None):
        super(MultipartUpload, self).__init__(upload_url, headers=headers)
        # Checksum type requested for this upload ("md5", "crc32c" or None);
        # the digest itself is computed later, in _prepare_request().
        self._checksum_type = checksum

def _prepare_request(self, data, metadata, content_type):
"""Prepare the contents of an HTTP request.
Expand Down Expand Up @@ -274,11 +290,20 @@ def _prepare_request(self, data, metadata, content_type):

if not isinstance(data, six.binary_type):
raise TypeError(u"`data` must be bytes, received", type(data))

checksum_object = _helpers._get_checksum_object(self._checksum_type)
if checksum_object:
checksum_object.update(data)
actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
metadata_key = _helpers._get_metadata_key(self._checksum_type)
metadata[metadata_key] = actual_checksum

content, multipart_boundary = construct_multipart_request(
data, metadata, content_type
)
multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"'
self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type

return _POST, self.upload_url, content, self._headers

def transmit(self, transport, data, metadata, content_type, timeout=None):
Expand Down Expand Up @@ -321,6 +346,13 @@ class ResumableUpload(UploadBase):
be sent with the :meth:`initiate` request, e.g. headers for
encrypted data. These **will not** be sent with
:meth:`transmit_next_chunk` or :meth:`recover` requests.
checksum Optional([str]): The type of checksum to compute to verify
the integrity of the object. After the upload is complete, the
server-computed checksum of the resulting object will be read
and google.resumable_media.common.DataCorruption will be raised on
a mismatch. The corrupted file will not be deleted from the remote
host automatically. Supported values are "md5", "crc32c" and None.
The default is None.
Attributes:
upload_url (str): The URL where the content will be uploaded.
Expand All @@ -330,7 +362,7 @@ class ResumableUpload(UploadBase):
:data:`.UPLOAD_CHUNK_SIZE`.
"""

def __init__(self, upload_url, chunk_size, headers=None):
def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
super(ResumableUpload, self).__init__(upload_url, headers=headers)
if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
raise ValueError(
Expand All @@ -342,6 +374,9 @@ def __init__(self, upload_url, chunk_size, headers=None):
self._stream = None
self._content_type = None
self._bytes_uploaded = 0
self._bytes_checksummed = 0
self._checksum_type = checksum
self._checksum_object = None
self._total_bytes = None
self._resumable_url = None
self._invalid = False
Expand Down Expand Up @@ -576,12 +611,36 @@ def _prepare_request(self):
msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
raise ValueError(msg)

self._update_checksum(start_byte, payload)

headers = {
_CONTENT_TYPE_HEADER: self._content_type,
_helpers.CONTENT_RANGE_HEADER: content_range,
}
return _PUT, self.resumable_url, payload, headers

def _update_checksum(self, start_byte, payload):
"""Update the checksum with the payload if not already updated.
Because error recovery can result in bytes being transmitted more than
once, the checksum tracks the number of bytes checked in
self._bytes_checksummed and skips bytes that have already been summed.
"""
if not self._checksum_type:
return

if not self._checksum_object:
self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

if start_byte < self._bytes_checksummed:
offset = self._bytes_checksummed - start_byte
data = payload[offset:]
else:
data = payload

self._checksum_object.update(data)
self._bytes_checksummed += len(data)

def _make_invalid(self):
"""Simple setter for ``invalid``.
Expand Down Expand Up @@ -630,6 +689,8 @@ def _process_response(self, response, bytes_sent):
self._bytes_uploaded = self._bytes_uploaded + bytes_sent
# Tombstone the current upload so it cannot be used again.
self._finished = True
# Validate the checksum. This can raise an exception on failure.
self._validate_checksum(response)
else:
bytes_range = _helpers.header_required(
response,
Expand All @@ -648,6 +709,39 @@ def _process_response(self, response, bytes_sent):
)
self._bytes_uploaded = int(match.group(u"end_byte")) + 1

def _validate_checksum(self, response):
"""Check the computed checksum, if any, against the response headers.
Args:
response (object): The HTTP response object.
Raises:
~google.resumable_media.common.DataCorruption: If the checksum
computed locally and the checksum reported by the remote host do
not match.
"""
if self._checksum_type is None:
return
metadata_key = _helpers._get_metadata_key(self._checksum_type)
metadata = response.json()
remote_checksum = metadata.get(metadata_key)
if remote_checksum is None:
raise common.InvalidResponse(
response,
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
self._get_headers(response),
)
local_checksum = _helpers.prepare_checksum_digest(
self._checksum_object.digest()
)
if local_checksum != remote_checksum:
raise common.DataCorruption(
response,
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
self._checksum_type.upper(), local_checksum, remote_checksum
),
)

def transmit_next_chunk(self, transport, timeout=None):
"""Transmit the next chunk of the resource to be uploaded.
Expand Down

0 comments on commit 68264f8

Please sign in to comment.