From 23b7d1c3155deae3c804c510dee3a7cec97cd46c Mon Sep 17 00:00:00 2001
From: Andrew Gorcester
Date: Wed, 26 Aug 2020 12:20:49 -0700
Subject: [PATCH] feat: add configurable checksumming for blob uploads and
 downloads (#246)

Co-authored-by: Tres Seaver
Co-authored-by: Frank Natividad
---
 google/cloud/storage/blob.py | 185 ++++++++++++++++++++++++++++++++++-
 setup.py                     |   2 +-
 tests/system/test_system.py  |  89 +++++++++++++++++
 tests/unit/test_blob.py      |  82 ++++++++++++++--
 4 files changed, 344 insertions(+), 14 deletions(-)

diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py
index b8f01f63f..2940b52aa 100644
--- a/google/cloud/storage/blob.py
+++ b/google/cloud/storage/blob.py
@@ -29,6 +29,7 @@
 import copy
 import hashlib
 from io import BytesIO
+import logging
 import mimetypes
 import os
 import re
@@ -108,6 +109,11 @@
 _READ_LESS_THAN_SIZE = (
     "Size {:d} was specified but the file-like object only had "
     "{:d} bytes remaining."
 )
+_CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE = (
+    "A checksum of type `{}` was requested, but checksumming is not available "
+    "for downloads when chunk_size is set."
+)
+
 _DEFAULT_CHUNKSIZE = 104857600  # 1024 * 1024 B * 100 = 100 MB
 _MAX_MULTIPART_SIZE = 8388608  # 8 MB
@@ -823,6 +829,7 @@ def _do_download(
         end=None,
         raw_download=False,
         timeout=_DEFAULT_TIMEOUT,
+        checksum="md5",
     ):
         """Perform a download without any error handling.
@@ -860,6 +867,17 @@ def _do_download(
             repeated several times using the same timeout each time.
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify the integrity
+            of the object. The response headers must contain a checksum of the
+            requested type. If the headers lack an appropriate checksum (for
+            instance in the case of transcoded or ranged downloads where the
+            remote service does not know the correct checksum, including
+            downloads where chunk_size is set) an INFO-level log will be
+            emitted. Supported values are "md5", "crc32c" and None. The default
+            is "md5".
         """
         if self.chunk_size is None:
             if raw_download:
                 klass = RawDownload
             else:
                 klass = Download

             download = klass(
-                download_url, stream=file_obj, headers=headers, start=start, end=end
+                download_url,
+                stream=file_obj,
+                headers=headers,
+                start=start,
+                end=end,
+                checksum=checksum,
             )
             response = download.consume(transport, timeout=timeout)
             self._extract_headers_from_download(response)
         else:
+            if checksum:
+                msg = _CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format(checksum)
+                logging.info(msg)
+
             if raw_download:
                 klass = RawChunkedDownload
             else:
@@ -903,6 +930,7 @@ def download_to_file(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum="md5",
     ):
         """Download the contents of this blob into a file-like object.
@@ -979,6 +1007,17 @@ def download_to_file(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify the integrity
+            of the object. The response headers must contain a checksum of the
+            requested type. If the headers lack an appropriate checksum (for
+            instance in the case of transcoded or ranged downloads where the
+            remote service does not know the correct checksum, including
+            downloads where chunk_size is set) an INFO-level log will be
+            emitted.
+            Supported values are "md5", "crc32c" and None. The default
+            is "md5".
+
         :raises: :class:`google.cloud.exceptions.NotFound`
         """
         client = self._require_client(client)
@@ -1004,6 +1043,7 @@ def download_to_file(
                 end,
                 raw_download,
                 timeout=timeout,
+                checksum=checksum,
             )
         except resumable_media.InvalidResponse as exc:
             _raise_from_invalid_response(exc)
@@ -1020,6 +1060,7 @@ def download_to_filename(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum="md5",
     ):
         """Download the contents of this blob into a named file.
@@ -1072,6 +1113,17 @@ def download_to_filename(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify the integrity
+            of the object. The response headers must contain a checksum of the
+            requested type. If the headers lack an appropriate checksum (for
+            instance in the case of transcoded or ranged downloads where the
+            remote service does not know the correct checksum, including
+            downloads where chunk_size is set) an INFO-level log will be
+            emitted. Supported values are "md5", "crc32c" and None. The default
+            is "md5".
+
         :raises: :class:`google.cloud.exceptions.NotFound`
         """
         try:
@@ -1087,6 +1139,7 @@ def download_to_filename(
                 if_metageneration_match=if_metageneration_match,
                 if_metageneration_not_match=if_metageneration_not_match,
                 timeout=timeout,
+                checksum=checksum,
             )
         except resumable_media.DataCorruption:
             # Delete the corrupt downloaded file.
@@ -1112,6 +1165,7 @@ def download_as_bytes(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum="md5",
     ):
         """Download the contents of this blob as a bytes object.
@@ -1161,6 +1215,17 @@ def download_as_bytes(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify the integrity
+            of the object. The response headers must contain a checksum of the
+            requested type. If the headers lack an appropriate checksum (for
+            instance in the case of transcoded or ranged downloads where the
+            remote service does not know the correct checksum, including
+            downloads where chunk_size is set) an INFO-level log will be
+            emitted. Supported values are "md5", "crc32c" and None. The default
+            is "md5".
+
         :rtype: bytes
         :returns: The data stored in this blob.
@@ -1178,6 +1243,7 @@ def download_as_bytes(
             if_metageneration_match=if_metageneration_match,
             if_metageneration_not_match=if_metageneration_not_match,
             timeout=timeout,
+            checksum=checksum,
         )
         return string_buffer.getvalue()
@@ -1453,6 +1519,7 @@ def _do_multipart_upload(
         if_metageneration_match,
         if_metageneration_not_match,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Perform a multipart upload.
@@ -1514,6 +1581,14 @@ def _do_multipart_upload(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. The request metadata will be amended
+            to include the computed value. Using this option will override a
+            manually-set checksum value. Supported values are "md5",
+            "crc32c" and None. The default is None.
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the multipart
                   upload request.
@@ -1569,7 +1644,7 @@ def _do_multipart_upload(
         )
         upload_url = _add_query_parameters(base_url, name_value_pairs)

-        upload = MultipartUpload(upload_url, headers=headers)
+        upload = MultipartUpload(upload_url, headers=headers, checksum=checksum)

         if num_retries is not None:
             upload._retry_strategy = resumable_media.RetryStrategy(
@@ -1597,6 +1672,7 @@ def _initiate_resumable_upload(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Initiate a resumable upload.
@@ -1671,6 +1747,16 @@ def _initiate_resumable_upload(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. After the upload is complete, the
+            server-computed checksum of the resulting object will be checked
+            and google.resumable_media.common.DataCorruption will be raised on
+            a mismatch. On a validation failure, the client will attempt to
+            delete the uploaded object automatically. Supported values
+            are "md5", "crc32c" and None. The default is None.
+
         :rtype: tuple
         :returns:
             Pair of
@@ -1727,7 +1813,9 @@ def _initiate_resumable_upload(
         )
         upload_url = _add_query_parameters(base_url, name_value_pairs)

-        upload = ResumableUpload(upload_url, chunk_size, headers=headers)
+        upload = ResumableUpload(
+            upload_url, chunk_size, headers=headers, checksum=checksum
+        )

         if num_retries is not None:
             upload._retry_strategy = resumable_media.RetryStrategy(
@@ -1759,6 +1847,7 @@ def _do_resumable_upload(
         if_metageneration_match,
         if_metageneration_not_match,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Perform a resumable upload.
@@ -1823,6 +1912,16 @@ def _do_resumable_upload(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. After the upload is complete, the
+            server-computed checksum of the resulting object will be checked
+            and google.resumable_media.common.DataCorruption will be raised on
+            a mismatch. On a validation failure, the client will attempt to
+            delete the uploaded object automatically. Supported values
+            are "md5", "crc32c" and None. The default is None.
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the final chunk
                   is uploaded.
@@ -1839,10 +1938,16 @@ def _do_resumable_upload(
             if_metageneration_match=if_metageneration_match,
             if_metageneration_not_match=if_metageneration_not_match,
             timeout=timeout,
+            checksum=checksum,
         )

         while not upload.finished:
-            response = upload.transmit_next_chunk(transport, timeout=timeout)
+            try:
+                response = upload.transmit_next_chunk(transport, timeout=timeout)
+            except resumable_media.DataCorruption:
+                # Attempt to delete the corrupted object.
+                self.delete()
+                raise

         return response
@@ -1859,6 +1964,7 @@ def _do_upload(
         if_metageneration_match,
         if_metageneration_not_match,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Determine an upload strategy and then perform the upload.
@@ -1924,6 +2030,19 @@ def _do_upload(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. If the upload is completed in a single
+            request, the checksum will be entirely precomputed and the remote
+            server will handle verification and error handling. If the upload
+            is too large and must be transmitted in multiple requests, the
+            checksum will be incrementally computed and the client will handle
+            verification and error handling, raising
+            google.resumable_media.common.DataCorruption on a mismatch and
+            attempting to delete the corrupted file. Supported values are
+            "md5", "crc32c" and None. The default is None.
+
         :rtype: dict
         :returns: The parsed JSON from the "200 OK" response. This will be the
                   **only** response in the multipart case and it will be the
@@ -1942,6 +2061,7 @@ def _do_upload(
                 if_metageneration_match,
                 if_metageneration_not_match,
                 timeout=timeout,
+                checksum=checksum,
             )
         else:
             response = self._do_resumable_upload(
@@ -1956,6 +2076,7 @@ def _do_upload(
                 if_metageneration_match,
                 if_metageneration_not_match,
                 timeout=timeout,
+                checksum=checksum,
             )

         return response.json()
@@ -1974,6 +2095,7 @@ def upload_from_file(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Upload the contents of this blob from a file-like object.
@@ -2068,6 +2190,19 @@ def upload_from_file(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. If the upload is completed in a single
+            request, the checksum will be entirely precomputed and the remote
+            server will handle verification and error handling. If the upload
+            is too large and must be transmitted in multiple requests, the
+            checksum will be incrementally computed and the client will handle
+            verification and error handling, raising
+            google.resumable_media.common.DataCorruption on a mismatch and
+            attempting to delete the corrupted file. Supported values are
+            "md5", "crc32c" and None. The default is None.
+
         :raises: :class:`~google.cloud.exceptions.GoogleCloudError`
                  if the upload response returns an error status.
@@ -2094,6 +2229,7 @@ def upload_from_file(
                 if_metageneration_match,
                 if_metageneration_not_match,
                 timeout=timeout,
+                checksum=checksum,
             )
             self._set_properties(created_json)
         except resumable_media.InvalidResponse as exc:
@@ -2110,6 +2246,7 @@ def upload_from_filename(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Upload this blob's contents from the content of a named file.
@@ -2176,6 +2313,19 @@ def upload_from_filename(
             repeated several times using the same timeout each time.
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. If the upload is completed in a single
+            request, the checksum will be entirely precomputed and the remote
+            server will handle verification and error handling. If the upload
+            is too large and must be transmitted in multiple requests, the
+            checksum will be incrementally computed and the client will handle
+            verification and error handling, raising
+            google.resumable_media.common.DataCorruption on a mismatch and
+            attempting to delete the corrupted file.
+            Supported values are
+            "md5", "crc32c" and None. The default is None.
         """
         content_type = self._get_content_type(content_type, filename=filename)
@@ -2192,6 +2342,7 @@ def upload_from_filename(
             if_metageneration_match=if_metageneration_match,
             if_metageneration_not_match=if_metageneration_not_match,
             timeout=timeout,
+            checksum=checksum,
         )

     def upload_from_string(
@@ -2205,6 +2356,7 @@ def upload_from_string(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Upload contents of this blob from the provided string.
@@ -2266,6 +2418,19 @@ def upload_from_string(
             repeated several times using the same timeout each time.
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. If the upload is completed in a single
+            request, the checksum will be entirely precomputed and the remote
+            server will handle verification and error handling. If the upload
+            is too large and must be transmitted in multiple requests, the
+            checksum will be incrementally computed and the client will handle
+            verification and error handling, raising
+            google.resumable_media.common.DataCorruption on a mismatch and
+            attempting to delete the corrupted file. Supported values are
+            "md5", "crc32c" and None. The default is None.
         """
         data = _to_bytes(data, encoding="utf-8")
         string_buffer = BytesIO(data)
@@ -2289,6 +2454,7 @@ def create_resumable_upload_session(
         origin=None,
         client=None,
         timeout=_DEFAULT_TIMEOUT,
+        checksum=None,
     ):
         """Create a resumable upload session.
@@ -2354,6 +2520,16 @@ def create_resumable_upload_session(
             Can also be passed as a tuple (connect_timeout, read_timeout).
             See :meth:`requests.Session.request` documentation for details.
+
+        :type checksum: str
+        :param checksum:
+            (Optional) The type of checksum to compute to verify
+            the integrity of the object. After the upload is complete, the
+            server-computed checksum of the resulting object will be checked
+            and google.resumable_media.common.DataCorruption will be raised on
+            a mismatch. On a validation failure, the client will attempt to
+            delete the uploaded object automatically. Supported values
+            are "md5", "crc32c" and None. The default is None.
+
         :rtype: str
         :returns: The resumable upload session URL.
                   The upload can be completed by making an HTTP PUT request with the
@@ -2383,6 +2559,7 @@ def create_resumable_upload_session(
             extra_headers=extra_headers,
             chunk_size=self._CHUNK_SIZE_MULTIPLE,
             timeout=timeout,
+            checksum=checksum,
         )

         return upload.resumable_url
diff --git a/setup.py b/setup.py
index e74f251cc..ce0ebbecf 100644
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@
 dependencies = [
     "google-auth >= 1.11.0, < 2.0dev",
     "google-cloud-core >= 1.4.1, < 2.0dev",
-    "google-resumable-media >= 0.6.0, < 2.0dev",
+    "google-resumable-media >= 1.0.0, < 2.0dev",
 ]

 extras = {}
diff --git a/tests/system/test_system.py b/tests/system/test_system.py
index 7d6e79b07..e6636b41d 100644
--- a/tests/system/test_system.py
+++ b/tests/system/test_system.py
@@ -22,6 +22,7 @@
 import tempfile
 import time
 import unittest
+import mock

 import requests
 import six
@@ -33,6 +34,8 @@
 from google.cloud.storage.bucket import LifecycleRuleDelete
 from google.cloud.storage.bucket import LifecycleRuleSetStorageClass
 from google.cloud import kms
+from google import resumable_media
+import google.auth
 import google.api_core
 from google.api_core import path_template
 import google.oauth2
@@ -573,6 +576,37 @@ def test_large_file_write_from_stream(self):
             md5_hash = md5_hash.encode("utf-8")
         self.assertEqual(md5_hash, file_data["hash"])

+    def test_large_file_write_from_stream_with_checksum(self):
+        blob = self.bucket.blob("LargeFile")
+
+        file_data = self.FILES["big"]
+        with open(file_data["path"], "rb") as file_obj:
+            blob.upload_from_file(file_obj, checksum="crc32c")
+            self.case_blobs_to_delete.append(blob)
+
+        md5_hash = blob.md5_hash
+        if not isinstance(md5_hash, six.binary_type):
+            md5_hash = md5_hash.encode("utf-8")
+        self.assertEqual(md5_hash, file_data["hash"])
+
+    def test_large_file_write_from_stream_with_failed_checksum(self):
+        blob = self.bucket.blob("LargeFile")
+
+        file_data = self.FILES["big"]
+
+        # Intercept the digest processing at the last stage and replace it with garbage.
+        # This is done with a patch to monkey-patch the resumable media library's checksum
+        # processing; it does not mock a remote interface like a unit test would. The
+        # remote API is still exercised.
+        with open(file_data["path"], "rb") as file_obj:
+            with mock.patch(
+                "google.resumable_media._helpers.prepare_checksum_digest",
+                return_value="FFFFFF==",
+            ):
+                with self.assertRaises(resumable_media.DataCorruption):
+                    blob.upload_from_file(file_obj, checksum="crc32c")
+        self.assertFalse(blob.exists())
+
     def test_large_encrypted_file_write_from_stream(self):
         blob = self.bucket.blob("LargeFile", encryption_key=self.ENCRYPTION_KEY)
@@ -607,6 +641,32 @@ def test_small_file_write_from_filename(self):
             md5_hash = md5_hash.encode("utf-8")
         self.assertEqual(md5_hash, file_data["hash"])

+    def test_small_file_write_from_filename_with_checksum(self):
+        blob = self.bucket.blob("SmallFile")
+
+        file_data = self.FILES["simple"]
+        blob.upload_from_filename(file_data["path"], checksum="crc32c")
+        self.case_blobs_to_delete.append(blob)
+
+        md5_hash = blob.md5_hash
+        if not isinstance(md5_hash, six.binary_type):
+            md5_hash = md5_hash.encode("utf-8")
+        self.assertEqual(md5_hash, file_data["hash"])
+
+    def test_small_file_write_from_filename_with_failed_checksum(self):
+        blob = self.bucket.blob("SmallFile")
+
+        file_data = self.FILES["simple"]
+        # Intercept the digest processing at the last stage and replace it with garbage
+        with mock.patch(
+            "google.resumable_media._helpers.prepare_checksum_digest",
+            return_value="FFFFFF==",
+        ):
+            with self.assertRaises(google.api_core.exceptions.BadRequest):
+                blob.upload_from_filename(file_data["path"], checksum="crc32c")
+
+        self.assertFalse(blob.exists())
+
     @unittest.skipUnless(USER_PROJECT, "USER_PROJECT not set in environment.")
     def test_crud_blob_w_user_project(self):
         with_user_project = Config.CLIENT.bucket(
@@ -836,6 +896,35 @@ def test_download_w_generation_match(self):

         self.assertEqual(file_contents, stored_contents)

+    def test_download_w_failed_crc32c_checksum(self):
+        blob = self.bucket.blob("FailedChecksumBlob")
+        file_contents = b"Hello World"
+        blob.upload_from_string(file_contents)
+        self.case_blobs_to_delete.append(blob)
+
+        with tempfile.NamedTemporaryFile() as temp_f:
+            # Intercept the digest processing at the last stage and replace it with garbage.
+            # This is done with a patch to monkey-patch the resumable media library's checksum
+            # processing; it does not mock a remote interface like a unit test would. The
+            # remote API is still exercised.
+            with mock.patch(
+                "google.resumable_media._helpers.prepare_checksum_digest",
+                return_value="FFFFFF==",
+            ):
+                with self.assertRaises(resumable_media.DataCorruption):
+                    blob.download_to_filename(temp_f.name, checksum="crc32c")
+
+            # Confirm the file was deleted on failure
+            self.assertFalse(os.path.isfile(temp_f.name))
+
+            # Now download with checksumming turned off
+            blob.download_to_filename(temp_f.name, checksum=None)
+
+            with open(temp_f.name, "rb") as file_obj:
+                stored_contents = file_obj.read()
+
+            self.assertEqual(file_contents, stored_contents)
+
     def test_copy_existing_file(self):
         filename = self.FILES["logo"]["path"]
         blob = storage.Blob("CloudLogo", bucket=self.bucket)
diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py
index d4b60a28c..9bf60d42d 100644
--- a/tests/unit/test_blob.py
+++ b/tests/unit/test_blob.py
@@ -986,11 +986,21 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None):

         if w_range:
             patched.assert_called_once_with(
-                download_url, stream=file_obj, headers=headers, start=1, end=3
+                download_url,
+                stream=file_obj,
+                headers=headers,
+                start=1,
+                end=3,
+                checksum="md5",
             )
         else:
             patched.assert_called_once_with(
-                download_url, stream=file_obj, headers=headers, start=None, end=None
+                download_url,
+                stream=file_obj,
+                headers=headers,
+                start=None,
+                end=None,
+                checksum="md5",
             )

         patched.return_value.consume.assert_called_once_with(
@@ -1014,7 +1024,9 @@ def test__do_download_wo_chunks_w_custom_timeout(self):
             w_range=False, raw_download=False, timeout=9.58
         )

-    def _do_download_helper_w_chunks(self, w_range, raw_download, timeout=None):
+    def _do_download_helper_w_chunks(
+        self, w_range, raw_download, timeout=None, checksum="md5"
+    ):
         blob_name = "blob-name"
         client = mock.Mock(_credentials=_make_credentials(), spec=["_credentials"])
         bucket = _Bucket(client)
@@ -1057,6 +1069,7 @@ def side_effect(*args, **kwargs):
                 start=1,
                 end=3,
                 raw_download=raw_download,
+                checksum=checksum,
                 **timeout_kwarg
             )
         else:
@@ -1066,6 +1079,7 @@ def side_effect(*args, **kwargs):
                 download_url,
                 headers,
                 raw_download=raw_download,
+                checksum=checksum,
                 **timeout_kwarg
             )

@@ -1096,6 +1110,24 @@ def test__do_download_w_chunks_w_range_w_raw(self):
     def test__do_download_w_chunks_w_custom_timeout(self):
         self._do_download_helper_w_chunks(w_range=True, raw_download=True, timeout=9.58)

+    def test__do_download_w_chunks_w_checksum(self):
+        from google.cloud.storage import blob as blob_module
+
+        with mock.patch("logging.info") as patch:
+            self._do_download_helper_w_chunks(
+                w_range=False, raw_download=False, checksum="md5"
+            )
+        patch.assert_called_once_with(
+            blob_module._CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format("md5")
+        )
+
+    def test__do_download_w_chunks_wo_checksum(self):
+        with mock.patch("logging.info") as patch:
+            self._do_download_helper_w_chunks(
+                w_range=False, raw_download=False, checksum=None
+            )
+        patch.assert_not_called()
+
     def test_download_to_file_with_failure(self):
         import requests
         from google.resumable_media import InvalidResponse
@@ -1132,6 +1164,7 @@ def test_download_to_file_with_failure(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def test_download_to_file_wo_media_link(self):
@@ -1162,6 +1195,7 @@ def test_download_to_file_wo_media_link(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def test_download_to_file_w_generation_match(self):
@@ -1191,6 +1225,7 @@ def test_download_to_file_w_generation_match(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def _download_to_file_helper(self, use_chunks, raw_download, timeout=None):
@@ -1228,6 +1263,7 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None):
             None,
             raw_download,
             timeout=expected_timeout,
+            checksum="md5",
         )

     def test_download_to_file_wo_chunks_wo_raw(self):
@@ -1293,6 +1329,7 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None):
             None,
             raw_download,
             timeout=expected_timeout,
+            checksum="md5",
         )
         stream = blob._do_download.mock_calls[0].args[1]
         self.assertEqual(stream.name, temp.name)
@@ -1324,6 +1361,7 @@ def test_download_to_filename_w_generation_match(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def test_download_to_filename_w_updated_wo_raw(self):
@@ -1381,6 +1419,7 @@ def test_download_to_filename_corrupted(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )
         stream = blob._do_download.mock_calls[0].args[1]
         self.assertEqual(stream.name, filename)
@@ -1415,6 +1454,7 @@ def test_download_to_filename_w_key(self):
             None,
             False,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )
         stream = blob._do_download.mock_calls[0].args[1]
         self.assertEqual(stream.name, temp.name)
@@ -1446,6 +1486,7 @@ def _download_as_bytes_helper(self, raw_download, timeout=None):
             None,
             raw_download,
             timeout=expected_timeout,
+            checksum="md5",
         )
         stream = blob._do_download.mock_calls[0].args[1]
         self.assertIsInstance(stream, io.BytesIO)
@@ -1524,6 +1565,7 @@ def test_download_as_bytes_w_generation_match(self):
             if_metageneration_match=None,
             if_metageneration_not_match=None,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def test_download_as_bytes_wo_raw(self):
@@ -1565,6 +1607,7 @@ def _download_as_text_helper(self, raw_download, encoding=None, timeout=None):
             None,
             raw_download,
             timeout=expected_timeout,
+            checksum="md5",
         )
         stream = blob._do_download.mock_calls[0].args[1]
         self.assertIsInstance(stream, io.BytesIO)
@@ -1593,6 +1636,7 @@ def test_download_as_text_w_generation_match(self):
             if_metageneration_match=None,
             if_metageneration_not_match=None,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

     def test_download_as_text_wo_raw(self):
@@ -1631,6 +1675,7 @@ def test_download_as_string(self, mock_warn):
             if_metageneration_match=None,
             if_metageneration_not_match=None,
             timeout=self._get_default_timeout(),
+            checksum="md5",
         )

         mock_warn.assert_called_with(
@@ -2132,7 +2177,9 @@ def test__initiate_resumable_upload_with_generation_not_match(self):
     def test__initiate_resumable_upload_with_predefined_acl(self):
         self._initiate_resumable_helper(predefined_acl="private")

-    def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes):
+    def _make_resumable_transport(
+        self, headers1, headers2, headers3, total_bytes, data_corruption=False
+    ):
         from google import resumable_media

         fake_transport = mock.Mock(spec=["request"])
@@ -2142,9 +2189,12 @@ def _make_resumable_transport(
             resumable_media.PERMANENT_REDIRECT, headers2
         )
         json_body = '{{"size": "{:d}"}}'.format(total_bytes)
-        fake_response3 = self._mock_requests_response(
-            http_client.OK, headers3, content=json_body.encode("utf-8")
-        )
+        if data_corruption:
+            fake_response3 = resumable_media.DataCorruption(None)
+        else:
+            fake_response3 = self._mock_requests_response(
+                http_client.OK, headers3, content=json_body.encode("utf-8")
+            )

         responses = [fake_response1, fake_response2, fake_response3]
         fake_transport.request.side_effect = responses
@@ -2255,6 +2305,7 @@ def _do_resumable_helper(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=None,
+        data_corruption=False,
     ):
         bucket = _Bucket(name="yesterday")
         blob = self._make_one(u"blob-name", bucket=bucket)
@@ -2274,7 +2325,7 @@ def _do_resumable_helper(
         headers1 = {"location": resumable_url}
         headers2 = {"range": "bytes=0-{:d}".format(blob.chunk_size - 1)}
         transport, responses = self._make_resumable_transport(
-            headers1, headers2, {}, total_bytes
+            headers1, headers2, {}, total_bytes, data_corruption=data_corruption
         )

         # Create some mock arguments and call the method under test.
@@ -2363,6 +2414,16 @@ def test__do_resumable_upload_with_retry(self):
     def test__do_resumable_upload_with_predefined_acl(self):
         self._do_resumable_helper(predefined_acl="private")

+    def test__do_resumable_upload_with_data_corruption(self):
+        from google.resumable_media import DataCorruption
+
+        with mock.patch("google.cloud.storage.blob.Blob.delete") as patch:
+            try:
+                self._do_resumable_helper(data_corruption=True)
+            except Exception as e:
+                self.assertTrue(patch.called)
+                self.assertIsInstance(e, DataCorruption)
+
     def _do_upload_helper(
         self,
         chunk_size=None,
@@ -2434,6 +2495,7 @@ def _do_upload_helper(
                 if_metageneration_match,
                 if_metageneration_not_match,
                 timeout=expected_timeout,
+                checksum=None,
             )
             blob._do_resumable_upload.assert_not_called()
         else:
@@ -2450,6 +2512,7 @@ def _do_upload_helper(
                 if_metageneration_match,
                 if_metageneration_not_match,
                 timeout=expected_timeout,
+                checksum=None,
             )

     def test__do_upload_uses_multipart(self):
@@ -2526,6 +2589,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs):
             if_metageneration_match,
             if_metageneration_not_match,
             timeout=expected_timeout,
+            checksum=None,
         )
         return stream

@@ -2586,7 +2650,7 @@ def _do_upload_mock_call_helper(
         self.assertIsNone(pos_args[9])  # if_metageneration_not_match

         expected_timeout = self._get_default_timeout() if timeout is None else timeout
-        self.assertEqual(kwargs, {"timeout": expected_timeout})
+        self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None})

         return pos_args[1]
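
Usage sketch: a minimal example of the new `checksum` arguments introduced by
this patch. The client setup and the bucket/object names ("my-bucket",
"example.txt") are hypothetical; only the `checksum` parameters and their
defaults come from the diff above.

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("example.txt")  # hypothetical names

    # Uploads default to checksum=None; opt in per call.
    # Supported values are "md5", "crc32c" and None.
    blob.upload_from_filename("example.txt", checksum="crc32c")

    # Downloads default to checksum="md5"; pass None to skip verification.
    # When chunk_size is set on the blob, checksumming is unavailable for
    # downloads and a requested checksum only emits an INFO-level log.
    blob.download_to_filename("example-copy.txt", checksum="crc32c")
    blob.download_to_filename("example-copy.txt", checksum=None)

On a verification failure, uploads raise
google.resumable_media.common.DataCorruption (resumable case) or a BadRequest
from the server (single-request case), and downloads via
download_to_filename remove the corrupt local file, as exercised by the
system tests above.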