Skip to content

Commit

Permalink
feat: add ignore_flush parameter to BlobWriter (#644)
Browse files Browse the repository at this point in the history
* feat: add ignore_flush parameter to BlobWriter

* address feedback
  • Loading branch information
andrewsg committed Nov 4, 2021
1 parent 49f78b0 commit af9c9dc
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
33 changes: 31 additions & 2 deletions google/cloud/storage/blob.py
Expand Up @@ -3760,6 +3760,7 @@ def open(
self,
mode="r",
chunk_size=None,
ignore_flush=None,
encoding=None,
errors=None,
newline=None,
Expand Down Expand Up @@ -3801,6 +3802,19 @@ def open(
chunk_size for writes must be exactly a multiple of 256KiB as with
other resumable uploads. The default is 40 MiB.
:type ignore_flush: bool
:param ignore_flush:
(Optional) For non-text-mode writes, makes flush() do nothing
instead of raising an error. flush() without closing is not
supported by the remote service and therefore calling it normally
results in io.UnsupportedOperation. However, that behavior is
incompatible with some consumers and wrappers of file objects in
Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
ignore_flush will cause flush() to successfully do nothing, for
compatibility with those contexts. The correct way to actually flush
data to the remote server is to close() (using a context manager,
such as in the example, will cause this to happen automatically).
:type encoding: str
:param encoding:
(Optional) For text mode only, the name of the encoding that the stream will
Expand Down Expand Up @@ -3873,23 +3887,38 @@ def open(
raise ValueError(
"encoding, errors and newline arguments are for text mode only"
)
if ignore_flush:
raise ValueError(
"ignore_flush argument is for non-text write mode only"
)
return BlobReader(self, chunk_size=chunk_size, **kwargs)
elif mode == "wb":
if encoding or errors or newline:
raise ValueError(
"encoding, errors and newline arguments are for text mode only"
)
return BlobWriter(self, chunk_size=chunk_size, **kwargs)
return BlobWriter(
self, chunk_size=chunk_size, ignore_flush=ignore_flush, **kwargs
)
elif mode in ("r", "rt"):
if ignore_flush:
raise ValueError(
"ignore_flush argument is for non-text write mode only"
)
return TextIOWrapper(
BlobReader(self, chunk_size=chunk_size, **kwargs),
encoding=encoding,
errors=errors,
newline=newline,
)
elif mode in ("w", "wt"):
if ignore_flush is False:
raise ValueError(
"ignore_flush is required for text mode writing and "
"cannot be set to False"
)
return TextIOWrapper(
BlobWriter(self, chunk_size=chunk_size, text_mode=True, **kwargs),
BlobWriter(self, chunk_size=chunk_size, ignore_flush=True, **kwargs),
encoding=encoding,
errors=errors,
newline=newline,
Expand Down
41 changes: 27 additions & 14 deletions google/cloud/storage/fileio.py
Expand Up @@ -229,11 +229,23 @@ class BlobWriter(io.BufferedIOBase):
writes must be exactly a multiple of 256KiB as with other resumable
uploads. The default is the chunk_size of the blob, or 40 MiB.
:type text_mode: boolean
:type text_mode: bool
:param text_mode:
Whether this class is wrapped in 'io.TextIOWrapper'. Toggling this
changes the behavior of flush() to conform to TextIOWrapper's
expectations.
(Deprecated) A synonym for ignore_flush. For backwards-compatibility,
if True, sets ignore_flush to True. Use ignore_flush instead. This
parameter will be removed in a future release.
:type ignore_flush: bool
:param ignore_flush:
Makes flush() do nothing instead of raising an error. flush() without
closing is not supported by the remote service and therefore calling it
on this class normally results in io.UnsupportedOperation. However, that
behavior is incompatible with some consumers and wrappers of file
objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
ignore_flush will cause flush() to successfully do nothing, for
compatibility with those contexts. The correct way to actually flush
data to the remote server is to close() (using this object as a context
manager is recommended).
:type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
:param retry:
Expand Down Expand Up @@ -278,6 +290,7 @@ def __init__(
blob,
chunk_size=None,
text_mode=False,
ignore_flush=False,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
**upload_kwargs
):
Expand All @@ -292,9 +305,8 @@ def __init__(
# Resumable uploads require a chunk size of a multiple of 256KiB.
# self._chunk_size must not be changed after the upload is initiated.
self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
# In text mode this class will be wrapped and TextIOWrapper requires a
# different behavior of flush().
self._text_mode = text_mode
# text_mode is a deprecated synonym for ignore_flush
self._ignore_flush = ignore_flush or text_mode
self._retry = retry
self._upload_kwargs = upload_kwargs

Expand Down Expand Up @@ -394,13 +406,14 @@ def tell(self):
return self._buffer.tell() + len(self._buffer)

def flush(self):
if self._text_mode:
# TextIOWrapper expects this method to succeed before calling close().
return

raise io.UnsupportedOperation(
"Cannot flush without finalizing upload. Use close() instead."
)
# flush() is not fully supported by the remote service, so raise an
# error here, unless self._ignore_flush is set.
if not self._ignore_flush:
raise io.UnsupportedOperation(
"Cannot flush without finalizing upload. Use close() instead, "
"or set ignore_flush=True when constructing this class (see "
"docstring)."
)

def close(self):
self._checkClosed() # Raises ValueError if closed.
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/test_blob.py
Expand Up @@ -5585,13 +5585,21 @@ def test_open(self):
self.assertEqual(type(f.buffer), BlobWriter)
f = blob.open("wb")
self.assertEqual(type(f), BlobWriter)
f = blob.open("wb", ignore_flush=True)
self.assertTrue(f._ignore_flush)

with self.assertRaises(NotImplementedError):
blob.open("a")
with self.assertRaises(ValueError):
blob.open("rb", encoding="utf-8")
with self.assertRaises(ValueError):
blob.open("wb", encoding="utf-8")
with self.assertRaises(ValueError):
blob.open("r", ignore_flush=True)
with self.assertRaises(ValueError):
blob.open("rb", ignore_flush=True)
with self.assertRaises(ValueError):
blob.open("w", ignore_flush=False)


class Test__quote(unittest.TestCase):
Expand Down
9 changes: 8 additions & 1 deletion tests/unit/test_fileio.py
Expand Up @@ -272,6 +272,13 @@ def test_attributes_explicit(self):
self.assertEqual(writer._chunk_size, 512 * 1024)
self.assertEqual(writer._retry, DEFAULT_RETRY)

def test_deprecated_text_mode_attribute(self):
    """Check that the deprecated ``text_mode=True`` still enables ``_ignore_flush``."""
    fake_blob = mock.Mock(chunk_size=256 * 1024)
    blob_writer = self._make_blob_writer(fake_blob, text_mode=True)
    self.assertTrue(blob_writer._ignore_flush)
    # With flushing ignored, this call must return quietly rather than raise.
    blob_writer.flush()

def test_reject_wrong_chunk_size(self):
blob = mock.Mock()
blob.chunk_size = 123
Expand Down Expand Up @@ -857,7 +864,7 @@ def test_write(self, mock_warn):
unwrapped_writer = self._make_blob_writer(
blob,
chunk_size=chunk_size,
text_mode=True,
ignore_flush=True,
num_retries=NUM_RETRIES,
content_type=PLAIN_CONTENT_TYPE,
)
Expand Down

0 comments on commit af9c9dc

Please sign in to comment.