diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 33b809d3c..52cd9c7d3 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -30,6 +30,7 @@ import copy import hashlib from io import BytesIO +from io import TextIOWrapper import logging import mimetypes import os @@ -78,6 +79,8 @@ from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from google.cloud.storage.fileio import BlobReader +from google.cloud.storage.fileio import BlobWriter _API_ACCESS_ENDPOINT = "https://storage.googleapis.com" @@ -144,7 +147,9 @@ class Blob(_PropertyMixin): :type chunk_size: int :param chunk_size: (Optional) The size of a chunk of data whenever iterating (in bytes). - This must be a multiple of 256 KB per the API specification. + This must be a multiple of 256 KB per the API specification. If not + specified, the chunk_size of the blob itself is used. If that is not + specified, a default value of 40 MiB is used. :type encryption_key: bytes :param encryption_key: @@ -3407,6 +3412,126 @@ def update_storage_class( retry=retry, ) + def open( + self, + mode="r", + chunk_size=None, + encoding=None, + errors=None, + newline=None, + **kwargs + ): + r"""Create a file handle for file-like I/O to or from this blob. + + This method can be used as a context manager, just like Python's + built-in 'open()' function. + + While reading, as with other read methods, if blob.generation is not set + the most recent blob generation will be used. Because the file-like IO + reader downloads progressively in chunks, this could result in data from + multiple versions being mixed together. If this is a concern, use + either bucket.get_blob(), or blob.reload(), which will download the + latest generation number and set it; or, if the generation is known, set + it manually, for instance with bucket.blob(generation=123456). + + :type mode: str + :param mode: + (Optional) A mode string, as per standard Python `open()` semantics. The first + character must be 'r', to open the blob for reading, or 'w' to open + it for writing. The second character, if present, must be 't' for + (unicode) text mode, or 'b' for bytes mode. If the second character + is omitted, text mode is the default. + + :type chunk_size: long + :param chunk_size: + (Optional) For reads, the minimum number of bytes to read at a time. + If fewer bytes than the chunk_size are requested, the remainder is + buffered. For writes, the maximum number of bytes to buffer before + sending data to the server, and the size of each request when data + is sent. Writes are implemented as a "resumable upload", so + chunk_size for writes must be exactly a multiple of 256 KiB as with + other resumable uploads. The default is 40 MiB. + + :type encoding: str + :param encoding: + (Optional) For text mode only, the name of the encoding that the stream will + be decoded or encoded with. If omitted, it defaults to + locale.getpreferredencoding(False). + + :type errors: str + :param errors: + (Optional) For text mode only, an optional string that specifies how encoding + and decoding errors are to be handled. Pass 'strict' to raise a + ValueError exception if there is an encoding error (the default of + None has the same effect), or pass 'ignore' to ignore errors. (Note + that ignoring encoding errors can lead to data loss.)
Other more + rarely-used options are also available; see the Python 'io' module + documentation for 'io.TextIOWrapper' for a complete list. + + :type newline: str + :param newline: + (Optional) For text mode only, controls how line endings are handled. It can + be None, '', '\n', '\r', and '\r\n'. If None, reads use "universal + newline mode" and writes use the system default. See the Python + 'io' module documentation for 'io.TextIOWrapper' for details. + + :param kwargs: Keyword arguments to pass to the underlying API calls. + For both uploads and downloads, the following arguments are + supported: "if_generation_match", "if_generation_not_match", + "if_metageneration_match", "if_metageneration_not_match", "timeout". + For uploads only, the following additional arguments are supported: + "content_type", "num_retries", "predefined_acl", "checksum". + + :returns: A 'BlobReader' or 'BlobWriter' from + 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one + of those classes, depending on the 'mode' argument. + + Example: + Read from a text blob by using open() as context manager. + + Using bucket.get_blob() fetches metadata such as the generation, + which prevents race conditions in case the blob is modified. + + >>> from google.cloud import storage + >>> client = storage.Client() + >>> bucket = client.bucket("bucket-name") + + >>> blob = bucket.get_blob("blob-name.txt") + >>> with blob.open("rt") as f: + >>> print(f.read()) + + """ + if mode == "rb": + if encoding or errors or newline: + raise ValueError( + "encoding, errors and newline arguments are for text mode only" + ) + return BlobReader(self, chunk_size=chunk_size, **kwargs) + elif mode == "wb": + if encoding or errors or newline: + raise ValueError( + "encoding, errors and newline arguments are for text mode only" + ) + return BlobWriter(self, chunk_size=chunk_size, **kwargs) + elif mode in ("r", "rt"): + return TextIOWrapper( + BlobReader(self, chunk_size=chunk_size, **kwargs), + encoding=encoding, + errors=errors, + newline=newline, + ) + elif mode in ("w", "wt"): + return TextIOWrapper( + BlobWriter(self, chunk_size=chunk_size, text_mode=True, **kwargs), + encoding=encoding, + errors=errors, + newline=newline, + ) + else: + raise NotImplementedError( + "Supported modes strings are 'r', 'rb', 'rt', 'w', 'wb', and 'wt' only." + ) + cache_control = _scalar_property("cacheControl") """HTTP 'Cache-Control' header for this object. diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py new file mode 100644 index 000000000..07f2f11a3 --- /dev/null +++ b/google/cloud/storage/fileio.py @@ -0,0 +1,421 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io + +# Resumable uploads require a chunk size of precisely a multiple of 256 KiB. +CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB +DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024 # 40 MiB + +# Valid keyword arguments for download methods, and blob.reload() if needed. 
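+# (BlobReader.seek() may call blob.reload() to learn the blob size, and forwards these same keyword arguments to that call.)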
+# Note: Changes here need to be reflected in the blob.open() docstring. +VALID_DOWNLOAD_KWARGS = { + "if_generation_match", + "if_generation_not_match", + "if_metageneration_match", + "if_metageneration_not_match", + "timeout", +} + +# Valid keyword arguments for upload methods. +# Note: Changes here need to be reflected in the blob.open() docstring. +VALID_UPLOAD_KWARGS = { + "content_type", + "num_retries", + "predefined_acl", + "if_generation_match", + "if_generation_not_match", + "if_metageneration_match", + "if_metageneration_not_match", + "timeout", + "checksum", +} + + +class BlobReader(io.BufferedIOBase): + """A file-like object that reads from a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to download. + + :type chunk_size: long + :param chunk_size: + (Optional) The minimum number of bytes to read at a time. If fewer + bytes than the chunk_size are requested, the remainder is buffered. + The default is the chunk_size of the blob, or 40MiB. + + :param download_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout". + """ + + def __init__(self, blob, chunk_size=None, **download_kwargs): + """docstring note that download_kwargs also used for reload()""" + for kwarg in download_kwargs: + if kwarg not in VALID_DOWNLOAD_KWARGS: + raise ValueError( + "BlobReader does not support keyword argument {}.".format(kwarg) + ) + + self._blob = blob + self._pos = 0 + self._buffer = io.BytesIO() + self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + self._download_kwargs = download_kwargs + + def read(self, size=-1): + self._checkClosed() # Raises ValueError if closed. + + result = self._buffer.read(size) + # If the read request demands more bytes than are buffered, fetch more. + remaining_size = size - len(result) + if remaining_size > 0 or size < 0: + self._buffer.seek(0) + self._buffer.truncate(0) # Clear the buffer to make way for new data. + fetch_start = self._pos + len(result) + if size > 0: + # Fetch the larger of self._chunk_size or the remaining_size. + fetch_end = fetch_start + max(remaining_size, self._chunk_size) + else: + fetch_end = None + + # Download the blob. + result += self._blob.download_as_bytes( + start=fetch_start, end=fetch_end, **self._download_kwargs + ) + + # If more bytes were read than is immediately needed, buffer the + # remainder and then trim the result. + if size > 0 and len(result) > size: + self._buffer.write(result[size:]) + self._buffer.seek(0) + result = result[:size] + + self._pos += len(result) + + return result + + def read1(self, size=-1): + return self.read(size) + + def seek(self, pos, whence=0): + """Seek within the blob. + + This implementation of seek() uses knowledge of the blob size to + validate that the reported position does not exceed the blob last byte. + If the blob size is not already known it will call blob.reload(). + """ + self._checkClosed() # Raises ValueError if closed. + + if self._blob.size is None: + self._blob.reload(**self._download_kwargs) + + initial_pos = self._pos + + if whence == 0: + self._pos = pos + elif whence == 1: + self._pos += pos + elif whence == 2: + self._pos = self._blob.size + pos + if whence not in {0, 1, 2}: + raise ValueError("invalid whence value") + + if self._pos > self._blob.size: + self._pos = self._blob.size + + # Seek or invalidate buffer as needed. 
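+ # If the target position still falls inside the buffered read-ahead bytes, only the buffer cursor is moved; otherwise the buffer is discarded so the next read() downloads from the new position.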
+ difference = self._pos - initial_pos + new_buffer_pos = self._buffer.seek(difference, 1) + if new_buffer_pos != difference: # Buffer does not contain new pos. + # Invalidate buffer. + self._buffer.seek(0) + self._buffer.truncate(0) + + return self._pos + + def close(self): + self._buffer.close() + + def _checkClosed(self): + if self._buffer.closed: + raise ValueError("I/O operation on closed file.") + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + + +class BlobWriter(io.BufferedIOBase): + """A file-like object that writes to a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to which to write. + + :type chunk_size: long + :param chunk_size: + (Optional) The maximum number of bytes to buffer before sending data + to the server, and the size of each request when data is sent. + Writes are implemented as a "resumable upload", so chunk_size for + writes must be exactly a multiple of 256KiB as with other resumable + uploads. The default is the chunk_size of the blob, or 40 MiB. + + :type text_mode: boolean + :param text_mode: + Whether this class is wrapped in 'io.TextIOWrapper'. Toggling this + changes the behavior of flush() to conform to TextIOWrapper's + expectations. + + :param upload_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout", "content_type", + "num_retries", "predefined_acl", "checksum". + """ + + def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): + for kwarg in upload_kwargs: + if kwarg not in VALID_UPLOAD_KWARGS: + raise ValueError( + "BlobWriter does not support keyword argument {}.".format(kwarg) + ) + self._blob = blob + self._buffer = SlidingBuffer() + self._upload_and_transport = None + # Resumable uploads require a chunk size of a multiple of 256KiB. + # self._chunk_size must not be changed after the upload is initiated. + self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + # In text mode this class will be wrapped and TextIOWrapper requires a + # different behavior of flush(). + self._text_mode = text_mode + self._upload_kwargs = upload_kwargs + + @property + def _chunk_size(self): + """Get the blob's default chunk size. + + :rtype: int or ``NoneType`` + :returns: The current blob's chunk size, if it is set. + """ + return self.__chunk_size + + @_chunk_size.setter + def _chunk_size(self, value): + """Set the blob's default chunk size. + + :type value: int + :param value: (Optional) The current blob's chunk size, if it is set. + + :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a + multiple of 256 KiB. + """ + if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0: + raise ValueError( + "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE + ) + self.__chunk_size = value + + def write(self, b): + self._checkClosed() # Raises ValueError if closed. + + pos = self._buffer.write(b) + + # If there is enough content, upload chunks. 
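+ # Only whole chunks are transmitted; any remainder smaller than chunk_size stays buffered until a later write() or until close() finalizes the upload.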
+ num_chunks = len(self._buffer) // self._chunk_size + if num_chunks: + self._upload_chunks_from_buffer(num_chunks) + + return pos + + def _initiate_upload(self): + num_retries = self._upload_kwargs.pop("num_retries", None) + content_type = self._upload_kwargs.pop("content_type", None) + + if ( + self._upload_kwargs.get("if_metageneration_match") is None + and num_retries is None + ): + # Uploads are only idempotent (safe to retry) if + # if_metageneration_match is set. If it is not set, the default + # num_retries should be 0. Note: Because retry logic for uploads is + # provided by the google-resumable-media-python package, it doesn't + # use the ConditionalRetryStrategy class used in other API calls in + # this library to solve this problem. + num_retries = 0 + + self._upload_and_transport = self._blob._initiate_resumable_upload( + self._blob.bucket.client, + self._buffer, + content_type, + None, + num_retries, + chunk_size=self._chunk_size, + **self._upload_kwargs + ) + + def _upload_chunks_from_buffer(self, num_chunks): + """Upload a specified number of chunks.""" + + # Initialize the upload if necessary. + if not self._upload_and_transport: + self._initiate_upload() + + upload, transport = self._upload_and_transport + + # Upload chunks. The SlidingBuffer class will manage seek position. + for _ in range(num_chunks): + upload.transmit_next_chunk(transport) + + # Wipe the buffer of chunks uploaded, preserving any remaining data. + self._buffer.flush() + + def tell(self): + return self._buffer.tell() + len(self._buffer) + + def flush(self): + if self._text_mode: + # TextIOWrapper expects this method to succeed before calling close(). + return + + raise io.UnsupportedOperation( + "Cannot flush without finalizing upload. Use close() instead." + ) + + def close(self): + self._checkClosed() # Raises ValueError if closed. + + self._upload_chunks_from_buffer(1) + self._buffer.close() + + def _checkClosed(self): + if self._buffer.closed: + raise ValueError("I/O operation on closed file.") + + def readable(self): + return False + + def writable(self): + return True + + def seekable(self): + return False + + +class SlidingBuffer(object): + """A non-rewindable buffer that frees memory of chunks already consumed. + + This class is necessary because `google-resumable-media-python` expects + `tell()` to work relative to the start of the file, not relative to a place + in an intermediate buffer. Using this class, we present an external + interface with consistent seek and tell behavior without having to actually + store bytes already sent. + + Behavior of this class differs from an ordinary BytesIO buffer. `write()` + will always append to the end of the file only and not change the seek + position otherwise. `flush()` will delete all data already read (data to the + left of the seek position). `tell()` will report the seek position of the + buffer including all deleted data. Additionally the class implements + __len__() which will report the size of the actual underlying buffer. + + This class does not attempt to implement the entire Python I/O interface. + """ + + def __init__(self): + self._buffer = io.BytesIO() + self._cursor = 0 + + def write(self, b): + """Append to the end of the buffer without changing the position.""" + self._checkClosed() # Raises ValueError if closed. 
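+ # Remember the current read position, append at the end of the underlying BytesIO, then restore the read position so unread data is unaffected.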
+ + bookmark = self._buffer.tell() + self._buffer.seek(0, io.SEEK_END) + pos = self._buffer.write(b) + self._buffer.seek(bookmark) + return self._cursor + pos + + def read(self, size=-1): + """Read and move the cursor.""" + self._checkClosed() # Raises ValueError if closed. + + data = self._buffer.read(size) + self._cursor += len(data) + return data + + def flush(self): + """Delete already-read data (all data to the left of the position).""" + self._checkClosed() # Raises ValueError if closed. + + # BytesIO can't be deleted from the left, so save any leftover, unread + # data and truncate at 0, then readd leftover data. + leftover = self._buffer.read() + self._buffer.seek(0) + self._buffer.truncate(0) + self._buffer.write(leftover) + self._buffer.seek(0) + + def tell(self): + """Report how many bytes have been read from the buffer in total.""" + return self._cursor + + def seek(self, pos): + """Seek to a position (backwards only) within the internal buffer. + + This implementation of seek() verifies that the seek destination is + contained in _buffer. It will raise ValueError if the destination byte + has already been purged from the buffer. + + The "whence" argument is not supported in this implementation. + """ + self._checkClosed() # Raises ValueError if closed. + + buffer_initial_pos = self._buffer.tell() + difference = pos - self._cursor + buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR) + if ( + not buffer_seek_result - buffer_initial_pos == difference + or pos > self._cursor + ): + # The seek did not arrive at the expected byte because the internal + # buffer does not (or no longer) contains the byte. Reset and raise. + self._buffer.seek(buffer_initial_pos) + raise ValueError("Cannot seek() to that value.") + + self._cursor = pos + return self._cursor + + def __len__(self): + """Determine the size of the buffer by seeking to the end.""" + bookmark = self._buffer.tell() + length = self._buffer.seek(0, io.SEEK_END) + self._buffer.seek(bookmark) + return length + + def close(self): + return self._buffer.close() + + def _checkClosed(self): + return self._buffer._checkClosed() + + @property + def closed(self): + return self._buffer.closed diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9e7a86c2f..31ffd9c8b 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1,3 +1,5 @@ +# coding=utf-8 + # Copyright 2014 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -1103,6 +1105,58 @@ def test_blob_crc32_md5_hash(self): self.assertEqual(download_blob.crc32c, blob.crc32c) self.assertEqual(download_blob.md5_hash, blob.md5_hash) + def test_blobwriter_and_blobreader(self): + blob = self.bucket.blob("LargeFile") + + # Test BlobWriter works. + file_data = self.FILES["big"] + with open(file_data["path"], "rb") as file_obj: + with blob.open("wb", chunk_size=256 * 1024) as writer: + writer.write(file_obj.read(100)) + writer.write(file_obj.read(256 * 1024)) + writer.write(file_obj.read()) + self.case_blobs_to_delete.append(blob) + + blob.reload() + md5_hash = blob.md5_hash + if not isinstance(md5_hash, six.binary_type): + md5_hash = md5_hash.encode("utf-8") + self.assertEqual(md5_hash, file_data["hash"]) + + # Test BlobReader read and seek behave identically to filesystem file. 
+ with open(file_data["path"], "rb") as file_obj: + with blob.open("rb", chunk_size=256 * 1024) as reader: + self.assertEqual(file_obj.read(100), reader.read(100)) + self.assertEqual(file_obj.read(256 * 1024), reader.read(256 * 1024)) + reader.seek(20) + file_obj.seek(20) + self.assertEqual( + file_obj.read(256 * 1024 * 2), reader.read(256 * 1024 * 2) + ) + self.assertEqual(file_obj.read(), reader.read()) + + def test_blobwriter_and_blobreader_text_mode(self): + blob = self.bucket.blob("MultibyteTextFile") + + # Construct a multibyte text_data sample file. + base_multibyte_text_string = u"abcde あいうえお line: " + text_data = "\n".join([base_multibyte_text_string + str(x) for x in range(100)]) + + # Test text BlobWriter works. + with blob.open("wt") as writer: + writer.write(text_data[:100]) + writer.write(text_data[100:]) + self.case_blobs_to_delete.append(blob) + + # Test text BlobReader read and seek to 0. Seeking to a non-zero byte on a + # multibyte text stream is not safe in Python but the API expects + # seek() to work regardless. + with blob.open("rt") as reader: + # This should produce 100 characters, not 100 bytes. + self.assertEqual(text_data[:100], reader.read(100)) + self.assertEqual(0, reader.seek(0)) + self.assertEqual(text_data, reader.read()) + class TestUnicode(TestStorageFiles): def test_fetch_object_and_check_content(self): diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 4aacc3a8c..e8573ce21 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -4673,6 +4673,40 @@ def test_from_string_w_domain_name_bucket(self): self.assertEqual(blob.name, "b") self.assertEqual(blob.bucket.name, "buckets.example.com") + def test_open(self): + from io import TextIOWrapper + from google.cloud.storage.fileio import BlobReader + from google.cloud.storage.fileio import BlobWriter + + blob_name = "blob-name" + client = self._make_client() + bucket = _Bucket(client) + blob = self._make_one(blob_name, bucket=bucket) + + f = blob.open("r") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobReader) + f = blob.open("rt") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobReader) + f = blob.open("rb") + self.assertEqual(type(f), BlobReader) + f = blob.open("w") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobWriter) + f = blob.open("wt") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobWriter) + f = blob.open("wb") + self.assertEqual(type(f), BlobWriter) + + with self.assertRaises(NotImplementedError): + blob.open("a") + with self.assertRaises(ValueError): + blob.open("rb", encoding="utf-8") + with self.assertRaises(ValueError): + blob.open("wb", encoding="utf-8") + class Test__quote(unittest.TestCase): @staticmethod diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py new file mode 100644 index 000000000..09fa58f8d --- /dev/null +++ b/tests/unit/test_fileio.py @@ -0,0 +1,593 @@ +# coding=utf-8 + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import mock +import io +from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer +import string + +TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n" +TEST_BINARY_DATA = TEST_TEXT_DATA.encode("utf-8") +TEST_MULTIBYTE_TEXT_DATA = u"あいうえおかきくけこさしすせそたちつてと" +PLAIN_CONTENT_TYPE = "text/plain" +NUM_RETRIES = 2 + + +class TestBlobReaderBinary(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + blob.chunk_size = 256 + reader = BlobReader(blob) + self.assertTrue(reader.seekable()) + self.assertTrue(reader.readable()) + self.assertFalse(reader.writable()) + self.assertEqual(256, reader._chunk_size) + + def test_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + download_kwargs = {"if_metageneration_match": 1} + reader = BlobReader(blob, chunk_size=8, **download_kwargs) + + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1]) + blob.download_as_bytes.assert_called_once_with( + start=0, end=8, **download_kwargs + ) + + # Read from buffered data only. + self.assertEqual(reader.read(3), TEST_BINARY_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read remaining buffer plus an additional chunk read. + self.assertEqual(reader.read(8), TEST_BINARY_DATA[4:12]) + self.assertEqual(reader._pos, 12) + self.assertEqual(blob.download_as_bytes.call_count, 2) + blob.download_as_bytes.assert_called_with(start=8, end=16, **download_kwargs) + + # Read a larger amount, requiring a download larger than chunk_size. + self.assertEqual(reader.read(16), TEST_BINARY_DATA[12:28]) + self.assertEqual(reader._pos, 28) + self.assertEqual(blob.download_as_bytes.call_count, 3) + blob.download_as_bytes.assert_called_with(start=16, end=28, **download_kwargs) + + # Read all remaining data. + self.assertEqual(reader.read(), TEST_BINARY_DATA[28:]) + self.assertEqual(blob.download_as_bytes.call_count, 4) + blob.download_as_bytes.assert_called_with(start=28, end=None, **download_kwargs) + + reader.close() + + def test_readline(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + reader = BlobReader(blob, chunk_size=10) + + # Read a line. With chunk_size=10, expect three chunks downloaded. + self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27]) + blob.download_as_bytes.assert_called_with(start=20, end=30) + self.assertEqual(blob.download_as_bytes.call_count, 3) + + # Read another line. + self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:]) + blob.download_as_bytes.assert_called_with(start=50, end=60) + self.assertEqual(blob.download_as_bytes.call_count, 6) + + blob.size = len(TEST_BINARY_DATA) + reader.seek(0) + + # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read. 
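+ # The final download therefore starts exactly at the end of the data and returns no bytes, which is what the assertion below checks.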
+ self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA) + blob.download_as_bytes.assert_called_with( + start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10 + ) + self.assertEqual(blob.download_as_bytes.call_count, 13) + + reader.close() + + def test_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + download_kwargs = {"if_metageneration_match": 1} + reader = BlobReader(blob, chunk_size=8, **download_kwargs) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_BINARY_DATA) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + self.assertEqual(reader.read(4), TEST_BINARY_DATA[4:8]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Seek forward 2 bytes with whence=1. Position is still in buffer. + self.assertEqual(reader.seek(2, 1), 10) + self.assertEqual(reader.read(2), TEST_BINARY_DATA[10:12]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Attempt seek past end of file. Position should be at end of file. + self.assertEqual( + reader.seek(len(TEST_BINARY_DATA) + 100), len(TEST_BINARY_DATA) + ) + + # Seek to beginning. The next read will need to download data again. + self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(4), TEST_BINARY_DATA[0:4]) + self.assertEqual(blob.download_as_bytes.call_count, 2) + + # Seek relative to end with whence=2. + self.assertEqual(reader.seek(-1, 2), len(TEST_BINARY_DATA) - 1) + self.assertEqual(reader.read(), TEST_BINARY_DATA[-1:]) + self.assertEqual(blob.download_as_bytes.call_count, 3) + + with self.assertRaises(ValueError): + reader.seek(1, 4) + + # tell() is an inherited method that uses seek(). + self.assertEqual(reader.tell(), reader._pos) + + reader.close() + + def test_close(self): + blob = mock.Mock() + reader = BlobReader(blob) + + reader.close() + + with self.assertRaises(ValueError): + reader.read() + + with self.assertRaises(ValueError): + reader.seek(0) + + def test_context_mgr(self): + # Just verify that the context manager form doesn't crash. + blob = mock.Mock() + with BlobReader(blob) as reader: + reader.close() + + def test_rejects_invalid_kwargs(self): + blob = mock.Mock() + with self.assertRaises(ValueError): + BlobReader(blob, invalid_kwarg=1) + + +class TestBlobWriterBinary(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + blob.chunk_size = 256 * 1024 + writer = BlobWriter(blob) + self.assertFalse(writer.seekable()) + self.assertFalse(writer.readable()) + self.assertTrue(writer.writable()) + self.assertEqual(256 * 1024, writer._chunk_size) + + def test_reject_wrong_chunk_size(self): + blob = mock.Mock() + blob.chunk_size = 123 + with self.assertRaises(ValueError): + _ = BlobWriter(blob) + + def test_write(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer with (arbitrary) arguments so we can validate the + # arguments are used.
+ # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + upload_kwargs = {"if_metageneration_match": 1} + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + **upload_kwargs + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, + NUM_RETRIES, + chunk_size=chunk_size, + **upload_kwargs + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + self.assertEqual(writer.tell(), 33) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_flush_fails(self): + blob = mock.Mock(chunk_size=None) + writer = BlobWriter(blob) + + with self.assertRaises(io.UnsupportedOperation): + writer.flush() + + def test_seek_fails(self): + blob = mock.Mock(chunk_size=None) + writer = BlobWriter(blob) + + with self.assertRaises(io.UnsupportedOperation): + writer.seek() + + def test_conditional_retries(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=None, + content_type=PLAIN_CONTENT_TYPE, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + # Due to the condition not being fulfilled, num_retries should be 0. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, + 0, + chunk_size=chunk_size, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. 
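+ # close() sends the remaining buffered byte as the final chunk, so exactly one more transmit call is expected.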
+ writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_rejects_invalid_kwargs(self): + blob = mock.Mock() + with self.assertRaises(ValueError): + BlobWriter(blob, invalid_kwarg=1) + + +class Test_SlidingBuffer(unittest.TestCase): + def test_write_and_read(self): + buff = SlidingBuffer() + + # Write and verify tell() still reports 0 and len is correct. + buff.write(TEST_BINARY_DATA) + self.assertEqual(buff.tell(), 0) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Read and verify tell() reports end. + self.assertEqual(buff.read(), TEST_BINARY_DATA) + self.assertEqual(buff.tell(), len(TEST_BINARY_DATA)) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + def test_flush(self): + buff = SlidingBuffer() + + # Write and verify tell() still reports 0 and len is correct. + buff.write(TEST_BINARY_DATA) + self.assertEqual(buff.tell(), 0) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Read 8 bytes and verify tell reports correctly. + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + self.assertEqual(buff.tell(), 8) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Flush buffer and verify tell doesn't change but len does. + buff.flush() + self.assertEqual(buff.tell(), 8) + self.assertEqual(len(buff), len(TEST_BINARY_DATA) - 8) + + # Read remainder. + self.assertEqual(buff.read(), TEST_BINARY_DATA[8:]) + self.assertEqual(buff.tell(), len(TEST_BINARY_DATA)) + self.assertEqual(len(buff), len(TEST_BINARY_DATA[8:])) + + def test_seek(self): + buff = SlidingBuffer() + buff.write(TEST_BINARY_DATA) + + # Try to seek forward. Verify the tell() doesn't change. + with self.assertRaises(ValueError): + pos = buff.tell() + buff.seek(len(TEST_BINARY_DATA) + 1) + self.assertEqual(pos, buff.tell()) + + # Read 8 bytes, test seek backwards, read again, and flush. + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + buff.seek(0) + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + buff.flush() + self.assertEqual(buff.tell(), 8) + + # Try to seek to a byte that has already been flushed. + with self.assertRaises(ValueError): + pos = buff.tell() + buff.seek(0) + self.assertEqual(pos, buff.tell()) + + def test_close(self): + buff = SlidingBuffer() + buff.close() + with self.assertRaises(ValueError): + buff.read() + + +class TestBlobReaderText(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + reader = io.TextIOWrapper(BlobReader(blob)) + self.assertTrue(reader.seekable()) + self.assertTrue(reader.readable()) + self.assertFalse(reader.writable()) + + def test_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.chunk_size = None + blob.size = len(TEST_TEXT_DATA.encode("utf-8")) + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # The TextIOWrapper class has an internally defined chunk size which + # will override ours. The wrapper class is not under test. + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_TEXT_DATA[0:1]) + blob.download_as_bytes.assert_called_once() + + # Read from buffered data only. + self.assertEqual(reader.read(3), TEST_TEXT_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read all remaining data. 
+ self.assertEqual(reader.read(), TEST_TEXT_DATA[4:]) + + # Seek to 0 and read all remaining data again. + reader.seek(0) + self.assertEqual(reader.read(), TEST_TEXT_DATA) + + reader.close() + + def test_multibyte_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.chunk_size = None + blob.size = len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")) + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # The TextIOWrapper class has an internally defined chunk size which + # will override ours. The wrapper class is not under test. + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_MULTIBYTE_TEXT_DATA[0:1]) + blob.download_as_bytes.assert_called_once() + + # Read from buffered data only. + self.assertEqual(reader.read(3), TEST_MULTIBYTE_TEXT_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read all remaining data. + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA[4:]) + + # Seek to 0 and read all remaining data again. + reader.seek(0) + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA) + + reader.close() + + def test_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + blob.chunk_size = None + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_TEXT_DATA.encode("utf-8")) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + self.assertEqual(reader.read(4), TEST_TEXT_DATA[4:8]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Seek to beginning. The next read will need to download data again. + self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(), TEST_TEXT_DATA) + self.assertEqual(blob.download_as_bytes.call_count, 2) + + reader.close() + + def test_multibyte_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + blob.chunk_size = None + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + + # Seek to beginning. 
+ self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # tell() is an inherited method that uses seek(). + self.assertEqual(reader.tell(), len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8"))) + + reader.close() + + def test_close(self): + blob = mock.Mock() + reader = BlobReader(blob) + + reader.close() + + with self.assertRaises(ValueError): + reader.read() + + with self.assertRaises(ValueError): + reader.seek(0) + + +class TestBlobWriterText(unittest.TestCase): + def test_write(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer in text mode. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + unwrapped_writer = BlobWriter( + blob, + chunk_size=chunk_size, + text_mode=True, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + ) + + writer = io.TextIOWrapper(unwrapped_writer) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: unwrapped_writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2]) + blob.initiate_resumable_upload.assert_not_called() + + # Write all data and close. + writer.write(TEST_MULTIBYTE_TEXT_DATA[2:]) + writer.close() + + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + unwrapped_writer._buffer, + PLAIN_CONTENT_TYPE, + None, + NUM_RETRIES, + chunk_size=chunk_size, + ) + upload.transmit_next_chunk.assert_called_with(transport)
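A minimal usage sketch of the file-like API introduced by this change (illustration only; the bucket and object names are placeholders and the client is assumed to be authenticated as usual):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")  # placeholder bucket name

# Write text to a new object through the file-like writer (a resumable upload under the hood).
blob = bucket.blob("notes.txt")  # placeholder object name
with blob.open("wt", encoding="utf-8") as f:
    f.write("first line\n")
    f.write("second line\n")

# Read it back; get_blob() pins the generation so chunked reads do not mix object versions.
blob = bucket.get_blob("notes.txt")
with blob.open("rt", encoding="utf-8") as f:
    print(f.read())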