Skip to content

Commit

Permalink
Feat/dev 506 large file download (#276)
Browse files Browse the repository at this point in the history
DEV-506: fix blocking issue on failed segment retry

When a worker fails to write its segment, the worker
would immediately exit, causing the main process to block
while waiting for every child process to read a final
sentinel message from the work queue.
  • Loading branch information
jiakf committed Apr 13, 2021
1 parent 3ba1281 commit e84ad87
Show file tree
Hide file tree
Showing 9 changed files with 600 additions and 123 deletions.
6 changes: 4 additions & 2 deletions gdc_client/common/config.py
Expand Up @@ -40,7 +40,7 @@ class GDCClientConfigShared(object):
"n_processes": ConfigParser.getint,
"retry_amount": ConfigParser.getint,
"wait_time": ConfigParser.getfloat,
"no_segment_md5sum": ConfigParser.getboolean,
"no_segment_md5sums": ConfigParser.getboolean,
"no_file_md5sum": ConfigParser.getboolean,
"no_verify": ConfigParser.getboolean,
"no_related_files": ConfigParser.getboolean,
Expand All @@ -49,6 +49,7 @@ class GDCClientConfigShared(object):
"insecure": ConfigParser.getboolean,
"disable_multipart": ConfigParser.getboolean,
"path": ConfigParser.get,
"latest": ConfigParser.getboolean,
}

def __init__(self, config_path=None):
Expand All @@ -71,14 +72,15 @@ def defaults(self):
"dir": ".",
"save_interval": SAVE_INTERVAL,
"http_chunk_size": HTTP_CHUNK_SIZE,
"no_segment_md5sum": False,
"no_segment_md5sums": False,
"no_file_md5sum": False,
"no_verify": False,
"no_related_files": False,
"no_annotations": False,
"no_auto_retry": False,
"retry_amount": 1,
"wait_time": 5.0,
"latest": False,
},
"upload": {
"path": ".",
Expand Down
4 changes: 4 additions & 0 deletions gdc_client/exceptions.py
@@ -1,2 +1,6 @@
class ClientError(Exception):
    """Root exception for all errors raised by the GDC client."""


class MD5ValidationError(ClientError):
    """Raised when an md5 checksum check fails or cannot be performed.

    Inherits from ClientError so callers can catch all client failures
    with a single ``except ClientError`` handler; catching
    ``MD5ValidationError`` directly continues to work as before.
    """
51 changes: 23 additions & 28 deletions gdc_client/parcel/client.py
Expand Up @@ -110,25 +110,6 @@ def stop_timer(self, file_size=None):

log.debug("Download complete" + rate_info)

def validate_file_md5sum(self, stream):
    """Check the downloaded file's md5 checksum against the server value.

    Args:
        stream: download stream object; must expose ``check_file_md5sum``,
            ``is_regular_file``, ``md5sum``, and ``path``.

    Raises:
        Exception: if the file is not a regular file, the server supplied
            no md5sum, or the computed checksum does not match.
    """
    # Validation can be turned off per-stream (e.g. --no-file-md5sum).
    if not stream.check_file_md5sum:
        log.debug("checksum validation disabled")
        return

    log.debug("Validating checksum...")

    if not stream.is_regular_file:
        raise Exception("Not a regular file")

    expected = stream.md5sum
    if not expected:
        raise Exception(
            "Cannot validate this file since the server did not provide an md5sum. Use the '--no-file-md5sum' option to ignore this error."
        )

    actual = utils.md5sum_whole_file(stream.path)
    if actual != expected:
        raise Exception("File checksum is invalid")

def download_files(self, urls, *args, **kwargs):
"""Download a list of files.
Expand Down Expand Up @@ -158,10 +139,16 @@ def download_files(self, urls, *args, **kwargs):

# Download file
try:
# validate temporary file before renaming to permanent file location
self.parallel_download(stream)
utils.validate_file_md5sum(
stream,
stream.temp_path
if os.path.isfile(stream.temp_path)
else stream.path,
)
if os.path.isfile(stream.temp_path):
utils.remove_partial_extension(stream.temp_path)
self.validate_file_md5sum(stream)
downloaded.append(url)

# Handle file download error, store error to print out later
Expand All @@ -182,15 +169,11 @@ def download_files(self, urls, *args, **kwargs):
return downloaded, errors

def serial_download(self, stream):
    """Download a file to the target directory using a single worker.

    Fix: a diff artifact left two copies of the docstring in this method;
    the second string literal was a dead no-op expression and is removed.

    Args:
        stream: download stream describing the file to fetch.
    """
    self._download(1, stream)

def parallel_download(self, stream):
    """Download a file to the target directory using ``self.n_procs`` workers.

    Fix: a diff artifact left two copies of the docstring in this method;
    the second string literal was a dead no-op expression and is removed.

    Args:
        stream: download stream describing the file to fetch.
    """
    self._download(self.n_procs, stream)

def _download(self, nprocs, stream):
Expand All @@ -216,19 +199,31 @@ def _download(self, nprocs, stream):
n_procs = 1 if stream.size < 0.01 * const.GB else nprocs
producer = SegmentProducer(stream, n_procs)

if producer.done:
return

def download_worker():
while True:
try:
segment = producer.q_work.get()
if segment is None:
return log.debug("Producer returned with no more work")
log.debug("Producer returned with no more work")
return
stream.write_segment(segment, producer.q_complete)
# write_segment completed successfully, send sentinel value
# to master process to indicate a task was completed
producer.q_complete.put(None)
except Exception as e:
# send sentinel value to master process even though
# write_segment failed to indicate a task is "finished"
producer.q_complete.put(None)
if self.debug:
raise
else:
log.error("Download aborted: {0}".format(str(e)))
break
# worker needs to stay alive until final sentinel value
# from master process is received
continue

# Divide work amongst process pool
pool = [Process(target=download_worker) for i in range(n_procs)]
Expand Down

0 comments on commit e84ad87

Please sign in to comment.