[SYNPY-1356] Refactor sync to synapse (#1083)
* Refactoring synapseutils syncToSynapse to take advantage of the re-written upload algorithm
BryanFauble committed May 14, 2024
1 parent 4646c7b commit be9f166
Showing 16 changed files with 1,233 additions and 913 deletions.
23 changes: 23 additions & 0 deletions docs/explanations/benchmarking.md
@@ -7,6 +7,29 @@ will give us a way to measure the impact of changes to the client.

## Results

### 05/10/2024: Uploading files to Synapse
These benchmarking results were collected to measure the impact of the following changes:
- The upload algorithm for the Synapseutils `syncToSynapse` was re-written to take
advantage of the new AsyncIO upload algorithm for individual files; a minimal
invocation is sketched below.
- The limit on concurrent file transfers was updated to `max_threads * 2`
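
For reference, a minimal sketch of driving `syncToSynapse` (the manifest filename here is hypothetical; the manifest is a TSV that includes at least `path` and `parent` columns):

```python
import synapseclient
import synapseutils

# Assumes Synapse credentials are already configured locally.
syn = synapseclient.login()

# Upload every file listed in the manifest to its `parent` container in Synapse.
synapseutils.syncToSynapse(syn, manifestFile="benchmark_manifest.tsv")
```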


| Test | Total Transfer Size | Synapseutils | OOP Models Interface | syn.Store() | S3 Sync CLI |
|---------------------|---------------------|--------------|----------------------|-------------|-------------|
| 10 File/10GiB ea | 100GiB | 1656.64s | 1656.77s | 1674.63s | 1519.75s |
| 1 File/10GiB ea | 10GiB | 166.83s | 166.41s | 167.21s | 149.55s |
| 10 File/1GiB ea | 10GiB | 168.74s | 167.15s | 184.78s | 166.39s |
| 100 File/100 MiB ea | 10GiB | 158.98s | 125.98s | 293.07s | 162.57s |
| 10 File/100 MiB ea | 1GiB | 16.55s | 14.37s | 29.23s | 19.18s |
| 100 File/10 MiB ea | 1GiB | 15.92s | 15.49s | 129.90s | 18.66s |
| 1000 File/1 MiB ea | 1GiB | 135.77s | 137.15s | 1021.32s | 26.03s |

#### A high-level overview of the differences between each upload method:
- **OOP Models Interface:** Uploads all files, and the 8MB chunks of each file, in parallel using the new upload algorithm
- **Synapseutils:** Uploads all files, and the 8MB chunks of each file, in parallel using the new upload algorithm
- **syn.Store():** Uploads files sequentially, but uploads the 8MB chunks of each file in parallel using the new upload algorithm
- **S3 Sync CLI:** Executes the `aws s3 sync` command through Python's `subprocess.run()`; see the sketch below
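
A minimal sketch of the S3 Sync CLI baseline (the local directory and bucket are hypothetical):

```python
import subprocess

# Mirror a local directory into an S3 bucket by shelling out to the AWS CLI
# rather than using boto3 directly.
subprocess.run(
    ["aws", "s3", "sync", "/tmp/benchmark_files", "s3://example-benchmark-bucket/data"],
    check=True,  # raise CalledProcessError if the CLI exits non-zero
)
```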

### 04/01/2024: Uploading files to Synapse
These benchmarking results bring together some important updates to the upload logic. It
has been re-written to focus on concurrent file uploads and more efficient use
8 changes: 1 addition & 7 deletions docs/scripts/uploadBenchmark.py
@@ -218,14 +218,12 @@ def execute_synapseutils_test(
def execute_walk_test(
path: str,
test_name: str,
async_file_handle_upload: bool = False,
) -> None:
"""Execute the test that uses os.walk to sync all files/folders to synapse.
Arguments:
path: The path to the root directory
test_name: The name of the test to add to the span name
async_file_handle_upload: Whether to use the async_file_handle_upload option
"""
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()
@@ -254,9 +252,7 @@ def execute_walk_test(
path=filepath,
parent=parents[directory_path],
)
saved_file = syn.store(
file, async_file_handle_upload=async_file_handle_upload
)
saved_file = syn.store(file)
saved_files.append(saved_file)

# Store annotations on the file ------------------------------------------
@@ -377,8 +373,6 @@ def execute_test_suite(

# execute_walk_test(path, test_name)

# execute_walk_test(path, test_name, True)

# execute_walk_test_oop(path, test_name)

# execute_sync_to_s3(path, test_name)
3 changes: 3 additions & 0 deletions setup.cfg
@@ -69,6 +69,7 @@ tests_require =
pytest-rerunfailures~=12.0
func-timeout~=4.3
pytest-cov~=4.1.0
pandas>=1.5,<3.0

[options.extras_require]
dev =
@@ -83,6 +84,7 @@ dev =
pytest-cov~=4.1.0
black
pre-commit
pandas>=1.5,<3.0

tests =
pytest>=7.0.0,<8.0
@@ -94,6 +96,7 @@ tests =
pytest-rerunfailures~=12.0
func-timeout~=4.3
pytest-cov~=4.1.0
pandas>=1.5,<3.0

pandas =
pandas>=1.5,<3.0
78 changes: 46 additions & 32 deletions synapseclient/client.py
@@ -121,9 +121,6 @@
multipart_upload_string_async,
)
from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper, SFTPWrapper
from synapseclient.core.upload.upload_functions import (
upload_file_handle,
)
from synapseclient.core.upload.upload_functions_async import (
upload_file_handle as upload_file_handle_async,
upload_synapse_s3,
@@ -401,6 +398,7 @@ def log_response(response: httpx.Response) -> None:
self.max_threads = transfer_config["max_threads"]
self._thread_executor = {}
self._process_executor = {}
self._parallel_file_transfer_semaphore = {}
self._md5_semaphore = {}
self.use_boto_sts_transfers = transfer_config["use_boto_sts"]

@@ -561,6 +559,45 @@ def _get_md5_semaphore(

return self._md5_semaphore[asyncio_event_loop]

def _get_parallel_file_transfer_semaphore(
self, asyncio_event_loop: asyncio.AbstractEventLoop
) -> asyncio.Semaphore:
"""
Retrieve the parallel file transfer semaphore for the Synapse client, or create
a new one if it does not exist. This semaphore limits the number of files that can actively
enter the uploading/downloading process.
This is expected to be called from within an AsyncIO loop.
By default the number of files that can enter the "uploading" state is
limited to 2 * max_threads. This ensures that files which have already entered
the "uploading" state have priority to finish. It also means that there should
be a good spread of files approaching, entering, and finishing the
"uploading" state.
If we break these states down into large components they would look like:
- Before "uploading" state: HTTP rest calls to retrieve what data Synapse has
- Entering "uploading" state: MD5 calculation and HTTP rest calls to determine
how/where to upload a file to.
- During "uploading" state: Uploading the file to a storage provider.
- After "uploading" state: HTTP rest calls to finalize the upload.
This has not yet been applied to parallel file downloads. That will take place
later on.
"""
if (
hasattr(self, "_parallel_file_transfer_semaphore")
and asyncio_event_loop in self._parallel_file_transfer_semaphore
and self._parallel_file_transfer_semaphore[asyncio_event_loop] is not None
):
return self._parallel_file_transfer_semaphore[asyncio_event_loop]

self._parallel_file_transfer_semaphore.update(
{asyncio_event_loop: asyncio.Semaphore(max(self.max_threads * 2, 1))}
)

return self._parallel_file_transfer_semaphore[asyncio_event_loop]

# initialize logging
def _init_logger(self):
"""
@@ -1728,7 +1765,6 @@ def store(
activityName=None,
activityDescription=None,
set_annotations=True,
async_file_handle_upload: bool = True,
):
"""
Creates a new Entity or updates an existing Entity, uploading any files in the process.
@@ -1751,11 +1787,6 @@
You will be contacted with regards to the specific data being restricted and the
requirements of access.
set_annotations: If True, set the annotations on the entity. If False, do not set the annotations.
async_file_handle_upload: Temporary feature flag that will be removed at an
unannounced later date. This is used during the Synapse Utils
syncToSynapse to disable the async file handle upload. The is because
the multi-threaded logic in the syncToSynapse is not compatible with
the async file handle upload.
Returns:
A Synapse Entity, Evaluation, or Wiki
@@ -1915,37 +1946,20 @@
"Entities of type File must have a parentId."
)

if async_file_handle_upload:
fileHandle = wrap_async_to_sync(
upload_file_handle_async(
self,
parent_id_for_upload,
local_state["path"]
if (
synapseStore
or local_state_fh.get("externalURL") is None
)
else local_state_fh.get("externalURL"),
synapse_store=synapseStore,
md5=local_file_md5_hex or local_state_fh.get("contentMd5"),
file_size=local_state_fh.get("contentSize"),
mimetype=local_state_fh.get("contentType"),
),
self,
)
else:
fileHandle = upload_file_handle(
fileHandle = wrap_async_to_sync(
upload_file_handle_async(
self,
parent_id_for_upload,
local_state["path"]
if (synapseStore or local_state_fh.get("externalURL") is None)
else local_state_fh.get("externalURL"),
synapseStore=synapseStore,
synapse_store=synapseStore,
md5=local_file_md5_hex or local_state_fh.get("contentMd5"),
file_size=local_state_fh.get("contentSize"),
mimetype=local_state_fh.get("contentType"),
max_threads=self.max_threads,
)
),
self,
)
properties["dataFileHandleId"] = fileHandle["id"]
local_state["_file_handle"] = fileHandle

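A standalone sketch of the gating pattern that the new `_get_parallel_file_transfer_semaphore` method enables (the names and sizes here are illustrative, not the client's API): at most `max_threads * 2` files may enter the upload pipeline at once, so files that have started uploading get priority to finish.

```python
import asyncio

MAX_THREADS = 5  # stand-in for syn.max_threads


async def upload_one(path: str, semaphore: asyncio.Semaphore) -> None:
    async with semaphore:
        # Only max(MAX_THREADS * 2, 1) files run this block concurrently; the
        # rest wait here before starting their MD5 and multipart upload work.
        await asyncio.sleep(0.1)  # placeholder for the real upload


async def upload_all(paths: list[str]) -> None:
    semaphore = asyncio.Semaphore(max(MAX_THREADS * 2, 1))
    await asyncio.gather(*(upload_one(p, semaphore) for p in paths))


asyncio.run(upload_all([f"file_{i}.dat" for i in range(100)]))
```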
28 changes: 25 additions & 3 deletions synapseclient/core/upload/multipart_upload_async.py
@@ -64,6 +64,7 @@

# pylint: disable=protected-access
import asyncio
from contextlib import contextmanager
import gc
import mimetypes
import os
@@ -126,6 +127,22 @@
DEFAULT_PART_SIZE = 8 * MB
MAX_RETRIES = 7

_thread_local = threading.local()


@contextmanager
def shared_progress_bar(progress_bar):
"""An outside process that will eventually trigger an upload through this module
can configure a shared Progress Bar by running its code within this context manager.
"""
_thread_local.progress_bar = progress_bar
try:
yield
finally:
_thread_local.progress_bar.close()
_thread_local.progress_bar.refresh()
del _thread_local.progress_bar


@dataclass
class HandlePartResult:
@@ -173,6 +190,7 @@ def __init__(
self._aborted = False
self._storage_str = storage_str

self._close_progress_bar = getattr(_thread_local, "progress_bar", None) is None
# populated later
self._upload_id: Optional[str] = None
self._pre_signed_part_urls: Optional[Mapping[int, str]] = None
@@ -329,7 +347,9 @@ async def _upload_parts(
if self._is_copy():
# we won't have bytes to measure during a copy so the byte oriented
# progress bar is not useful
self._progress_bar = tqdm(
self._progress_bar = getattr(
_thread_local, "progress_bar", None
) or tqdm(
total=part_count,
desc=self._storage_str or "Copying",
unit_scale=True,
@@ -343,7 +363,9 @@
file_size,
)

self._progress_bar = tqdm(
self._progress_bar = getattr(
_thread_local, "progress_bar", None
) or tqdm(
total=file_size,
desc=self._storage_str or "Uploading",
unit="B",
@@ -448,7 +470,7 @@ async def _complete_upload(self) -> Dict[str, str]:
Returns:
The response from the server for the completed upload.
"""
if not self._syn.silent and self._progress_bar:
if not self._syn.silent and self._progress_bar and self._close_progress_bar:
self._progress_bar.close()
upload_status_response = await put_file_multipart_complete(
upload_id=self._upload_id,
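A sketch of how a caller could use the new `shared_progress_bar` context manager to show one aggregate bar across a batch of uploads (the file list and byte total are hypothetical; the bar is kept in thread-local state, so the uploads must be triggered from the thread that entered the context manager):

```python
from tqdm import tqdm

from synapseclient.core.upload.multipart_upload_async import shared_progress_bar

files = ["a.dat", "b.dat"]  # hypothetical batch
total_bytes = 2 * 1024 * 1024  # hypothetical combined size of the batch

# Uploads started inside the block reuse this bar instead of creating their
# own; the context manager closes it once the batch completes.
with shared_progress_bar(tqdm(total=total_bytes, unit="B", unit_scale=True)):
    for path in files:
        ...  # trigger the multipart upload for `path` on this thread
```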
8 changes: 6 additions & 2 deletions synapseclient/models/file.py
@@ -35,7 +35,6 @@
from synapseclient.models.services.storable_entity_components import (
store_entity_components,
)
from synapseutils.copy_functions import changeFileMetaData, copy

if TYPE_CHECKING:
from synapseclient.models import Folder, Project
@@ -727,7 +726,10 @@ async def store_async(

if self.path:
self.path = os.path.expanduser(self.path)
await self._upload_file(synapse_client=client)
async with client._get_parallel_file_transfer_semaphore(
asyncio_event_loop=asyncio.get_running_loop()
):
await self._upload_file(synapse_client=client)
elif self.data_file_handle_id:
self.path = client.cache.get(file_handle_id=self.data_file_handle_id)

@@ -814,6 +816,7 @@ async def change_metadata_async(
"""
if not self.id:
raise ValueError("The file must have an ID to change metadata.")
from synapseutils.copy_functions import changeFileMetaData

loop = asyncio.get_event_loop()

@@ -1085,6 +1088,7 @@ async def copy_async(
"""
if not self.id or not parent_id:
raise ValueError("The file must have an ID and parent_id to copy.")
from synapseutils.copy_functions import copy

loop = asyncio.get_event_loop()

6 changes: 5 additions & 1 deletion synapseclient/models/mixins/storable_container.py
@@ -371,7 +371,11 @@ def _create_task_for_child(
if if_collision:
file.if_collision = if_collision

pending_tasks.append(asyncio.create_task(wrap_coroutine(file.get_async())))
pending_tasks.append(
asyncio.create_task(
wrap_coroutine(file.get_async(include_activity=True))
)
)
return pending_tasks

def _resolve_sync_from_synapse_result(
3 changes: 2 additions & 1 deletion synapseutils/__init__.py
@@ -5,10 +5,11 @@
[Synapse](http://www.synapse.org). The behavior of these functions is subject to change.
"""

# flake8: noqa F401 unclear who is using these
from .copy_functions import copy, copyWiki, copyFileHandles, changeFileMetaData
from .walk_functions import walk
from .sync import syncFromSynapse, syncToSynapse, generate_sync_manifest
from .migrate_functions import index_files_for_migration, migrate_indexed_files
from .monitor import notifyMe, with_progress_bar
from .monitor import notifyMe, with_progress_bar, notify_me_async
from .describe_functions import describe
