Skip to content

Commit

Permalink
feat: add blob.open() for file-like I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsg committed Feb 22, 2021
1 parent 3e69bf9 commit 6f4d679
Show file tree
Hide file tree
Showing 5 changed files with 1,013 additions and 0 deletions.
35 changes: 35 additions & 0 deletions google/cloud/storage/blob.py
Expand Up @@ -30,6 +30,7 @@
import copy
import hashlib
from io import BytesIO
from io import TextIOWrapper
import logging
import mimetypes
import os
Expand Down Expand Up @@ -78,6 +79,8 @@
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter


_API_ACCESS_ENDPOINT = "https://storage.googleapis.com"
Expand Down Expand Up @@ -3407,6 +3410,38 @@ def update_storage_class(
retry=retry,
)

def open(
self,
mode="r",
chunk_size=None,
encoding=None,
errors=None,
newline=None,
**kwargs
):
if mode == "rb":
return BlobReader(self, chunk_size=chunk_size, **kwargs)
elif mode == "wb":
return BlobWriter(self, chunk_size=chunk_size, **kwargs)
elif mode in ("r", "rt"):
return TextIOWrapper(
BlobReader(self, chunk_size=chunk_size, **kwargs),
encoding=encoding,
errors=errors,
newline=newline,
)
elif mode in ("w", "wt"):
return TextIOWrapper(
BlobWriter(self, chunk_size=chunk_size, text_mode=True, **kwargs),
encoding=encoding,
errors=errors,
newline=newline,
)
else:
raise NotImplementedError(
"Supported modes strings are 'r', 'rb', 'rt', 'w', 'wb', and 'wt' only."
)

