[SYNPY-1356] Cleaning up code and providing benchmarking results (#1097)
* Cleaning up code and providing benchmarking results
BryanFauble committed May 14, 2024
1 parent 1bb3725 commit 7ea004d
Showing 5 changed files with 46 additions and 142 deletions.
23 changes: 23 additions & 0 deletions docs/explanations/benchmarking.md
@@ -7,6 +7,29 @@ will give us a way to measure the impact of changes to the client.

## Results

### 05/10/2024: Uploading files to Synapse
These benchmarking results were collected to measure the impact of the following changes:
- The upload algorithm for the Synapseutils `syncToSynapse` was re-written to take
advantage of the new AsyncIO upload algorithm for individual files.
- The limit on concurrent file transfers was updated to match `max_threads * 2` (see the sketch below).
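A minimal sketch of how a `max_threads * 2` cap on concurrent transfers can be enforced
with an `asyncio.Semaphore`. The names and values below are illustrative only and are not
the client's actual implementation:

```python
import asyncio

MAX_THREADS = 8  # illustrative value; the client derives this from its own configuration
CONCURRENT_TRANSFER_LIMIT = MAX_THREADS * 2

async def upload_one(path: str, semaphore: asyncio.Semaphore) -> str:
    # Each transfer waits for a slot, so at most MAX_THREADS * 2 run at once.
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for the real multipart upload
        return path

async def upload_all(paths: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(CONCURRENT_TRANSFER_LIMIT)
    return list(await asyncio.gather(*(upload_one(p, semaphore) for p in paths)))

if __name__ == "__main__":
    uploaded = asyncio.run(upload_all([f"file_{i}.bin" for i in range(100)]))
    print(f"uploaded {len(uploaded)} files")
```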


| Test | Total Transfer Size | Synapseutils | OOP Models Interface | syn.Store() | S3 Sync CLI |
|---------------------|---------------------|--------------|----------------------|-------------|-------------|
| 10 File/10GiB ea | 100GiB | 1656.64s | 1656.77s | 1674.63s | 1519.75s |
| 1 File/10GiB ea     | 10GiB               | 166.83s      | 166.41s              | 167.21s     | 149.55s     |
| 10 File/1GiB ea     | 10GiB               | 168.74s      | 167.15s              | 184.78s     | 166.39s     |
| 100 File/100 MiB ea | 10GiB               | 158.98s      | 125.98s              | 293.07s     | 162.57s     |
| 10 File/100 MiB ea | 1GiB | 16.55s | 14.37s | 29.23s | 19.18s |
| 100 File/10 MiB ea | 1GiB | 15.92s | 15.49s | 129.90s | 18.66s |
| 1000 File/1 MiB ea | 1GiB | 135.77s | 137.15s | 1021.32s | 26.03s |

#### A high level overview of the differences between each of the upload methods:
- **OOP Models Interface:** Uploads all files and 8MB chunks of each file in parallel using a new upload algorithm
- **Synapseutils:** Uploads all files and 8MB chunks of each file in parallel using a new upload algorithm
- **syn.Store():** Uploads files sequentially, but uploads 8MB chunks of each file in parallel using a new upload algorithm
- **S3 Sync CLI:** Executes the `aws s3 sync` command through Python `subprocess.run()` (see the sketch below)
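For reference, the S3 Sync CLI baseline simply shells out to the AWS CLI from Python. A
minimal sketch, assuming the `aws` CLI is installed; the bucket name and paths are
placeholders:

```python
import subprocess

def sync_to_s3(source_dir: str, bucket: str, prefix: str) -> None:
    # Mirror the benchmark's approach: let `aws s3 sync` handle its own
    # parallelism and retries while Python simply drives the command.
    subprocess.run(
        ["aws", "s3", "sync", source_dir, f"s3://{bucket}/{prefix}"],
        check=True,
    )

# Example invocation (hypothetical paths):
# sync_to_s3("/tmp/benchmark_files", "my-benchmark-bucket", "run-01")
```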

### 04/01/2024: Uploading files to Synapse
These benchmarking results bring together some important updates to the Upload logic. It
has been re-written to bring a focus to concurrent file uploads and more efficient use
8 changes: 1 addition & 7 deletions docs/scripts/uploadBenchmark.py
@@ -218,14 +218,12 @@ def execute_synapseutils_test(
def execute_walk_test(
path: str,
test_name: str,
async_file_handle_upload: bool = False,
) -> None:
"""Execute the test that uses os.walk to sync all files/folders to synapse.
Arguments:
path: The path to the root directory
test_name: The name of the test to add to the span name
async_file_handle_upload: Whether to use the async_file_handle_upload option
"""
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()
@@ -254,9 +252,7 @@ def execute_walk_test(
path=filepath,
parent=parents[directory_path],
)
saved_file = syn.store(
file, async_file_handle_upload=async_file_handle_upload
)
saved_file = syn.store(file)
saved_files.append(saved_file)

# Store annotations on the file ------------------------------------------
@@ -377,8 +373,6 @@ def execute_test_suite(

# execute_walk_test(path, test_name)

# execute_walk_test(path, test_name, True)

# execute_walk_test_oop(path, test_name)

# execute_sync_to_s3(path, test_name)
38 changes: 6 additions & 32 deletions synapseclient/client.py
@@ -121,9 +121,6 @@
multipart_upload_string_async,
)
from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper, SFTPWrapper
from synapseclient.core.upload.upload_functions import (
upload_file_handle,
)
from synapseclient.core.upload.upload_functions_async import (
upload_file_handle as upload_file_handle_async,
upload_synapse_s3,
@@ -1768,7 +1765,6 @@ def store(
activityName=None,
activityDescription=None,
set_annotations=True,
async_file_handle_upload: bool = True,
):
"""
Creates a new Entity or updates an existing Entity, uploading any files in the process.
@@ -1791,11 +1787,6 @@
You will be contacted with regards to the specific data being restricted and the
requirements of access.
set_annotations: If True, set the annotations on the entity. If False, do not set the annotations.
async_file_handle_upload: Temporary feature flag that will be removed at an
unannounced later date. This is used during the Synapse Utils
syncToSynapse to disable the async file handle upload. The is because
the multi-threaded logic in the syncToSynapse is not compatible with
the async file handle upload.
Returns:
A Synapse Entity, Evaluation, or Wiki
@@ -1955,37 +1946,20 @@ def store(
"Entities of type File must have a parentId."
)

if async_file_handle_upload:
fileHandle = wrap_async_to_sync(
upload_file_handle_async(
self,
parent_id_for_upload,
local_state["path"]
if (
synapseStore
or local_state_fh.get("externalURL") is None
)
else local_state_fh.get("externalURL"),
synapse_store=synapseStore,
md5=local_file_md5_hex or local_state_fh.get("contentMd5"),
file_size=local_state_fh.get("contentSize"),
mimetype=local_state_fh.get("contentType"),
),
self,
)
else:
fileHandle = upload_file_handle(
fileHandle = wrap_async_to_sync(
upload_file_handle_async(
self,
parent_id_for_upload,
local_state["path"]
if (synapseStore or local_state_fh.get("externalURL") is None)
else local_state_fh.get("externalURL"),
synapseStore=synapseStore,
synapse_store=synapseStore,
md5=local_file_md5_hex or local_state_fh.get("contentMd5"),
file_size=local_state_fh.get("contentSize"),
mimetype=local_state_fh.get("contentType"),
max_threads=self.max_threads,
)
),
self,
)
properties["dataFileHandleId"] = fileHandle["id"]
local_state["_file_handle"] = fileHandle

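The `store()` change above collapses the former sync/async branch into a single call that
drives the async upload from synchronous code. A minimal sketch of that bridge pattern,
assuming a fresh event loop per call; it is not the client's actual `wrap_async_to_sync`
helper, which also receives the Synapse client and must cooperate with shared thread
pools and any already-running loop:

```python
import asyncio
from typing import Any, Coroutine

async def upload_file_handle_async(path: str) -> dict:
    # Placeholder coroutine standing in for the client's async upload; the real
    # function also takes the Synapse client, parent id, md5, size, and mimetype.
    await asyncio.sleep(0)
    return {"id": "123412341234", "path": path}

def wrap_async_to_sync(coro: Coroutine[Any, Any, dict]) -> dict:
    # Simplest possible bridge: run the coroutine to completion on a new event loop.
    return asyncio.run(coro)

file_handle = wrap_async_to_sync(upload_file_handle_async("/tmp/fake_file.txt"))
assert file_handle["id"] == "123412341234"
```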
2 changes: 1 addition & 1 deletion synapseclient/core/upload/multipart_upload_async.py
@@ -133,7 +133,7 @@
@contextmanager
def shared_progress_bar(progress_bar):
"""An outside process that will eventually trigger an upload through this module
can configure a shared Executor by running its code within this context manager.
can configure a shared Progress Bar by running its code within this context manager.
"""
_thread_local.progress_bar = progress_bar
try:
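A hedged usage sketch of the corrected docstring above: callers wrap their upload calls in
this context manager so every upload reports to one shared bar. The `tqdm` bar here is an
assumption; the actual bar type used by callers is not shown in this hunk.

```python
from tqdm import tqdm  # assumed progress-bar implementation; any compatible bar works

from synapseclient.core.upload.multipart_upload_async import shared_progress_bar

# Uploads triggered inside this block report to the single shared bar instead of
# each creating its own.
with shared_progress_bar(tqdm(total=10, desc="uploads")):
    pass  # kick off the uploads that should share the bar here
```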
117 changes: 15 additions & 102 deletions tests/unit/synapseclient/unit_test_client.py
@@ -2410,7 +2410,9 @@ def test_store__needsUploadFalse__fileHandleId_not_in_local_state(syn):
with patch.object(
syn, "_getEntityBundle", return_value=returned_bundle
), patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
synapseclient.client,
"upload_file_handle_async",
return_value=returned_file_handle,
), patch.object(
syn.cache, "contains", return_value=True
), patch.object(
@@ -2486,7 +2488,9 @@ def test_store__existing_processed_as_update(syn):
}

with patch.object(syn, "_getEntityBundle") as mock_get_entity_bundle, patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
synapseclient.client,
"upload_file_handle_async",
return_value=returned_file_handle,
), patch.object(syn.cache, "contains", return_value=True), patch.object(
syn, "_createEntity"
) as mock_createEntity, patch.object(
@@ -2617,103 +2621,6 @@ def test_store__409_processed_as_update(syn):
mock_findEntityId.assert_called_once_with(file_name, parent_id)


def test_store__409_processed_as_update_non_async_file_handle(syn):
"""Test that if we get a 409 conflict when creating an entity we re-retrieve its
associated bundle and process it as an entity update instead."""
file_handle_id = "123412341234"
returned_file_handle = {"id": file_handle_id}

parent_id = "syn122"
synapse_id = "syn123"
etag = "db9bc70b-1eb6-4a21-b3e8-9bf51d964031"
file_name = "fake_file.txt"

existing_bundle_annotations = {
"foo": {"type": "LONG", "value": ["1"]},
"bar": {"type": "LONG", "value": ["2"]},
}
new_annotations = {
"foo": [3],
"baz": [4],
}

returned_bundle = {
"entity": {
"name": file_name,
"id": synapse_id,
"etag": etag,
"concreteType": "org.sagebionetworks.repo.model.FileEntity",
"dataFileHandleId": file_handle_id,
},
"entityType": "file",
"fileHandles": [
{
"id": file_handle_id,
"concreteType": "org.sagebionetworks.repo.model.file.S3FileHandle",
}
],
"annotations": {
"id": synapse_id,
"etag": etag,
"annotations": existing_bundle_annotations,
},
}

expected_create_properties = {
"name": file_name,
"concreteType": "org.sagebionetworks.repo.model.FileEntity",
"dataFileHandleId": file_handle_id,
"parentId": parent_id,
"versionComment": None,
}
expected_update_properties = {
**expected_create_properties,
"id": synapse_id,
"etag": etag,
}

# we expect the annotations to be merged
expected_annotations = {
"foo": [3],
"bar": [2],
"baz": [4],
}

with patch.object(syn, "_getEntityBundle") as mock_get_entity_bundle, patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
), patch.object(syn.cache, "contains", return_value=True), patch.object(
syn, "_createEntity"
) as mock_createEntity, patch.object(
syn, "_updateEntity"
) as mock_updateEntity, patch.object(
syn, "findEntityId"
) as mock_findEntityId, patch.object(
syn, "set_annotations"
) as mock_set_annotations, patch.object(
Entity, "create"
), patch.object(
syn, "get"
):
mock_get_entity_bundle.side_effect = [None, returned_bundle]
mock_createEntity.side_effect = SynapseHTTPError(
response=DictObject({"status_code": 409})
)
mock_findEntityId.return_value = synapse_id

f = File(f"/{file_name}", parent=parent_id, **new_annotations)
syn.store(f, async_file_handle_upload=False)

mock_updateEntity.assert_called_once_with(
expected_update_properties,
True, # createOrUpdate
None, # versionLabel
)

mock_set_annotations.assert_called_once_with(expected_annotations)
mock_createEntity.assert_called_once_with(expected_create_properties)
mock_findEntityId.assert_called_once_with(file_name, parent_id)


def test_store__no_need_to_update_annotation(syn):
"""
Verify if the annotations don't change, no need to call set_annotation method
@@ -2758,7 +2665,9 @@ def test_store__no_need_to_update_annotation(syn):
}

with patch.object(syn, "_getEntityBundle") as mock_get_entity_bundle, patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
synapseclient.client,
"upload_file_handle_async",
return_value=returned_file_handle,
), patch.object(syn.cache, "contains", return_value=True), patch.object(
syn, "_createEntity"
), patch.object(
@@ -2827,7 +2736,9 @@ def test_store__update_versionComment(syn):
}

with patch.object(syn, "_getEntityBundle") as mock_get_entity_bundle, patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
synapseclient.client,
"upload_file_handle_async",
return_value=returned_file_handle,
), patch.object(syn.cache, "contains", return_value=True), patch.object(
syn, "_createEntity"
) as mock_createEntity, patch.object(
@@ -2934,7 +2845,9 @@ def test_store__existing_no_update(syn):
}

with patch.object(syn, "_getEntityBundle") as mock_get_entity_bundle, patch.object(
synapseclient.client, "upload_file_handle", return_value=returned_file_handle
synapseclient.client,
"upload_file_handle_async",
return_value=returned_file_handle,
), patch.object(syn.cache, "contains", return_value=True), patch.object(
syn, "_createEntity"
) as mock_createEntity, patch.object(
