
[PLA-699][external] Allow registration of multi-slotted read-write files from external storage #790

Merged (8 commits, Mar 20, 2024)
6 changes: 3 additions & 3 deletions darwin/client.py
@@ -131,7 +131,7 @@ def list_local_datasets(self, team_slug: Optional[str] = None) -> Iterator[Path]

def list_remote_datasets(
self, team_slug: Optional[str] = None
-) -> Iterator[RemoteDataset]:
+) -> Iterator[RemoteDatasetV2]:
"""
Returns a list of all available datasets with the team currently authenticated against.

@@ -162,7 +162,7 @@ def list_remote_datasets(

def get_remote_dataset(
self, dataset_identifier: Union[str, DatasetIdentifier]
-) -> RemoteDataset:
+) -> RemoteDatasetV2:
"""
Get a remote dataset based on its identifier.

@@ -189,7 +189,7 @@ def get_remote_dataset(
parsed_dataset_identifier.team_slug = self.default_team

try:
-matching_datasets: List[RemoteDataset] = [
+matching_datasets: List[RemoteDatasetV2] = [
@JBWilkie (Contributor, Author), Mar 18, 2024:
These type changes aren't functionally significant, but since we no longer have RemoteDatasetV1 they make some typechecker errors less likely

Member:
There are still some places (docstrings/types) where we use RemoteDataset - should we change all of them 🤔

@JBWilkie (Contributor, Author), Mar 20, 2024:
Probably, although I don't think it's a high-priority change. I made the change here because I noticed Pylance's type checker complaining, but didn't comb the rest of the repo.

If you agree, I'd like to do that at some point down the line

dataset
for dataset in self.list_remote_datasets(
team_slug=parsed_dataset_identifier.team_slug
126 changes: 124 additions & 2 deletions darwin/dataset/remote_dataset_v2.py
@@ -23,6 +23,7 @@
UploadHandlerV2,
)
from darwin.dataset.utils import (
get_external_file_name,
get_external_file_type,
is_relative_to,
parse_external_file_path,
@@ -562,12 +563,12 @@ def register(
self,
object_store: ObjectStore,
storage_keys: List[str],
-fps: Optional[str] = None,
+fps: Optional[Union[str, float]] = None,
multi_planar_view: bool = False,
preserve_folders: bool = False,
) -> Dict[str, List[str]]:
"""
-Register files in the dataset.
+Register files in the dataset in a single slot.

Parameters
----------
@@ -586,7 +587,18 @@
-------
Dict[str, List[str]]
A dictionary with the list of registered files.

Raises
------
ValueError
If ``storage_keys`` is not a list of strings.
TypeError
If the file type is not supported.
"""
if not isinstance(storage_keys, list) or not all(
isinstance(item, str) for item in storage_keys
):
raise ValueError("storage_keys must be a list of strings")
items = []
for storage_key in storage_keys:
file_type = get_external_file_type(storage_key)
@@ -642,3 +654,113 @@ def register(
print(f" - {item}")
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results
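For illustration, a minimal sketch of how a caller might invoke register. The ObjectStore construction mirrors the test fixtures further down; the team/dataset slugs, storage settings, and storage keys are hypothetical placeholders, not values from this PR:

# A usage sketch, assuming a locally authenticated darwin-py client;
# all slugs, storage settings, and keys below are hypothetical.
from darwin.client import Client
from darwin.datatypes import ObjectStore

client = Client.local(team_slug="my-team")
dataset = client.get_remote_dataset("my-team/my-dataset")
object_store = ObjectStore(
    name="my-storage",
    prefix="my-prefix",
    readonly=False,
    provider="aws",
    default=True,
)

results = dataset.register(
    object_store,
    ["images/scan_001.jpg", "videos/clip_002.mp4"],
    fps=25.0,               # applied only to slots whose type is "video"
    preserve_folders=True,  # keeps "images/" and "videos/" as item paths
)
print(results["registered"], results["blocked"])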

def register_multi_slotted(
self,
object_store: ObjectStore,
storage_keys: Dict[str, List[str]],
fps: Optional[Union[str, float]] = None,
multi_planar_view: bool = False,
preserve_folders: bool = False,
) -> Dict[str, List[str]]:
"""
Register files in the dataset in multiple slots.

Parameters
----------
object_store : ObjectStore
Object store to use for the registration.
storage_keys : Dict[str, List[str]]
Storage keys to register. The keys are the item names and the values are lists of storage keys.
fps : Optional[Union[str, float]], default: None
When the uploaded file is a video, specify its framerate.
multi_planar_view : bool, default: False
Uses multiplanar view when uploading files.
preserve_folders : bool, default: False
Specify whether or not to preserve folder paths when uploading.

Returns
-------
Dict[str, List[str]]
A dictionary with the list of registered files.

Raises
------
ValueError
If ``storage_keys`` is not a dictionary with keys as item names and values as lists of storage keys.
TypeError
If the file type is not supported.
"""
if not isinstance(storage_keys, dict) or not all(
isinstance(v, list) and all(isinstance(i, str) for i in v)
for v in storage_keys.values()
):
raise ValueError(
"storage_keys must be a dictionary with keys as item names and values as lists of storage keys."
)
Member:
Pydantic models could be very efficient here! But I guess we only use pydantic in darwin-py 2.0 🤔

@JBWilkie (Contributor, Author):
This is a really good point. We do, but I don't see a problem with using models here

We could do something like:

class StorageKeysModel(BaseModel):
    storage_keys: Dict[str, List[str]] = Field(..., description="A dictionary with keys as item names and values as lists of storage keys.")

and then:

try:
    validated_input = StorageKeysModel(storage_keys=storage_keys)
except ValidationError as e:
    raise ValueError(e)

What do you think?

Member:
Perfect, that'd work! 🙏
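Putting the thread's suggestion together, a self-contained sketch of that validation might look as follows. The helper name is hypothetical, and re-raising as ValueError preserves the exception type the method's docstring promises:

# A sketch of the suggested pydantic validation, not the merged code.
from typing import Dict, List

from pydantic import BaseModel, Field, ValidationError


class StorageKeysModel(BaseModel):
    storage_keys: Dict[str, List[str]] = Field(
        ..., description="Item names mapped to lists of storage keys."
    )


def validate_storage_keys(storage_keys: Dict[str, List[str]]) -> Dict[str, List[str]]:
    # Re-raise pydantic's ValidationError as the ValueError this method documents.
    try:
        return StorageKeysModel(storage_keys=storage_keys).storage_keys
    except ValidationError as e:
        raise ValueError(e) from e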

items = []

for item in storage_keys:
slots = []
for storage_key in storage_keys[item]:
file_name = get_external_file_name(storage_key)
file_type = get_external_file_type(storage_key)
if not file_type:
raise TypeError(
f"Unsupported file type for the following storage key: {storage_key}.\nPlease make sure your storage key ends with one of the supported extensions:\n{SUPPORTED_EXTENSIONS}"
)
slot = {
"slot_name": file_name,
"type": file_type,
"storage_key": storage_key,
"file_name": file_name,
}
if fps and file_type == "video":
slot["fps"] = fps
if multi_planar_view and file_type == "dicom":
slot["extract_views"] = "true"
slots.append(slot)
items.append(
{
"slots": slots,
"name": item,
"path": parse_external_file_path(
storage_keys[item][0], preserve_folders
),
}
)
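Concretely, for a hypothetical DICOM item with multi_planar_view enabled and preserve_folders set, the loop above would append an entry shaped like this (all values illustrative):

# Illustrative shape of one entry in `items`; the values are hypothetical.
example_item = {
    "slots": [
        {
            "slot_name": "axial.dcm",
            "type": "dicom",
            "storage_key": "scans/study_001/axial.dcm",
            "file_name": "axial.dcm",
            "extract_views": "true",  # set because multi_planar_view and type == "dicom"
        }
    ],
    "name": "study_001",
    "path": "/scans/study_001",  # would be "/" if preserve_folders were False
}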

# Do not register more than 500 items in a single request
chunk_size = 500
chunked_items = (
items[i : i + chunk_size] for i in range(0, len(items), chunk_size)
)
Member:
I think I've seen the chunking/batching function before somewhere - we can create a new fn and re-use it?
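Along the lines of that suggestion, a small reusable helper might look like this (the function name and its eventual module are hypothetical):

# A sketch of a shared chunking helper; name and placement are hypothetical.
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], size: int = 500) -> Iterator[List[T]]:
    """Yield successive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield list(items[i : i + size])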

print(f"Registering {len(items)} items in chunks of {chunk_size} items...")
results = {
"registered": [],
"blocked": [],
}

for chunk in chunked_items:
payload = {
"items": chunk,
"dataset_slug": self.slug,
"storage_slug": object_store.name,
}
print(f"Registering {len(chunk)} items...")
response = self.client.api_v2.register_items(payload, team_slug=self.team)
for item in json.loads(response.text)["items"]:
item_info = f"Item {item['name']} registered with item ID {item['id']}"
results["registered"].append(item_info)
for item in json.loads(response.text)["blocked_items"]:
item_info = f"Item {item['name']} was blocked for the reason: {item['slots'][0]['reason']}"
results["blocked"].append(item_info)
print(
f"{len(results['registered'])} of {len(storage_keys)} items registered successfully"
)
if results["blocked"]:
print("The following items were blocked:")
for item in results["blocked"]:
print(f" - {item}")
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results
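As with register above, a minimal usage sketch for the multi-slotted variant, again with hypothetical slugs, storage settings, and keys:

# A usage sketch, not code from this PR; all values below are hypothetical.
from darwin.client import Client
from darwin.datatypes import ObjectStore

client = Client.local(team_slug="my-team")
dataset = client.get_remote_dataset("my-team/my-dataset")
object_store = ObjectStore(
    name="my-storage",
    prefix="my-prefix",
    readonly=False,
    provider="aws",
    default=True,
)

# Each item name maps to the storage keys that become its slots.
results = dataset.register_multi_slotted(
    object_store,
    {"study_001": ["scans/study_001/axial.dcm", "scans/study_001/coronal.dcm"]},
    multi_planar_view=True,  # extracts views for DICOM slots
)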
19 changes: 19 additions & 0 deletions darwin/dataset/utils.py
@@ -893,3 +893,22 @@ def parse_external_file_path(storage_key: str, preserve_folders: bool) -> str:
if not preserve_folders:
return "/"
return "/" + "/".join(storage_key.split("/")[:-1])


def get_external_file_name(storage_key: str) -> str:
"""
Returns the name of the file given a storage key.

Parameters
----------
storage_key : str
The storage key to get the file name from.

Returns
-------
str
The name of the file.
"""
if "/" not in storage_key:
return storage_key
return storage_key.split("/")[-1]
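A quick illustration of the two storage-key helpers on a sample key, based on the implementations shown above:

from darwin.dataset.utils import get_external_file_name, parse_external_file_path

key = "folder/subfolder/image.jpg"
assert get_external_file_name(key) == "image.jpg"
assert parse_external_file_path(key, preserve_folders=True) == "/folder/subfolder"
assert parse_external_file_path(key, preserve_folders=False) == "/"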
161 changes: 161 additions & 0 deletions tests/darwin/dataset/remote_dataset_test.py
@@ -14,6 +14,7 @@
from darwin.dataset.release import Release
from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2
from darwin.dataset.upload_manager import LocalFile, UploadHandlerV2
from darwin.datatypes import ObjectStore
from darwin.exceptions import UnsupportedExportFormat, UnsupportedFileType
from darwin.item import DatasetItem
from tests.fixtures import *
@@ -871,3 +872,163 @@ def test_honours_include_authorship(self, remote_dataset: RemoteDatasetV2):
include_url_token=False,
include_authorship=True,
)


@pytest.mark.usefixtures("file_read_write_test")
class TestRegister:
def test_raises_if_storage_keys_not_list_of_strings(
self, remote_dataset: RemoteDatasetV2
):
with pytest.raises(ValueError):
remote_dataset.register(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
[1, 2, 3],
)

def test_raises_if_unsupported_file_type(self, remote_dataset: RemoteDatasetV2):
with pytest.raises(TypeError):
remote_dataset.register(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
["unsupported_file.xyz"],
)

@responses.activate
def test_register_files(self, remote_dataset: RemoteDatasetV2):
responses.add(
responses.POST,
"http://localhost/api/v2/teams/v7-darwin-json-v2/items/register_existing",
json={
"items": [{"id": "1", "name": "test.jpg"}],
"blocked_items": [],
},
status=200,
)
result = remote_dataset.register(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
["test.jpg"],
)
assert len(result["registered"]) == 1
assert len(result["blocked"]) == 0

@responses.activate
def test_register_files_with_blocked_items(self, remote_dataset: RemoteDatasetV2):
responses.add(
responses.POST,
"http://localhost/api/v2/teams/v7-darwin-json-v2/items/register_existing",
json={
"items": [],
"blocked_items": [
{"name": "test.jpg", "slots": [{"reason": "test reason"}]}
],
},
status=200,
)
result = remote_dataset.register(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
["test.jpg"],
)
assert len(result["registered"]) == 0
assert len(result["blocked"]) == 1


@pytest.mark.usefixtures("file_read_write_test")
class TestRegisterMultiSlotted:
def test_raises_if_storage_keys_not_dictionary(
self, remote_dataset: RemoteDatasetV2
):
with pytest.raises(ValueError):
remote_dataset.register_multi_slotted(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
{"item1": [1, 2, 3]},
)

def test_raises_if_unsupported_file_type(self, remote_dataset: RemoteDatasetV2):
with pytest.raises(TypeError):
remote_dataset.register_multi_slotted(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
{"item1": ["unsupported_file.xyz"]},
)

@responses.activate
def test_register_files(self, remote_dataset: RemoteDatasetV2):
responses.add(
responses.POST,
"http://localhost/api/v2/teams/v7-darwin-json-v2/items/register_existing",
json={
"items": [{"id": "1", "name": "test.jpg"}],
"blocked_items": [],
},
status=200,
)
result = remote_dataset.register_multi_slotted(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
{"item1": ["test.jpg"]},
)
assert len(result["registered"]) == 1
assert len(result["blocked"]) == 0

@responses.activate
def test_register_files_with_blocked_items(self, remote_dataset: RemoteDatasetV2):
responses.add(
responses.POST,
"http://localhost/api/v2/teams/v7-darwin-json-v2/items/register_existing",
json={
"items": [],
"blocked_items": [
{"name": "test.jpg", "slots": [{"reason": "test reason"}]}
],
},
status=200,
)
result = remote_dataset.register_multi_slotted(
ObjectStore(
name="test",
prefix="test_prefix",
readonly=False,
provider="aws",
default=True,
),
{"item1": ["test.jpg"]},
)
assert len(result["registered"]) == 0
assert len(result["blocked"]) == 1