chore: add iterdir and open for read/write streams
 - annoyingly, the GCS Python client doesn't support file-like objects: googleapis/python-storage#29
 - use a small library, gs-chunked-io, that provides file-like object support for GCS: https://github.com/xbrianh/gs-chunked-io (see the sketch below)
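The Reader/Writer pair from that library is what the new file objects in this diff delegate to. A minimal sketch of the API as this commit uses it (the bucket and key names are hypothetical, and Reader is assumed to expose the usual file-like read()):

from google.cloud import storage
import gs_chunked_io

client = storage.Client()
bucket = client.lookup_bucket("my-bucket")  # hypothetical bucket name

# Writer buffers bytes and uploads the object in chunks; the upload is
# finalized when the context manager exits.
with gs_chunked_io.Writer("some/key.txt", bucket) as writer:
    writer.write(b"hello world")

# Reader streams the blob back down in chunks rather than loading it whole.
blob = bucket.get_blob("some/key.txt")
data = gs_chunked_io.Reader(blob).read()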
justindujardin committed Mar 13, 2020
1 parent b1744fa commit d1648b0
Showing 3 changed files with 112 additions and 125 deletions.
109 changes: 66 additions & 43 deletions gcspath.py
@@ -1,11 +1,13 @@
"""gcspath provides a Pythonic API to GCS by wrapping google.cloud.storage with
a pathlib interface."""
from typing import Optional, Iterable, Union
from contextlib import suppress
from collections import namedtuple
from tempfile import NamedTemporaryFile
from functools import wraps, partial, lru_cache
from pathlib import _PosixFlavour, _Accessor, PurePath, Path
from io import RawIOBase, DEFAULT_BUFFER_SIZE, UnsupportedOperation
import gs_chunked_io

try:
from google.cloud import storage
@@ -67,6 +69,20 @@ def __init__(self, **kwargs):
self.gcs = storage.Client()
self.configuration_map = _GCSConfigurationMap()

def _get_blob(self, path: "GCSPath") -> Optional[storage.Blob]:
"""Get the blob associated with a path or return None"""
bucket_name = self._bucket_name(path.bucket)
if not bucket_name:
return None
try:
bucket = self.gcs.lookup_bucket(bucket_name)
except gcs_errors.ClientError:
return None
if bucket is None:
return None
key_name = str(path.key)
return bucket.get_blob(key_name)

def stat(self, path):
bucket = self.gcs.get_bucket(self._bucket_name(path.bucket))
blob: storage.Blob = bucket.get_blob(str(path.key))
@@ -147,14 +163,21 @@ def listdir(self, path):
def open(
self, path, *, mode="r", buffering=-1, encoding=None, errors=None, newline=None
):
bucket_name = self._bucket_name(path.bucket)
key_name = str(path.key)
object_summery = self.gcs.ObjectSummary(bucket_name, key_name)
file_object = (
GCSKeyReadableFileObject if "r" in mode else GCSKeyWritableFileObject
)
return file_object(
object_summery,
object_blob = self._get_blob(path)
if object_blob is None and "w" not in mode:
raise FileNotFoundError(str(path))
if "r" in mode:
return GCSKeyReadableFileObject(
object_blob,
path=path,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
)
return GCSKeyWritableFileObject(
self.gcs.lookup_bucket(self._bucket_name(path.bucket)),
path=path,
mode=mode,
buffering=buffering,
@@ -166,15 +189,13 @@ def open(
def owner(self, path):
bucket_name = self._bucket_name(path.bucket)
key_name = str(path.key)
object_summery = self.gcs.ObjectSummary(bucket_name, key_name)
# return object_summery.owner['DisplayName']
object_data = self.gcs.ObjectSummary(bucket_name, key_name)
# return object_data.owner['DisplayName']
# This is a hack until boto3 resolves this issue:
# https://github.com/boto/boto3/issues/1950
# todo: need to clean up
response = object_summery.meta.client.list_objects_v2(
Bucket=object_summery.bucket_name,
Prefix=object_summery.key,
FetchOwner=True,
response = object_data.meta.client.list_objects_v2(
Bucket=object_data.bucket_name, Prefix=object_data.key, FetchOwner=True,
)
return response["Contents"][0]["Owner"]["DisplayName"]

@@ -186,30 +207,30 @@ def rename(self, path, target):

if not self.is_dir(path):
target_bucket = self.gcs.Bucket(target_bucket_name)
object_summery = self.gcs.ObjectSummary(source_bucket_name, source_key_name)
object_data = self.gcs.ObjectSummary(source_bucket_name, source_key_name)
old_source = {
"Bucket": object_summery.bucket_name,
"Key": object_summery.key,
"Bucket": object_data.bucket_name,
"Key": object_data.key,
}
self.boto3_method_with_parameters(
target_bucket.copy, path=target, args=(old_source, target_key_name)
)
self.boto3_method_with_parameters(object_summery.delete)
self.boto3_method_with_parameters(object_data.delete)
return
bucket = self.gcs.Bucket(source_bucket_name)
target_bucket = self.gcs.Bucket(target_bucket_name)
for object_summery in bucket.objects.filter(Prefix=source_key_name):
for object_data in bucket.objects.filter(Prefix=source_key_name):
old_source = {
"Bucket": object_summery.bucket_name,
"Key": object_summery.key,
"Bucket": object_data.bucket_name,
"Key": object_data.key,
}
new_key = object_summery.key.replace(source_key_name, target_key_name)
new_key = object_data.key.replace(source_key_name, target_key_name)
self.boto3_method_with_parameters(
target_bucket.copy,
path=GCSPath(target_bucket_name, new_key),
args=(old_source, new_key),
)
self.boto3_method_with_parameters(object_summery.delete)
self.boto3_method_with_parameters(object_data.delete)

def replace(self, path, target):
return self.rename(path, target)
@@ -218,8 +239,8 @@ def rmdir(self, path):
bucket_name = self._bucket_name(path.bucket)
key_name = str(path.key)
bucket = self.gcs.Bucket(bucket_name)
for object_summery in bucket.objects.filter(Prefix=key_name):
self.boto3_method_with_parameters(object_summery.delete, path=path)
for object_data in bucket.objects.filter(Prefix=key_name):
self.boto3_method_with_parameters(object_data.delete, path=path)

def mkdir(self, path, mode):
self.boto3_method_with_parameters(
@@ -273,7 +294,7 @@ def _string_parser(text, *, mode, encoding):
if "b" in mode:
return text
return text.obj.decode(encoding or "utf-8")
if isinstance(text, bytes):
if isinstance(text, (bytes, bytearray)):
if "b" in mode:
return text
return text.decode(encoding or "utf-8")
@@ -706,7 +727,7 @@ def _init(self, template=None):
class GCSKeyWritableFileObject(RawIOBase):
def __init__(
self,
object_summery,
bucket: storage.Bucket,
*,
path,
mode="w",
@@ -716,8 +737,8 @@ def __init__(
newline=None
):
super().__init__()
self.object_summery = object_summery
self.path = path
self.bucket: storage.Bucket = bucket
self.path: GCSPath = path
self.mode = mode
self.buffering = buffering
self.encoding = encoding
@@ -752,19 +773,17 @@ def writable(self, *args, **kwargs):
return "w" in self.mode

@writable_check
def write(self, text):
def write(self, text: Union[bytes, bytearray]):
self._cache.write(self._string_parser(text))
self._cache.seek(0)
_gcs_accessor.boto3_method_with_parameters(
self.object_summery.put, path=self.path, kwargs={"Body": self._cache}
)
with gs_chunked_io.Writer(str(self.path.key), self.bucket) as writer:
writer.write(self._cache.read())

def writelines(self, lines):
self.write(
self._string_parser("\n").join(self._string_parser(line) for line in lines)
)
def writelines(self, lines: Iterable[Union[bytes, bytearray]]):
strings = [self._string_parser(line) for line in lines]
self.write("\n".join(strings))

def readable(self):
def readable(self) -> bool:
return False

def read(self, *args, **kwargs):
@@ -773,11 +792,17 @@ def read(self, *args, **kwargs):
def readlines(self, *args, **kwargs):
raise UnsupportedOperation("not readable")

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


class GCSKeyReadableFileObject(RawIOBase):
def __init__(
self,
object_summery,
blob: storage.Blob,
*,
path,
mode="b",
@@ -787,7 +812,7 @@ def __init__(
newline=None
):
super().__init__()
self.object_summery = object_summery
self.blob = blob
self.path = path
self.mode = mode
self.buffering = buffering
@@ -831,9 +856,7 @@ def readable(self):
return False
with suppress(gcs_errors.ClientError):
if self._streaming_body is None:
self._streaming_body = _gcs_accessor.boto3_method_with_parameters(
self.object_summery.get, path=self.path
)["Body"]
self._streaming_body = gs_chunked_io.Reader(self.blob)
return True
return False

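For context, a quick sketch of how the new streaming open is intended to be used from the pathlib-style API (the bucket/key path is hypothetical, and GCSPath.open is assumed to dispatch to the accessor's open shown above):

from gcspath import GCSPath

path = GCSPath("/my-bucket/some/key.txt")  # hypothetical bucket and key

# Writes go through GCSKeyWritableFileObject, which streams the cached
# bytes to GCS via gs_chunked_io.Writer.
with path.open(mode="w") as f:
    f.write("hello world")

# Reads go through GCSKeyReadableFileObject, which wraps the blob in a
# gs_chunked_io.Reader.
with path.open(mode="r") as f:
    print(f.read())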
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1 +1,2 @@
google-cloud-storage
google-cloud-storage
gs-chunked-io
