
[PLA-699][external] Allow registration of multi-slotted read-write files from external storage #790

Merged · 8 commits · Mar 20, 2024
6 changes: 3 additions & 3 deletions darwin/client.py
@@ -131,7 +131,7 @@ def list_local_datasets(self, team_slug: Optional[str] = None) -> Iterator[Path]

def list_remote_datasets(
self, team_slug: Optional[str] = None
-    ) -> Iterator[RemoteDataset]:
+    ) -> Iterator[RemoteDatasetV2]:
"""
Returns a list of all available datasets with the team currently authenticated against.

@@ -162,7 +162,7 @@ def list_remote_datasets(

def get_remote_dataset(
self, dataset_identifier: Union[str, DatasetIdentifier]
-    ) -> RemoteDataset:
+    ) -> RemoteDatasetV2:
"""
Get a remote dataset based on its identifier.

@@ -189,7 +189,7 @@ def get_remote_dataset(
parsed_dataset_identifier.team_slug = self.default_team

try:
-    matching_datasets: List[RemoteDataset] = [
+    matching_datasets: List[RemoteDatasetV2] = [
@JBWilkie (Contributor, Author) commented on Mar 18, 2024:
These type changes aren't functionally significant, but since we no longer have RemoteDatasetV1, they make some typechecker errors less likely.

A repository Member commented:
There are still some places (docstrings/types) where we use RemoteDataset - should we change all of them 🤔

@JBWilkie (Contributor, Author) replied on Mar 20, 2024:
Probably, although I don't think it's a high-priority change. I made the change here because I noticed Pylance's type checking complaining, but I didn't comb the rest of the repo.

If you agree, I'd like to do that at some point down the line

dataset
for dataset in self.list_remote_datasets(
team_slug=parsed_dataset_identifier.team_slug
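For context, a minimal sketch of how the updated annotations surface to callers; the team and dataset identifiers are hypothetical:

from darwin.client import Client
from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2

client = Client.local()  # loads credentials from the local Darwin config
# Type checkers such as Pylance now infer RemoteDatasetV2 here,
# matching the updated return annotation on get_remote_dataset
dataset: RemoteDatasetV2 = client.get_remote_dataset("my-team/my-dataset")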
140 changes: 135 additions & 5 deletions darwin/dataset/remote_dataset_v2.py
@@ -11,6 +11,7 @@
Union,
)

from pydantic import ValidationError
from requests.models import Response

from darwin.dataset import RemoteDataset
@@ -23,11 +24,20 @@
UploadHandlerV2,
)
from darwin.dataset.utils import (
chunk_items,
get_external_file_name,
get_external_file_type,
is_relative_to,
parse_external_file_path,
)
-from darwin.datatypes import AnnotationFile, ItemId, ObjectStore, PathLike
+from darwin.datatypes import (
+    AnnotationFile,
+    ItemId,
+    ObjectStore,
+    PathLike,
+    StorageKeyDictModel,
+    StorageKeyListModel,
+)
from darwin.exceptions import NotFound, UnknownExportVersion
from darwin.exporter.formats.darwin import build_image_annotation
from darwin.item import DatasetItem
@@ -562,12 +572,12 @@ def register(
self,
object_store: ObjectStore,
storage_keys: List[str],
-    fps: Optional[str] = None,
+    fps: Optional[Union[str, float]] = None,
multi_planar_view: bool = False,
preserve_folders: bool = False,
) -> Dict[str, List[str]]:
"""
-    Register files in the dataset.
+    Register files in the dataset in a single slot.

Parameters
----------
@@ -586,7 +596,21 @@
-------
Dict[str, List[str]]
A dictionary with the lists of registered and blocked items.

Raises
------
ValueError
If ``storage_keys`` is not a list of strings.
TypeError
If the file type is not supported.
"""
try:
StorageKeyListModel(storage_keys=storage_keys)
except ValidationError as e:
print(
f"Error validating storage keys: {e}\n\nPlease make sure your storage keys are a list of strings"
)
raise e
items = []
for storage_key in storage_keys:
file_type = get_external_file_type(storage_key)
@@ -610,9 +634,115 @@

# Do not register more than 500 items in a single request
chunk_size = 500
-    chunked_items = (
-        items[i : i + chunk_size] for i in range(0, len(items), chunk_size)
-    )
+    chunked_items = chunk_items(items, chunk_size)
print(f"Registering {len(items)} items in chunks of {chunk_size} items...")
results = {
"registered": [],
"blocked": [],
}

for chunk in chunked_items:
payload = {
"items": chunk,
"dataset_slug": self.slug,
"storage_slug": object_store.name,
}
print(f"Registering {len(chunk)} items...")
response = self.client.api_v2.register_items(payload, team_slug=self.team)
for item in json.loads(response.text)["items"]:
item_info = f"Item {item['name']} registered with item ID {item['id']}"
results["registered"].append(item_info)
for item in json.loads(response.text)["blocked_items"]:
item_info = f"Item {item['name']} was blocked for the reason: {item['slots'][0]['reason']}"
results["blocked"].append(item_info)
print(
f"{len(results['registered'])} of {len(storage_keys)} items registered successfully"
)
if results["blocked"]:
print("The following items were blocked:")
for item in results["blocked"]:
print(f" - {item}")
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results
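As a usage illustration, a hedged sketch of single-slot registration. The ObjectStore field values and storage keys are hypothetical and must match a storage integration already connected to the team:

from darwin.client import Client
from darwin.datatypes import ObjectStore

client = Client.local()
dataset = client.get_remote_dataset("my-team/my-dataset")

# Assumed constructor arguments, mirroring the attributes shown in
# ObjectStore's __repr__ (name, prefix, readonly, provider)
store = ObjectStore(name="my-store", prefix="data", readonly=False, provider="aws")

results = dataset.register(
    store,
    ["data/scans/image_1.jpg", "data/videos/video_1.mp4"],
    fps=25.0,  # applies only to video files
    preserve_folders=True,
)
print(results["registered"], results["blocked"])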

def register_multi_slotted(
self,
object_store: ObjectStore,
storage_keys: Dict[str, List[str]],
fps: Optional[Union[str, float]] = None,
multi_planar_view: bool = False,
preserve_folders: bool = False,
) -> Dict[str, List[str]]:
"""
Register files in the dataset in multiple slots.

Parameters
----------
object_store : ObjectStore
Object store to use for the registration.
storage_keys : Dict[str, List[str]]
Storage keys to register. The keys are the item names and the values are lists of storage keys.
fps : Optional[Union[str, float]], default: None
When the file being uploaded is a video, specify its framerate.
multi_planar_view : bool, default: False
Uses multiplanar view when uploading files.
preserve_folders : bool, default: False
Specify whether or not to preserve folder paths when uploading.

Returns
-------
Dict[str, List[str]]
A dictionary with the lists of registered and blocked items.

Raises
------
ValueError
If ``storage_keys`` is not a dictionary with keys as item names and values as lists of storage keys.
TypeError
If the file type is not supported.
"""

try:
StorageKeyDictModel(storage_keys=storage_keys)
except ValidationError as e:
print(
f"Error validating storage keys: {e}\n\nPlease make sure your storage keys are a dictionary with keys as item names and values as lists of storage keys"
)
raise e
items = []
for item in storage_keys:
slots = []
for storage_key in storage_keys[item]:
file_name = get_external_file_name(storage_key)
file_type = get_external_file_type(storage_key)
if not file_type:
raise TypeError(
f"Unsupported file type for the following storage key: {storage_key}.\nPlease make sure your storage key ends with one of the supported extensions:\n{SUPPORTED_EXTENSIONS}"
)
slot = {
"slot_name": file_name,
"type": file_type,
"storage_key": storage_key,
"file_name": file_name,
}
if fps and file_type == "video":
slot["fps"] = fps
if multi_planar_view and file_type == "dicom":
slot["extract_views"] = "true"
slots.append(slot)
items.append(
{
"slots": slots,
"name": item,
"path": parse_external_file_path(
storage_keys[item][0], preserve_folders
),
}
)

# Do not register more than 500 items in a single request
chunk_size = 500
chunked_items = chunk_items(items, chunk_size)
print(f"Registering {len(items)} items in chunks of {chunk_size} items...")
results = {
"registered": [],
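The rest of the method is elided by the diff view; it presumably mirrors the chunked submission loop in register. As a hedged usage sketch, reusing the hypothetical dataset and store from the earlier example, each storage key becomes its own slot on the item, named after the file:

results = dataset.register_multi_slotted(
    store,
    {
        "scan_001": ["data/scan_001/ct.dcm", "data/scan_001/report.pdf"],
        "scan_002": ["data/scan_002/ct.dcm"],
    },
    preserve_folders=True,
)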
38 changes: 38 additions & 0 deletions darwin/dataset/utils.py
@@ -893,3 +893,41 @@ def parse_external_file_path(storage_key: str, preserve_folders: bool) -> str:
if not preserve_folders:
return "/"
return "/" + "/".join(storage_key.split("/")[:-1])


def get_external_file_name(storage_key: str) -> str:
"""
Returns the name of the file given a storage key.

Parameters
----------
storage_key : str
The storage key to get the file name from.

Returns
-------
str
The name of the file.
"""
if "/" not in storage_key:
return storage_key
return storage_key.split("/")[-1]
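A quick illustration of the expected behaviour:

assert get_external_file_name("file.dcm") == "file.dcm"
assert get_external_file_name("data/scans/file.dcm") == "file.dcm"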


def chunk_items(items: List[Any], chunk_size: int = 500) -> Iterator[List[Any]]:
"""
Splits the list of items into chunks of specified size.

Parameters
----------
items : List[Any]
The list of items to split.
chunk_size : int, default: 500
The size of each chunk.

Returns
-------
Iterator[List[Any]]
An iterator that yields lists of items, each of length ``chunk_size``.
"""
return (items[i : i + chunk_size] for i in range(0, len(items), chunk_size))
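For example, with the default chunk size the generator yields full chunks followed by the remainder:

batches = list(chunk_items(list(range(1200))))
assert [len(b) for b in batches] == [500, 500, 200]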
10 changes: 10 additions & 0 deletions darwin/datatypes.py
@@ -17,6 +17,8 @@
Union,
)

from pydantic import BaseModel

try:
from numpy.typing import NDArray
except ImportError:
@@ -1494,3 +1496,11 @@ def __str__(self) -> str:

def __repr__(self) -> str:
return f"ObjectStore(name={self.name}, prefix={self.prefix}, readonly={self.readonly}, provider={self.provider})"


class StorageKeyDictModel(BaseModel):
storage_keys: Dict[str, List[str]]


class StorageKeyListModel(BaseModel):
storage_keys: List[str]
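A small sketch of how these models back the upfront validation in register and register_multi_slotted; a wrongly-shaped payload raises pydantic's ValidationError before any API calls are made:

from pydantic import ValidationError
from darwin.datatypes import StorageKeyDictModel, StorageKeyListModel

StorageKeyListModel(storage_keys=["data/a.jpg", "data/b.png"])  # valid
StorageKeyDictModel(storage_keys={"item_1": ["data/a.jpg"]})  # valid

try:
    StorageKeyListModel(storage_keys="data/a.jpg")  # a bare string, not a list
except ValidationError as e:
    print(e)  # pydantic reports that the input is not a valid list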