Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #28 from sernst/27-enhanced-syncing
Browse files Browse the repository at this point in the history
Closes #27 enhanced syncing.
  • Loading branch information
sernst committed May 29, 2018
2 parents 6dbaa67 + 2bd9ab5 commit 6701325
Show file tree
Hide file tree
Showing 15 changed files with 403 additions and 288 deletions.
14 changes: 11 additions & 3 deletions cauldron/cli/server/routes/synchronize/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import os
import tempfile

import cauldron as cd
import flask

import cauldron as cd
from cauldron import cli
from cauldron import writer
from cauldron.cli import sync
Expand Down Expand Up @@ -115,6 +116,7 @@ def sync_open_project():
sync_status.update({}, time=-1, project=None)

open_response = project_opener.open_project(project_folder, forget=True)
open_response.join()
project = cd.project.internal_project
project.remote_source_directory = source_directory

Expand Down Expand Up @@ -142,6 +144,7 @@ def sync_source_file():
index = args.get('index', 0)
sync_time = args.get('sync_time', -1)
location = args.get('location', 'project')
offset = args.get('offset', 0)

if None in [relative_path, chunk]:
return r.fail(
Expand Down Expand Up @@ -173,11 +176,16 @@ def sync_source_file():
if not os.path.exists(parent_directory):
os.makedirs(parent_directory)

sync.io.write_file_chunk(file_path, chunk, append=index > 0)
sync.io.write_file_chunk(
file_path=file_path,
packed_chunk=chunk,
append=index > 0,
offset=offset
)

sync_status.update({}, time=sync_time)

print('SAVED CHUNK:', file_path)
print('SAVED CHUNK:', offset, file_path)
return r.notify(
kind='SUCCESS',
code='SAVED_CHUNK',
Expand Down
68 changes: 48 additions & 20 deletions cauldron/cli/sync/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def assemble_url(
:return:
The fully-resolved URL for the given endpoint
"""

url_root = (
remote_connection.url
if remote_connection else
Expand Down Expand Up @@ -51,7 +50,6 @@ def parse_http_response(http_response: HttpResponse) -> 'environ.Response':
:return:
The Cauldron response object for the given http response
"""

try:
response = environ.Response.deserialize(http_response.json())
except Exception as error:
Expand All @@ -67,35 +65,65 @@ def parse_http_response(http_response: HttpResponse) -> 'environ.Response':
return response


def get_request_function(data: dict = None, method: str = None):
""" """

default_method = 'post' if data else 'get'
return getattr(requests, method.lower() if method else default_method)


def send_request(
endpoint: str,
data: dict = None,
remote_connection: 'environ.RemoteConnection' = None,
method: str = None,
timeout: int = 10,
max_retries: int = 10,
**kwargs
) -> 'environ.Response':
""" """
"""
Sends a request to the remote kernel specified by the RemoteConnection
object and processes the result. If the request fails or times out it
will be retried until the max retries is reached. After that a failed
response will be returned instead.
:param endpoint:
Remote endpoint where the request will be directed.
:param data:
An optional JSON-serializable dictionary containing the request
body data.
:param remote_connection:
Defines the connection to the remote server where the request will
be sent.
:param method:
The HTTP method type for the request, e.g. GET, POST.
:param timeout:
Number of seconds before the request aborts when trying to either
connect to the target endpoint or receive data from the server.
:param max_retries:
Number of retry attempts to make before giving up if a non-HTTP
error is encountered during communication.
"""
if max_retries < 0:
return environ.Response().fail(
code='COMMUNICATION_ERROR',
error=None,
message='Unable to communicate with the remote kernel.'
).console(whitespace=1).response

url = assemble_url(endpoint, remote_connection)
func = get_request_function(data, method)

try:
http_response = func(url, json=data, **kwargs)
except Exception as error:
return environ.Response().fail(
code='CONNECTION_ERROR',
error=error,
message='Unable to communicate with the remote connection'
).console(
whitespace=1
).response
http_response = requests.request(
method=method,
url=url,
json=data,
timeout=10,
**kwargs
)
except (requests.ConnectionError, requests.HTTPError, requests.Timeout):
return send_request(
endpoint=endpoint,
data=data,
remote_connection=remote_connection,
method=method,
timeout=timeout,
max_retries=max_retries - 1,
**kwargs
)

return parse_http_response(http_response)

Expand Down
28 changes: 14 additions & 14 deletions cauldron/cli/sync/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@
def send_chunk(
chunk: str,
index: int,
offset: int,
relative_path: str,
file_kind: str = '',
remote_connection: 'environ.RemoteConnection' = None,
sync_time: float = -1
):
""" """

args = dict(
relative_path=relative_path,
chunk=chunk,
type=file_kind,
index=index,
sync_time=time.time() if sync_time < 0 else sync_time
)

return sync.comm.send_request(
endpoint='/sync-file',
remote_connection=remote_connection,
data=args
data=dict(
relative_path=relative_path,
chunk=chunk,
offset=offset,
type=file_kind,
index=index,
sync_time=time.time() if sync_time < 0 else sync_time
)
)


Expand All @@ -44,7 +43,6 @@ def send(
sync_time: float = -1
) -> Response:
""" """

response = Response()
sync_time = time.time() if sync_time < 0 else sync_time
callback = (
Expand Down Expand Up @@ -91,15 +89,18 @@ def get_progress(complete_count: int = 0) -> typing.Tuple[int, str]:
)
))

offset = 0
for index, chunk in enumerate(chunks):
response = send_chunk(
chunk=chunk,
index=index,
offset=offset,
relative_path=relative_path,
file_kind=file_kind,
remote_connection=remote_connection,
sync_time=sync_time
)
offset += len(chunk)

if response.failed:
return response
Expand Down Expand Up @@ -146,7 +147,6 @@ def send_all_in(
sync_time: float = -1
) -> Response:
""" """

sync_time = time.time() if sync_time < 0 else sync_time

glob_end = ('**', '*') if recursive else ('*',)
Expand All @@ -156,10 +156,10 @@ def send_all_in(
relative_root_path
if relative_root_path else
directory
).rstrip(os.sep)
).rstrip(os.path.sep)

