Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLA-696][external] Allow registration of single-slotted read-write files from external storage #785

Merged
merged 16 commits into from Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions darwin/backend_v2.py
Expand Up @@ -237,3 +237,19 @@ def import_annotation(
return self._client._post_raw(
f"v2/teams/{team_slug}/items/{item_id}/import", payload=payload
)

@inject_default_team_slug
def register_items(self, payload: Dict[str, Any], team_slug: str) -> Any:
    """
    Register items from external storage.

    Parameters
    ----------
    payload : Dict[str, Any]
        The request payload describing the items to register.
    team_slug : str
        The team slug.

    Returns
    -------
    Any
        The raw HTTP response from the registration endpoint. Callers
        (e.g. ``RemoteDatasetV2.register``) read ``response.text``, so the
        original ``-> None`` annotation was incorrect.
    """
    # NOTE(review): sibling endpoints here use a path without a leading
    # slash (e.g. "v2/teams/..."); confirm "/v2/..." is intentional.
    return self._client._post_raw(
        f"/v2/teams/{team_slug}/items/register_existing", payload
    )
81 changes: 81 additions & 0 deletions darwin/client.py
Expand Up @@ -38,6 +38,7 @@
from darwin.future.core.properties import update_property as update_property_future
from darwin.future.core.types.common import JSONDict
from darwin.future.data_objects.properties import FullProperty
from darwin.objectstore import ObjectStore
from darwin.utils import (
get_response_content,
has_json_content_type,
Expand Down Expand Up @@ -1054,3 +1055,83 @@ def update_property(
team_slug=team_slug or self.default_team,
params=params,
)

def get_external_storage(
    self, team_slug: Optional[str] = None, name: Optional[str] = None
) -> Optional["ObjectStore"]:
    """
    Get an external storage connection by name.

    If no name is provided, the default team's default external storage
    connection is returned.

    Parameters
    ----------
    team_slug : Optional[str]
        The team slug. Falls back to ``self.default_team`` when omitted.
    name : Optional[str]
        The name of the external storage connection.

    Returns
    -------
    Optional[ObjectStore]
        The matching read-write external storage connection.

    Raises
    ------
    ValueError
        If the team has no storage connections, if the selected connection
        is read-only, or if no connection matches ``name``.
    """
    if not team_slug:
        team_slug = self.default_team

    connections = self.list_external_storage_connections(team_slug)
    if not connections:
        raise ValueError(
            f"No external storage connections found in the team: {team_slug}. Please configure one.\n\nGuidelines can be found here: https://docs.v7labs.com/docs/external-storage-configuration"
        )

    if name is None:
        for connection in connections:
            if connection.default:
                if connection.readonly:
                    raise ValueError(
                        "The default external storage connection is read-only. darwin-py only supports read-write configuration.\n\nPlease use the REST API to register items from read-only storage: https://docs.v7labs.com/docs/registering-items-from-external-storage#read-only-registration"
                    )
                return connection
        # Previously this path fell through to the name-lookup loop and
        # reported "name: None", which was misleading when the team simply
        # had no default connection configured.
        raise ValueError(
            f"No default external storage connection found in the team: {team_slug}. Please configure one.\n\nGuidelines can be found here: https://docs.v7labs.com/docs/external-storage-configuration"
        )

    for connection in connections:
        if connection.name == name:
            if connection.readonly:
                # Typo fix: was "configuraiton".
                raise ValueError(
                    "The selected external storage connection is read-only. darwin-py only supports read-write configuration.\n\nPlease use the REST API to register items from read-only storage: https://docs.v7labs.com/docs/registering-items-from-external-storage#read-only-registration"
                )
            return connection

    raise ValueError(
        f"No external storage connection found with the name: {name} in the team {team_slug}. Please configure one.\n\nGuidelines can be found at https://docs.v7labs.com/docs/external-storage-configuration"
    )

def list_external_storage_connections(self, team_slug: str) -> List[ObjectStore]:
    """
    Return every external storage connection configured for a team.

    Parameters
    ----------
    team_slug : str
        The team slug.

    Returns
    -------
    List[ObjectStore]
        All storage connections available to the team.
    """
    raw_connections = cast(
        List[Dict[str, UnknownType]],
        self._get(f"/teams/{team_slug}/storage"),
    )

    stores: List[ObjectStore] = []
    for raw in raw_connections:
        stores.append(
            ObjectStore(
                name=raw["name"],
                prefix=raw["prefix"],
                readonly=raw["readonly"],
                provider=raw["provider"],
                default=raw["default"],
            )
        )
    return stores
2 changes: 1 addition & 1 deletion darwin/dataset/local_dataset.py
Expand Up @@ -137,7 +137,7 @@ def _setup_annotations_and_images(
keep_empty_annotations: bool = False,
):
# Find all the annotations and their corresponding images
with_folders = any([item.is_dir() for item in images_dir.iterdir()])
with_folders = any(item.is_dir() for item in images_dir.iterdir())
JBWilkie marked this conversation as resolved.
Show resolved Hide resolved
annotation_filepaths = get_annotation_filepaths(
release_path, annotations_dir, annotation_type, split, partition, split_type
)
Expand Down
71 changes: 69 additions & 2 deletions darwin/dataset/remote_dataset_v2.py
@@ -1,3 +1,4 @@
import json
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -21,13 +22,18 @@
UploadHandler,
UploadHandlerV2,
)
from darwin.dataset.utils import is_relative_to
from darwin.dataset.utils import (
get_external_file_type,
is_relative_to,
parse_external_file_path,
)
from darwin.datatypes import AnnotationFile, ItemId, PathLike
from darwin.exceptions import NotFound, UnknownExportVersion
from darwin.exporter.formats.darwin import build_image_annotation
from darwin.item import DatasetItem
from darwin.item_sorter import ItemSorter
from darwin.utils import find_files, urljoin
from darwin.objectstore import ObjectStore
from darwin.utils import SUPPORTED_EXTENSIONS, find_files, urljoin

if TYPE_CHECKING:
from darwin.client import Client
Expand Down Expand Up @@ -552,3 +558,64 @@ def _build_image_annotation(
self, annotation_file: AnnotationFile, team_name: str
) -> Dict[str, Any]:
return build_image_annotation(annotation_file, team_name)

def register(
    self,
    object_store: ObjectStore,
    storage_keys: List[str],
    fps: Optional[str] = None,
    multi_planar_view: bool = False,
    preserve_folders: bool = False,
) -> Dict[str, List[str]]:
    """
    Register items from external storage in this dataset.

    Parameters
    ----------
    object_store : ObjectStore
        The external storage connection to register items from.
    storage_keys : List[str]
        Storage keys of the files to register.
    fps : Optional[str]
        Frame rate to extract for video files, by default None.
    multi_planar_view : bool
        Whether to extract additional views for DICOM files, by default False.
    preserve_folders : bool
        Whether to mirror the storage-key folder structure in the dataset,
        by default False.

    Returns
    -------
    Dict[str, List[str]]
        Human-readable summaries of registered and blocked items.

    Raises
    ------
    TypeError
        If a storage key has an unsupported file extension.
    """
    items = []
    for storage_key in storage_keys:
        file_type = get_external_file_type(storage_key)
        if not file_type:
            raise TypeError(
                f"Unsupported file type for the following storage key: {storage_key}.\nPlease make sure your storage key ends with one of the supported extensions:\n{SUPPORTED_EXTENSIONS}"
            )
        item = {
            "path": parse_external_file_path(storage_key, preserve_folders),
            "type": file_type,
            "storage_key": storage_key,
            # split("/") returns the whole key when there is no "/", so no
            # separate branch is needed.
            "name": storage_key.split("/")[-1],
        }
        if fps and file_type == "video":
            item["fps"] = fps
        if multi_planar_view and file_type == "dicom":
            item["extract_views"] = "true"
        items.append(item)

    # Do not register more than 500 items in a single request.
    chunked_items = [items[i : i + 500] for i in range(0, len(items), 500)]
    print(f"Registering {len(items)} items in {len(chunked_items)} batch(es)")

    results: Dict[str, List[str]] = {
        "registered": [],
        "blocked": [],
    }

    for chunk in chunked_items:
        payload = {
            "items": chunk,
            "dataset_slug": self.slug,
            "storage_slug": object_store.name,
        }
        print(f"Registering {len(chunk)} items...")
        response = self.client.api_v2.register_items(payload, team_slug=self.team)
        # Parse the response once per chunk instead of twice.
        response_data = json.loads(response.text)
        for item in response_data["items"]:
            results["registered"].append(
                f"Item {item['name']} registered with item ID {item['id']}"
            )
        for item in response_data["blocked_items"]:
            results["blocked"].append(
                f"Item {item['name']} was blocked for the reason: {item['slots'][0]['reason']}"
            )
    print(
        f"{len(results['registered'])} of {len(storage_keys)} items registered successfully"
    )
    if results["blocked"]:
        print("The following items were blocked:")
        for item in results["blocked"]:
            print(f" - {item}")
    # Typo fix: was "Reistration complete."
    print(f"Registration complete. Check your items in the dataset: {self.slug}")
    return results
65 changes: 59 additions & 6 deletions darwin/dataset/utils.py
Expand Up @@ -15,6 +15,7 @@
from darwin.importer.formats.darwin import parse_path
from darwin.utils import (
SUPPORTED_EXTENSIONS,
SUPPORTED_IMAGE_EXTENSIONS,
SUPPORTED_VIDEO_EXTENSIONS,
attempt_decode,
get_image_path_from_stream,
Expand Down Expand Up @@ -366,9 +367,11 @@ def create_polygon_object(obj, box_mode, classes=None):
"segmentation": segmentation,
"bbox": [np.min(all_px), np.min(all_py), np.max(all_px), np.max(all_py)],
"bbox_mode": box_mode,
"category_id": classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name,
"category_id": (
classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name
),
"iscrowd": 0,
}

Expand All @@ -380,9 +383,11 @@ def create_bbox_object(obj, box_mode, classes=None):
new_obj = {
"bbox": [bbox["x"], bbox["y"], bbox["x"] + bbox["w"], bbox["y"] + bbox["h"]],
"bbox_mode": box_mode,
"category_id": classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name,
"category_id": (
classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name
),
"iscrowd": 0,
}

Expand Down Expand Up @@ -840,3 +845,51 @@ def sanitize_filename(filename: str) -> str:
filename = filename.replace(char, "_")

return filename


def get_external_file_type(storage_key: str) -> Optional[str]:
    """
    Return the Darwin file type for a storage key, based on its extension.

    Parameters
    ----------
    storage_key : str
        The storage key to get the type of file from.

    Returns
    -------
    Optional[str]
        One of ``"image"``, ``"pdf"``, ``"dicom"`` or ``"video"``, or
        ``None`` when the extension is not supported.
    """
    # NOTE: the original annotation was ``-> str or None``, which evaluates
    # to plain ``str`` at runtime and is not a valid type hint.
    if storage_key.endswith(tuple(SUPPORTED_IMAGE_EXTENSIONS)):
        return "image"
    if storage_key.endswith(".pdf"):
        return "pdf"
    if storage_key.endswith(".dcm"):
        return "dicom"
    if storage_key.endswith(tuple(SUPPORTED_VIDEO_EXTENSIONS)):
        return "video"
    return None


def parse_external_file_path(storage_key: str, preserve_folders: bool) -> str:
    """
    Derive the Darwin dataset path for a file from its storage key.

    Parameters
    ----------
    storage_key : str
        The storage key to parse.
    preserve_folders : bool
        Whether to preserve folders or place the file in the Dataset root.

    Returns
    -------
    str
        The parsed external file path.
    """
    if preserve_folders:
        folder_segments = storage_key.split("/")[:-1]
        return "/" + "/".join(folder_segments)
    return "/"
saurbhc marked this conversation as resolved.
Show resolved Hide resolved
30 changes: 30 additions & 0 deletions darwin/objectstore.py
JBWilkie marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,30 @@
class ObjectStore:
    """
    Object representing a configured connection to an external storage location.

    Attributes:
        name (str): The alias of the storage connection
        prefix (str): The directory that files are written back to in the storage location
        readonly (bool): Whether the storage configuration is read-only or not
        provider (str): The cloud provider (aws, azure, or gcp)
        default (bool): Whether this is the team's default storage connection
    """

    def __init__(
        self,
        name: str,
        prefix: str,
        readonly: bool,
        provider: str,
        default: bool,
    ) -> None:
        self.name = name
        self.prefix = prefix
        self.readonly = readonly
        self.provider = provider
        self.default = default

    def __str__(self) -> str:
        return f"Storage configuration:\n- Name: {self.name}\n- Prefix: {self.prefix}\n- Readonly: {self.readonly}\n- Provider: {self.provider}\n- Default: {self.default}"

    def __repr__(self) -> str:
        # Include ``default`` so repr covers every attribute (the original
        # omitted it).
        return f"ObjectStore(name={self.name}, prefix={self.prefix}, readonly={self.readonly}, provider={self.provider}, default={self.default})"
4 changes: 2 additions & 2 deletions tests/darwin/cli_functions_test.py
Expand Up @@ -47,7 +47,7 @@ def test_default_non_verbose(
remote_dataset: RemoteDataset,
request_upload_endpoint: str,
):
request_upload_response = response = {
request_upload_response = {
JBWilkie marked this conversation as resolved.
Show resolved Hide resolved
"blocked_items": [
{
"id": "3b241101-e2bb-4255-8caf-4136c566a964",
Expand Down Expand Up @@ -150,7 +150,7 @@ def test_with_verbose_flag(
remote_dataset: RemoteDataset,
request_upload_endpoint: str,
):
request_upload_response = response = {
request_upload_response = {
"blocked_items": [
{
"id": "3b241101-e2bb-4255-8caf-4136c566a964",
Expand Down