cache_control = _scalar_property("cacheControl")
"""HTTP 'Cache-Control' header for this object.
Expand Down
327 changes: 327 additions & 0 deletions google/cloud/storage/fileio.py
@@ -0,0 +1,327 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io

# Resumable uploads require a chunk size of precisely a multiple of 256KiB.
DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024 # 10 MiB (40 times minimum chunk size).

# Valid keyword arguments for download methods, and blob.reload() if needed.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
"if_generation_match",
"if_generation_not_match",
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
"content_type",
"num_retries",
"predefined_acl",
"if_generation_match",
"if_generation_not_match",
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
"checksum",
}


class BlobReader(io.BufferedIOBase):
def __init__(self, blob, chunk_size=None, **download_kwargs):
"""docstring note that download_kwargs also used for reload()"""
for kwarg in download_kwargs:
if kwarg not in VALID_DOWNLOAD_KWARGS:
raise ValueError(
"BlobReader does not support keyword argument {}.".format(kwarg)
)

self._blob = blob
self._pos = 0
self._buffer = io.BytesIO()
self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
self._download_kwargs = download_kwargs

def read(self, size=-1):
self._checkClosed() # Raises ValueError if closed.

result = self._buffer.read(size)
# If the read request demands more bytes than are buffered, fetch more.
remaining_size = size - len(result)
if remaining_size > 0 or size < 0:
self._buffer.seek(0)
self._buffer.truncate(0) # Clear the buffer to make way for new data.
fetch_start = self._pos + len(result)
if size > 0:
# Fetch the larger of self._chunk_size or the remaining_size.
fetch_end = fetch_start + max(remaining_size, self._chunk_size)
else:
fetch_end = None

# Download the blob.
result += self._blob.download_as_bytes(
start=fetch_start, end=fetch_end, **self._download_kwargs
)

# If more bytes were read than is immediately needed, buffer the
# remainder and then trim the result.
if size > 0 and len(result) > size:
self._buffer.write(result[size:])
self._buffer.seek(0)
result = result[:size]

self._pos += len(result)

return result

def read1(self, size=-1):
return self.read(size)

def seek(self, pos, whence=0):
"""Seek within the blob.
This implementation of seek() uses knowledge of the blob size to
validate that the reported position does not exceed the blob last byte.
If the blob size is not already known it will call blob.reload().
"""
self._checkClosed() # Raises ValueError if closed.

if self._blob.size is None:
self._blob.reload(**self._download_kwargs)

initial_pos = self._pos

if whence == 0:
self._pos = pos
elif whence == 1:
self._pos += pos
elif whence == 2:
self._pos = self._blob.size + pos
if whence not in {0, 1, 2}:
raise ValueError("invalid whence value")

if self._pos > self._blob.size:
self._pos = self._blob.size

# Seek or invalidate buffer as needed.
difference = self._pos - initial_pos
new_buffer_pos = self._buffer.seek(difference, 1)
if new_buffer_pos != difference: # Buffer does not contain new pos.
# Invalidate buffer.
self._buffer.seek(0)
self._buffer.truncate(0)

return self._pos

def close(self):
self._buffer.close()

def _checkClosed(self):
if self._buffer.closed:
raise ValueError("I/O operation on closed file.")

def readable(self):
return True

def writable(self):
return False

def seekable(self):
return True


class BlobWriter(io.BufferedIOBase):
def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
for kwarg in upload_kwargs:
if kwarg not in VALID_UPLOAD_KWARGS:
raise ValueError(
"BlobWriter does not support keyword argument {}.".format(kwarg)
)
self._blob = blob
self._buffer = SlidingBuffer()
self._upload_and_transport = None
# Resumable uploads require a chunk size of a multiple of 256KiB.
# self._chunk_size must not be changed after the upload is initiated.
self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
# In text mode this class will be wrapped and TextIOWrapper requires a
# different behavior of flush().
self._text_mode = text_mode
self._upload_kwargs = upload_kwargs

def write(self, b):
self._checkClosed() # Raises ValueError if closed.

pos = self._buffer.write(b)

# If there is enough content, upload chunks.
num_chunks = len(self._buffer) // self._chunk_size
if num_chunks:
self._upload_chunks_from_buffer(num_chunks)

return pos

def _initiate_upload(self):
num_retries = self._upload_kwargs.pop("num_retries", None)
content_type = self._upload_kwargs.pop("content_type", None)

if (
self._upload_kwargs.get("if_metageneration_match") is None
and num_retries is None
):
# Uploads are only idempotent (safe to retry) if
# if_metageneration_match is set. If it is not set, the default
# num_retries should be 0. Note: Because retry logic for uploads is
# provided by the google-resumable-media-python package, it doesn't
# use the ConditionalRetryStrategy class used in other API calls in
# this library to solve this problem.
num_retries = 0

self._upload_and_transport = self._blob._initiate_resumable_upload(
self._blob.bucket.client,
self._buffer,
content_type,
None,
num_retries,
chunk_size=self._chunk_size,
**self._upload_kwargs
)

def _upload_chunks_from_buffer(self, num_chunks):
"""Upload a specified number of chunks."""

# Initialize the upload if necessary.
if not self._upload_and_transport:
self._initiate_upload()

upload, transport = self._upload_and_transport

# Upload chunks. The SlidingBuffer class will manage seek position.
for _ in range(num_chunks):
upload.transmit_next_chunk(transport)

# Wipe the buffer of chunks uploaded, preserving any remaining data.
self._buffer.flush()

def tell(self):
return self._buffer.tell() + len(self._buffer)

def flush(self):
if self._text_mode:
# TextIOWrapper expects this method to succeed before calling close().
return

raise io.UnsupportedOperation(
"Cannot flush without finalizing upload. Use close() instead."
)

def close(self):
self._checkClosed() # Raises ValueError if closed.

self._upload_chunks_from_buffer(1)
self._buffer.close()

def _checkClosed(self):
if self._buffer.closed:
raise ValueError("I/O operation on closed file.")

def readable(self):
return False

def writable(self):
return True

def seekable(self):
return False


class SlidingBuffer(object):
"""A non-rewindable buffer that frees memory of chunks already consumed.
This class is necessary because `google-resumable-media-python` expects
`tell()` to work relative to the start of the file, not relative to a place
in an intermediate buffer. Using this class, we present an external
interface with consistent seek and tell behavior without having to actually
store bytes already sent.
Behavior of this class differs from an ordinary BytesIO buffer. `write()`
will always append to the end of the file only and not change the seek
position otherwise. `flush()` will delete all data already read (data to the
left of the seek position). `tell()` will report the seek position of the
buffer including all deleted data. Additionally the class implements
__len__() which will report the size of the actual underlying buffer.
This class does not attempt to implement the entire Python I/O interface.
"""

def __init__(self):
self._buffer = io.BytesIO()
self._cursor = 0

def write(self, b):
"""Append to the end of the buffer without changing the position."""
self._checkClosed() # Raises ValueError if closed.

bookmark = self._buffer.tell()
self._buffer.seek(0, io.SEEK_END)
pos = self._buffer.write(b)
self._buffer.seek(bookmark)
return self._cursor + pos

def read(self, size=-1):
"""Read and move the cursor."""
self._checkClosed() # Raises ValueError if closed.

data = self._buffer.read(size)
self._cursor += len(data)
return data

def flush(self):
"""Delete already-read data (all data to the left of the position)."""
self._checkClosed() # Raises ValueError if closed.

# BytesIO can't be deleted from the left, so save any leftover, unread
# data and truncate at 0, then readd leftover data.
leftover = self._buffer.read()
self._buffer.seek(0)
self._buffer.truncate(0)
self._buffer.write(leftover)
self._buffer.seek(0)

def tell(self):
"""Report how many bytes have been read from the buffer in total."""
return self._cursor

def seek(self, pos, whence=None):
raise io.UnsupportedOperation("seek() is not supported for this class.")

def __len__(self):
"""Determine the size of the buffer by seeking to the end."""
bookmark = self._buffer.tell()
length = self._buffer.seek(0, io.SEEK_END)
self._buffer.seek(bookmark)
return length

def close(self):
return self._buffer.close()

def _checkClosed(self):
return self._buffer._checkClosed()

@property
def closed(self):
return self._buffer.closed

0 comments on commit 6f4d679

Please sign in to comment.