Skip to content

Commit

Permalink
feat: add support for allow_missing to interfaces
Browse files Browse the repository at this point in the history
perf: add exception handling for NotFound exceptions on GCS
  • Loading branch information
william-silversmith committed Mar 15, 2024
1 parent 272a731 commit b512841
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions cloudfiles/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from .compression import COMPRESSION_TYPES
from .connectionpools import S3ConnectionPool, GCloudBucketPool, MemoryPool, MEMORY_DATA
from .exceptions import MD5IntegrityError
from .exceptions import MD5IntegrityError, CompressionError
from .lib import mkdir, sip, md5, validate_s3_multipart_etag
from .secrets import http_credentials, CLOUD_FILES_DIR, CLOUD_FILES_LOCK_DIR

Expand Down Expand Up @@ -82,6 +82,21 @@ def reset_connection_pools():
wait=tenacity.wait_random_exponential(0.5, 60.0),
)

def retry_if_not(exception_type):
    """Build a tenacity retry decorator that skips retrying given exceptions.

    Any exception that is NOT an instance of one of the given types is
    retried (up to 7 attempts with randomized exponential backoff between
    0.5 and 60 seconds); exceptions of the given types are re-raised
    immediately.

    Args:
        exception_type: a single exception class, or a list/tuple/set of
            exception classes, that should never trigger a retry. An empty
            collection excludes nothing, so every exception is retried.

    Returns:
        A decorator produced by ``tenacity.retry``.
    """
    if isinstance(exception_type, (list, tuple, set, frozenset)):
        excluded = tuple(exception_type)
    else:
        excluded = (exception_type,)

    # A single condition over the whole tuple means "not any of these".
    # OR-ing one negated condition per type ("not A or not B") would be
    # true for practically every exception, so nothing would be excluded.
    return tenacity.retry(
        retry=tenacity.retry_if_not_exception_type(excluded),
        reraise=True,
        stop=tenacity.stop_after_attempt(7),
        wait=tenacity.wait_random_exponential(0.5, 60.0),
    )

class StorageInterface(object):
exists_batch_size = 1
delete_batch_size = 1
Expand Down Expand Up @@ -528,7 +543,7 @@ def __init__(self, path, secrets=None, request_payer=None, **kwargs):
def get_path_to_file(self, file_path):
return posixpath.join(self._path.path, file_path)

@retry
@retry_if_not(CompressionError)
def put_file(self, file_path, content, content_type,
compress, cache_control=None, storage_class=None):
key = self.get_path_to_file(file_path)
Expand All @@ -545,7 +560,7 @@ def put_file(self, file_path, content, content_type,
elif compress in ("bzip2", "bz2"):
blob.content_encoding = "bz2"
elif compress:
raise ValueError("Compression type {} not supported.".format(compress))
raise CompressionError("Compression type {} not supported.".format(compress))

if cache_control:
blob.cache_control = cache_control
Expand All @@ -562,11 +577,17 @@ def copy_file(self, src_path, dest_bucket, dest_key):
with GCS_BUCKET_POOL_LOCK:
pool = GC_POOL[GCloudBucketPoolParams(dest_bucket, self._request_payer)]
dest_bucket = pool.get_connection(self._secrets, None)
self._bucket.copy_blob(
source_blob, dest_bucket, dest_key
)

@retry
try:
self._bucket.copy_blob(
source_blob, dest_bucket, dest_key
)
except google.api_core.exceptions.NotFound:
return False

return True

@retry_if_not(google.cloud.exceptions.NotFound)
def get_file(self, file_path, start=None, end=None, part_size=None):
key = self.get_path_to_file(file_path)
blob = self._bucket.blob( key )
Expand All @@ -590,7 +611,7 @@ def get_file(self, file_path, start=None, end=None, part_size=None):

return (content, blob.content_encoding, hash_value, hash_type)

@retry
@retry_if_not(google.cloud.exceptions.NotFound)
def head(self, file_path):
key = self.get_path_to_file(file_path)
blob = self._bucket.get_blob(key)
Expand All @@ -609,15 +630,15 @@ def head(self, file_path):
"Component-Count": blob.component_count,
}

@retry
@retry_if_not(google.cloud.exceptions.NotFound)
def size(self, file_path):
    """Return the ``size`` attribute of the blob at ``file_path``.

    The key is resolved with ``get_path_to_file`` and looked up through
    ``self._bucket.get_blob``. Returns None when the lookup yields a
    falsy result.
    """
    blob = self._bucket.get_blob(self.get_path_to_file(file_path))
    return blob.size if blob else None

@retry
@retry_if_not(google.cloud.exceptions.NotFound)
def exists(self, file_path):
key = self.get_path_to_file(file_path)
blob = self._bucket.blob(key)
Expand Down Expand Up @@ -907,7 +928,15 @@ def copy_file(self, src_path, dest_bucket_name, dest_key):
'Bucket': self._path.bucket,
'Key': key,
}
dest_bucket.copy(CopySource=copy_source, Bucket=dest_bucket_name, Key=dest_key)
try:
dest_bucket.copy(CopySource=copy_source, Bucket=dest_bucket_name, Key=dest_key)
except botocore.exceptions.ClientError as err:
if err.response['Error']['Code'] == 'NoSuchKey':
return False
else:
raise

return True

@retry
def get_file(self, file_path, start=None, end=None, part_size=None):
Expand Down

0 comments on commit b512841

Please sign in to comment.