[PLA-699][external] Allow registration of multi-slotted read-write files from external storage (#790)

* Undo accidental changes

* Moved ObjectStore tests to datatypes_test.py

* Multi-slotted registration function

* Added configurable dataset path destination for multi-slotted registration

* Unit tests for register & register_multi_slotted functions

* Abstracted chunking logic into a helper function

* Pydantic validation for storage keys
JBWilkie committed Mar 20, 2024
1 parent da4478b commit 3c4263c
Showing 7 changed files with 408 additions and 73 deletions.
6 changes: 3 additions & 3 deletions darwin/client.py
@@ -131,7 +131,7 @@ def list_local_datasets(self, team_slug: Optional[str] = None) -> Iterator[Path]

def list_remote_datasets(
    self, team_slug: Optional[str] = None
-) -> Iterator[RemoteDataset]:
+) -> Iterator[RemoteDatasetV2]:
    """
    Returns a list of all available datasets with the team currently authenticated against.
@@ -162,7 +162,7 @@ def list_remote_datasets(

def get_remote_dataset(
    self, dataset_identifier: Union[str, DatasetIdentifier]
-) -> RemoteDataset:
+) -> RemoteDatasetV2:
    """
    Get a remote dataset based on its identifier.
@@ -189,7 +189,7 @@ def get_remote_dataset(
        parsed_dataset_identifier.team_slug = self.default_team

    try:
-        matching_datasets: List[RemoteDataset] = [
+        matching_datasets: List[RemoteDatasetV2] = [
            dataset
            for dataset in self.list_remote_datasets(
                team_slug=parsed_dataset_identifier.team_slug
140 changes: 135 additions & 5 deletions darwin/dataset/remote_dataset_v2.py
@@ -11,6 +11,7 @@
    Union,
)

from pydantic import ValidationError
from requests.models import Response

from darwin.dataset import RemoteDataset
@@ -23,11 +24,20 @@
    UploadHandlerV2,
)
from darwin.dataset.utils import (
    chunk_items,
    get_external_file_name,
    get_external_file_type,
    is_relative_to,
    parse_external_file_path,
)
-from darwin.datatypes import AnnotationFile, ItemId, ObjectStore, PathLike
+from darwin.datatypes import (
+    AnnotationFile,
+    ItemId,
+    ObjectStore,
+    PathLike,
+    StorageKeyDictModel,
+    StorageKeyListModel,
+)
from darwin.exceptions import NotFound, UnknownExportVersion
from darwin.exporter.formats.darwin import build_image_annotation
from darwin.item import DatasetItem
@@ -562,12 +572,12 @@ def register(
    self,
    object_store: ObjectStore,
    storage_keys: List[str],
-   fps: Optional[str] = None,
+   fps: Optional[Union[str, float]] = None,
    multi_planar_view: bool = False,
    preserve_folders: bool = False,
) -> Dict[str, List[str]]:
    """
-   Register files in the dataset.
+   Register files in the dataset in a single slot.

    Parameters
    ----------
@@ -586,7 +596,21 @@
    -------
    Dict[str, List[str]]
        A dictionary with the list of registered files.

    Raises
    ------
    ValueError
        If ``storage_keys`` is not a list of strings.
    TypeError
        If the file type is not supported.
    """
    try:
        StorageKeyListModel(storage_keys=storage_keys)
    except ValidationError as e:
        print(
            f"Error validating storage keys: {e}\n\nPlease make sure your storage keys are a list of strings"
        )
        raise e
    items = []
    for storage_key in storage_keys:
        file_type = get_external_file_type(storage_key)
@@ -610,9 +634,115 @@

    # Do not register more than 500 items in a single request
    chunk_size = 500
-   chunked_items = (
-       items[i : i + chunk_size] for i in range(0, len(items), chunk_size)
-   )
+   chunked_items = chunk_items(items, chunk_size)
    print(f"Registering {len(items)} items in chunks of {chunk_size} items...")
    results = {
        "registered": [],
        "blocked": [],
    }

    for chunk in chunked_items:
        payload = {
            "items": chunk,
            "dataset_slug": self.slug,
            "storage_slug": object_store.name,
        }
        print(f"Registering {len(chunk)} items...")
        response = self.client.api_v2.register_items(payload, team_slug=self.team)
        for item in json.loads(response.text)["items"]:
            item_info = f"Item {item['name']} registered with item ID {item['id']}"
            results["registered"].append(item_info)
        for item in json.loads(response.text)["blocked_items"]:
            item_info = f"Item {item['name']} was blocked for the reason: {item['slots'][0]['reason']}"
            results["blocked"].append(item_info)
    print(
        f"{len(results['registered'])} of {len(storage_keys)} items registered successfully"
    )
    if results["blocked"]:
        print("The following items were blocked:")
        for item in results["blocked"]:
            print(f"  - {item}")
    print(f"Registration complete. Check your items in the dataset: {self.slug}")
    return results
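For reference, a minimal usage sketch of the single-slotted flow (not part of the diff). The team slug, dataset slug, storage configuration name, and storage keys below are placeholders, and the `ObjectStore` constructor arguments are an assumption inferred from the fields visible in its `__repr__`:

```python
from darwin.client import Client
from darwin.datatypes import ObjectStore

client = Client.local()  # assumes local authentication is already configured
dataset = client.get_remote_dataset("my-team/my-dataset")  # placeholder identifier

# Placeholder storage configuration; in practice this would describe an
# existing external-storage integration registered with Darwin.
object_store = ObjectStore(
    name="my-storage-config",
    prefix="external_data",
    readonly=False,
    provider="aws",
)

results = dataset.register(
    object_store,
    storage_keys=["images/scan_001.jpg", "videos/clip_001.mp4"],
    fps=25.0,  # only applied to slots whose file type is video
    preserve_folders=True,  # items keep their bucket folder structure as paths
)
print(results["registered"])
print(results["blocked"])
```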

def register_multi_slotted(
    self,
    object_store: ObjectStore,
    storage_keys: Dict[str, List[str]],
    fps: Optional[Union[str, float]] = None,
    multi_planar_view: bool = False,
    preserve_folders: bool = False,
) -> Dict[str, List[str]]:
    """
    Register files in the dataset in multiple slots.

    Parameters
    ----------
    object_store : ObjectStore
        Object store to use for the registration.
    storage_keys : Dict[str, List[str]]
        Storage keys to register. The keys are the item names and the values are lists of storage keys.
    fps : Optional[Union[str, float]], default: None
        When the file being uploaded is a video, specify its framerate.
    multi_planar_view : bool, default: False
        Uses multiplanar view when uploading files.
    preserve_folders : bool, default: False
        Specify whether or not to preserve folder paths when uploading.

    Returns
    -------
    Dict[str, List[str]]
        A dictionary with the list of registered files.

    Raises
    ------
    ValueError
        If ``storage_keys`` is not a dictionary with keys as item names and values as lists of storage keys.
    TypeError
        If the file type is not supported.
    """

    try:
        StorageKeyDictModel(storage_keys=storage_keys)
    except ValidationError as e:
        print(
            f"Error validating storage keys: {e}\n\nPlease make sure your storage keys are a dictionary with keys as item names and values as lists of storage keys"
        )
        raise e
    items = []
    for item in storage_keys:
        slots = []
        for storage_key in storage_keys[item]:
            file_name = get_external_file_name(storage_key)
            file_type = get_external_file_type(storage_key)
            if not file_type:
                raise TypeError(
                    f"Unsupported file type for the following storage key: {storage_key}.\nPlease make sure your storage key ends with one of the supported extensions:\n{SUPPORTED_EXTENSIONS}"
                )
            slot = {
                "slot_name": file_name,
                "type": file_type,
                "storage_key": storage_key,
                "file_name": file_name,
            }
            if fps and file_type == "video":
                slot["fps"] = fps
            if multi_planar_view and file_type == "dicom":
                slot["extract_views"] = "true"
            slots.append(slot)
        items.append(
            {
                "slots": slots,
                "name": item,
                "path": parse_external_file_path(
                    storage_keys[item][0], preserve_folders
                ),
            }
        )

    # Do not register more than 500 items in a single request
    chunk_size = 500
    chunked_items = chunk_items(items, chunk_size)
    print(f"Registering {len(items)} items in chunks of {chunk_size} items...")
    results = {
        "registered": [],
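A matching sketch for the new multi-slotted flow, reusing the hypothetical `dataset` and `object_store` from the single-slotted example above. Each dictionary key becomes an item name, each storage key in its list becomes one slot of that item, and with `preserve_folders=True` the item's path is derived from its first storage key:

```python
# Placeholder item names and storage keys; dataset and object_store are the
# hypothetical objects constructed in the single-slotted sketch above.
results = dataset.register_multi_slotted(
    object_store,
    storage_keys={
        "patient_1": ["patients/1/flair.dcm", "patients/1/t1.dcm"],
        "patient_2": ["patients/2/flair.dcm", "patients/2/t1.dcm"],
    },
    preserve_folders=True,  # items get paths "/patients/1" and "/patients/2"
)
print(results["registered"])
print(results["blocked"])
```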
38 changes: 38 additions & 0 deletions darwin/dataset/utils.py
@@ -893,3 +893,41 @@ def parse_external_file_path(storage_key: str, preserve_folders: bool) -> str:
    if not preserve_folders:
        return "/"
    return "/" + "/".join(storage_key.split("/")[:-1])


def get_external_file_name(storage_key: str) -> str:
    """
    Returns the name of the file given a storage key.

    Parameters
    ----------
    storage_key : str
        The storage key to get the file name from.

    Returns
    -------
    str
        The name of the file.
    """
    if "/" not in storage_key:
        return storage_key
    return storage_key.split("/")[-1]
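A quick illustration of both branches of this helper:

```python
from darwin.dataset.utils import get_external_file_name

assert get_external_file_name("patients/1/flair.dcm") == "flair.dcm"
assert get_external_file_name("flair.dcm") == "flair.dcm"  # no "/" in the key
```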


def chunk_items(items: List[Any], chunk_size: int = 500) -> Iterator[List[Any]]:
    """
    Splits the list of items into chunks of specified size.

    Parameters
    ----------
    items : List[Any]
        The list of items to split.
    chunk_size : int, default: 500
        The size of each chunk.

    Returns
    -------
    Iterator[List[Any]]
        An iterator that yields lists of items, each of length at most ``chunk_size``.
    """
    return (items[i : i + chunk_size] for i in range(0, len(items), chunk_size))
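The final chunk may be shorter than `chunk_size`, as a small sketch shows:

```python
from darwin.dataset.utils import chunk_items

for chunk in chunk_items(list(range(7)), chunk_size=3):
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6]
```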
10 changes: 10 additions & 0 deletions darwin/datatypes.py
@@ -17,6 +17,8 @@
    Union,
)

from pydantic import BaseModel

try:
from numpy.typing import NDArray
except ImportError:
@@ -1491,3 +1493,11 @@ def __str__(self) -> str:

def __repr__(self) -> str:
    return f"ObjectStore(name={self.name}, prefix={self.prefix}, readonly={self.readonly}, provider={self.provider})"


class StorageKeyDictModel(BaseModel):
    storage_keys: Dict[str, List[str]]


class StorageKeyListModel(BaseModel):
    storage_keys: List[str]
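These models exist purely for input validation. A small sketch of the behaviour the register methods rely on:

```python
from pydantic import ValidationError

from darwin.datatypes import StorageKeyDictModel, StorageKeyListModel

StorageKeyListModel(storage_keys=["images/a.jpg", "videos/b.mp4"])  # valid
StorageKeyDictModel(storage_keys={"item_1": ["images/a.jpg"]})      # valid

try:
    StorageKeyListModel(storage_keys="images/a.jpg")  # a str, not a list of str
except ValidationError as e:
    print(e)  # pydantic reports that a list of strings was expected
```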