for file_path in glob.iglob(glob_path, recursive=True):
relative_path = file_path[len(root_path):].lstrip(os.sep)
relative_path = file_path[len(root_path):].lstrip(os.path.sep)

response = send(
file_path=file_path,
Expand Down
32 changes: 19 additions & 13 deletions cauldron/cli/sync/sync_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import os
import zlib


from cauldron import writer


# Default Chunks are 1MB in size
DEFAULT_CHUNK_SIZE = 1048576 # type: int

Expand All @@ -20,7 +18,6 @@ def pack_chunk(source_data: bytes) -> str:
:param source_data:
The data to be converted to a compressed, base64 string
"""

if not source_data:
return ''

Expand Down Expand Up @@ -61,18 +58,17 @@ def get_file_chunk_count(
The number of chunks necessary to send the entire contents of the
specified file for the given chunk size
"""

if not os.path.exists(file_path):
return 0

file_size = os.path.getsize(file_path)
return max(1, math.ceil(file_size / chunk_size))
return max(1, int(math.ceil(file_size / chunk_size)))


def read_file_chunks(
file_path: str,
chunk_size: int = DEFAULT_CHUNK_SIZE
) -> str:
) -> bytes:
"""
Reads the specified file in chunks and returns a generator where
each returned chunk is a compressed base64 encoded string for sync
Expand All @@ -84,7 +80,6 @@ def read_file_chunks(
The size, in bytes, of each chunk. The final chunk will be less than
or equal to this size as the remainder.
"""

chunk_count = get_file_chunk_count(file_path, chunk_size)

if chunk_count < 1:
Expand All @@ -97,7 +92,12 @@ def read_file_chunks(
yield chunk


def write_file_chunk(file_path: str, chunk_data: str, append: bool = True):
def write_file_chunk(
file_path: str,
packed_chunk: str,
append: bool = True,
offset: int = -1
):
"""
Write or append the specified chunk data to the given file path, unpacking
the chunk before writing. If the file does not yet exist, it will be
Expand All @@ -106,13 +106,19 @@ def write_file_chunk(file_path: str, chunk_data: str, append: bool = True):
:param file_path:
The file where the chunk will be written or appended
:param chunk_data:
The packed chunk data to write to the file
:param packed_chunk:
The packed chunk data to write to the file. It will be unpacked before
the file is written.
:param append:
Whether or not the chunk should be appended to the existing file. If
False the chunk data will overwrite the existing file.
:param offset:
The byte offset in the file where the chunk should be written.
If the value is less than zero, the chunk will be written or appended
based on the `append` argument. Note that if you indicate an append
write mode and an offset, the mode will be forced to write instead of
append.
"""

mode = 'ab' if append else 'wb'
contents = unpack_chunk(chunk_data)
writer.write_file(file_path, contents, mode=mode)
contents = unpack_chunk(packed_chunk)
writer.write_file(file_path, contents, mode=mode, offset=offset)
19 changes: 19 additions & 0 deletions cauldron/environ/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,25 @@ def debug_echo(self) -> 'Response':
print(self.echo())
return self

def join(self, timeout: float = None) -> bool:
"""
Joins on the thread associated with the response if it exists, or
just returns after a no-op if no thread exists to join.
:param timeout:
Maximum number of seconds to block on the join before given up
and continuing operation. The default `None` value will wait
forever.
:return:
A boolean indicating whether or not a thread existed to join
upon.
"""
try:
self.thread.join(timeout)
return True
except AttributeError:
return False

def echo(self) -> str:
""" """

Expand Down
17 changes: 13 additions & 4 deletions cauldron/environ/systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import site
import sys
import time
import typing

from cauldron.environ import paths
Expand Down Expand Up @@ -119,13 +120,19 @@ def get_package_data() -> dict:
return json.load(f)


def remove(path: str) -> bool:
def remove(path: str, max_retries: int = 3) -> bool:
"""
Removes the specified path from the local filesystem if it exists.
Directories will be removed along with all files and folders within
them as well as files.
:param path:
The location of the file or folder to remove.
:param max_retries:
The number of times to retry before giving up.
:return:
A boolean indicating whether or not the removal was successful.
"""

if not path:
return False

Expand All @@ -134,12 +141,14 @@ def remove(path: str) -> bool:

remover = os.remove if os.path.isfile(path) else shutil.rmtree

for attempt in range(3):
for attempt in range(max_retries):
try:
remover(path)
return True
except Exception:
pass
# Pause briefly in case there's a race condition on lock
# for the target.
time.sleep(0.02)

return False

Expand Down
2 changes: 1 addition & 1 deletion cauldron/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"version": "0.3.4",
"version": "0.3.5",
"notebookVersion": "v1"
}

0 comments on commit 6701325

Please sign in to comment.