/
utils.py
1443 lines (1228 loc) · 43.6 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Contains several unrelated utility functions used across the SDK.
"""
import platform
import re
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
import deprecation
import json_stream
import numpy as np
import orjson as json
import requests
from json_stream.base import PersistentStreamingJSONList, PersistentStreamingJSONObject
from jsonschema import validators
from requests import Response
from rich.progress import ProgressType, track
from upolygon import draw_polygon
import darwin.datatypes as dt
from darwin.config import Config
from darwin.exceptions import (
MissingSchema,
OutdatedDarwinJSONFormat,
UnrecognizableFileEncoding,
UnsupportedFileType,
)
from darwin.future.data_objects.properties import SelectedProperty
from darwin.version import __version__
if TYPE_CHECKING:
from darwin.client import Client
# Image file extensions Darwin accepts. The *_by_filename helpers below match
# case-insensitively via str.lower(), so the upper-case ".JPEG"/".JPG" entries
# are redundant for those checks and presumably kept only for backwards
# compatibility of this public list.
SUPPORTED_IMAGE_EXTENSIONS = [
    ".png",
    ".jpeg",
    ".jpg",
    ".jfif",
    ".tif",
    ".tiff",
    ".bmp",
    ".svs",
    ".webp",
    ".JPEG",
    ".JPG",
]
# Extensions routed through the video/multi-frame pipeline. Note this also
# lists document/volume formats (.pdf, .dcm, .nii, .nii.gz) — presumably
# treated as multi-page/multi-frame items; confirm against the uploader.
SUPPORTED_VIDEO_EXTENSIONS = [
    ".avi",
    ".bpm",
    ".dcm",
    ".mov",
    ".mp4",
    ".pdf",
    ".nii",
    ".nii.gz",
    ".ndpi",
]
# Union of all supported extensions, used by the generic checks below.
SUPPORTED_EXTENSIONS = SUPPORTED_IMAGE_EXTENSIONS + SUPPORTED_VIDEO_EXTENSIONS
# schema URL -> parsed JSON schema, so each schema is fetched at most once per
# process (see _get_schema).
_darwin_schema_cache: Dict[str, Any] = {}
def is_extension_allowed_by_filename(filename: str) -> bool:
    """
    Return whether the filename ends with a supported image or video extension.

    Parameters
    ----------
    filename : str
        The filename to check; matched case-insensitively by suffix.

    Returns
    -------
    bool
        Whether or not the given extension of the filename is allowed.
    """
    # str.endswith accepts a tuple of suffixes, checking all of them in one call.
    return filename.lower().endswith(tuple(SUPPORTED_EXTENSIONS))
@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__)
def is_extension_allowed(extension: str) -> bool:
    """
    Return whether the given bare extension is supported.

    @Deprecated. Use ``is_extension_allowed_by_filename`` instead and pass the full
    filename: some extensions now contain multiple dots (e.g. ``.nii.gz``), which a
    bare-extension check cannot represent.

    Parameters
    ----------
    extension : str
        The extension, including the leading dot.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    normalized = extension.lower()
    return normalized in SUPPORTED_EXTENSIONS
def is_image_extension_allowed_by_filename(filename: str) -> bool:
    """
    Return whether the filename ends with a supported image extension.

    Parameters
    ----------
    filename : str
        The filename to check; matched case-insensitively by suffix.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    # One suffix check against the whole tuple of supported image extensions.
    return filename.lower().endswith(tuple(SUPPORTED_IMAGE_EXTENSIONS))
@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__)
def is_image_extension_allowed(extension: str) -> bool:
    """
    Return whether the given bare image extension is supported.

    Parameters
    ----------
    extension : str
        The image extension, including the leading dot.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    normalized = extension.lower()
    return normalized in SUPPORTED_IMAGE_EXTENSIONS
def is_video_extension_allowed_by_filename(extension: str) -> bool:
    """
    Returns whether or not the given filename ends in an allowed video extension.

    Parameters
    ----------
    extension : str
        The filename (or extension) to check; matched case-insensitively by suffix.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    return any(extension.lower().endswith(ext) for ext in SUPPORTED_VIDEO_EXTENSIONS)
@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__)
def is_video_extension_allowed(extension: str) -> bool:
    """
    Return whether the given bare video extension is supported.

    Parameters
    ----------
    extension : str
        The video extension, including the leading dot.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    normalized = extension.lower()
    return normalized in SUPPORTED_VIDEO_EXTENSIONS
def urljoin(*parts: str) -> str:
    """
    Join an unpacked sequence of strings into a URL with single slashes.

    Parameters
    ----------
    parts : str
        The fragments to join; leading/trailing slashes on each are stripped.

    Returns
    -------
    str
        The joined url.
    """
    trimmed = [part.strip("/") for part in parts]
    return "/".join(trimmed)
def is_project_dir(project_path: Path) -> bool:
    """
    Check whether a directory has the structure of a Darwin project.

    Parameters
    ----------
    project_path : Path
        Directory to examine.

    Returns
    -------
    bool
        True when both the ``releases`` and ``images`` sub-directories exist.
    """
    required_subdirs = ("releases", "images")
    return all((project_path / sub).exists() for sub in required_subdirs)
def get_progress_bar(
    array: List[dt.AnnotationFile], description: Optional[str] = None
) -> Iterable[ProgressType]:
    """
    Wrap the given annotation files in a rich progress-bar iterator.

    Parameters
    ----------
    array : List[dt.AnnotationFile]
        The list of annotation files.
    description : Optional[str], default: None
        Optional label shown next to the progress bar.

    Returns
    -------
    Iterable[ProgressType]
        An iterable of ``ProgressType`` to show a progress bar.
    """
    # An empty description falls through to rich's default label, matching
    # the no-description call.
    if not description:
        return track(array)
    return track(array, description=description)
def prompt(msg: str, default: Optional[str] = None) -> str:
    """
    Ask the user for input on the CLI.

    Parameters
    ----------
    msg : str
        Message to print.
    default : Optional[str], default: None
        Default value, displayed between ``[]`` in the prompt and returned when
        the user submits an empty answer.

    Returns
    -------
    str
        The user's input, or the default when the input is empty and a default
        was given.
    """
    prompt_text = f"{msg} [{default}]: " if default else f"{msg}: "
    answer = input(prompt_text)
    if answer:
        return answer
    # Empty answer: fall back to the default when one exists.
    return default if default else answer
def find_files(
    files: List[dt.PathLike],
    *,
    files_to_exclude: Optional[List[dt.PathLike]] = None,
    recursive: bool = True,
) -> List[Path]:
    """
    Retrieve a list of all files belonging to supported extensions. The exploration can be made
    recursive and a list of files can be excluded if desired.

    Parameters
    ----------
    files : List[dt.PathLike]
        List of files that will be filtered with the supported file extensions and returned.
    files_to_exclude : Optional[List[dt.PathLike]], default: None
        List of files to exclude from the search.
    recursive : bool
        Flag for recursive search.

    Returns
    -------
    List[Path]
        List of all files belonging to supported extensions. Can't return None.

    Raises
    ------
    UnsupportedFileType
        If an explicitly listed file has an unsupported extension.
    """
    # The default used to be a mutable ``[]``; ``None`` avoids the shared
    # mutable-default pitfall while keeping call sites unchanged.
    found_files: List[Path] = []
    pattern = "**/*" if recursive else "*"
    for f in files:
        path = Path(f)
        if path.is_dir():
            # Unsupported entries inside a directory are silently skipped;
            # only explicitly listed files raise.
            found_files.extend(
                path_object
                for path_object in path.glob(pattern)
                if is_extension_allowed_by_filename(str(path_object))
            )
        elif is_extension_allowed_by_filename(str(path)):
            found_files.append(path)
        else:
            raise UnsupportedFileType(path)
    # Set membership keeps the exclusion filter O(1) per found file.
    excluded_paths = {str(Path(f)) for f in (files_to_exclude or [])}
    return [f for f in found_files if str(f) not in excluded_paths]
def secure_continue_request() -> bool:
    """
    Ask for explicit approval from the user; an empty answer is treated as no.

    Returns
    -------
    bool
        True if the user wishes to continue, False otherwise.
    """
    answer = input("Do you want to continue? [y/N] ")
    return answer in ("Y", "y")
def persist_client_configuration(
    client: "Client",
    default_team: Optional[str] = None,
    config_path: Optional[Path] = None,
) -> Config:
    """
    Persist the given client's connection settings to a Darwin configuration file.

    Parameters
    ----------
    client : Client
        Client to take the configurations from.
    default_team : Optional[str], default: None
        The default team for the user.
    config_path : Optional[Path], default: None
        Where to save the configuration file; defaults to ``~/.darwin/config.yaml``.

    Returns
    -------
    Config
        A configuration object to handle YAML files.

    Raises
    ------
    ValueError
        If the client has no default team configured.
    """
    target_path = config_path or Path.home() / ".darwin" / "config.yaml"
    target_path.parent.mkdir(exist_ok=True)
    team_config: Optional[dt.Team] = client.config.get_default_team()
    if not team_config:
        raise ValueError("Unable to get default team.")
    config = Config(target_path)
    config.set_team(
        team=team_config.slug,
        api_key=team_config.api_key,
        datasets_dir=team_config.datasets_dir,
    )
    config.set_global(
        api_endpoint=client.url, base_url=client.base_url, default_team=default_team
    )
    return config
def _get_local_filename(metadata: Dict[str, Any]) -> str:
if "original_filename" in metadata:
return metadata["original_filename"]
else:
return metadata["filename"]
def _get_schema(data: dict) -> Optional[dict]:
    """
    Resolve and fetch the JSON schema for the given Darwin data, caching by URL.

    Returns None when neither an explicit "schema_ref" nor a default schema for
    the detected version is available.
    """
    version = _parse_version(data)
    schema_url = data.get("schema_ref") or _default_schema(version)
    if not schema_url:
        return None
    # Each schema URL is fetched at most once per process.
    if schema_url in _darwin_schema_cache:
        return _darwin_schema_cache[schema_url]
    response = requests.get(schema_url)
    response.raise_for_status()
    fetched_schema = response.json()
    _darwin_schema_cache[schema_url] = fetched_schema
    return fetched_schema
def validate_file_against_schema(path: Path) -> List:
    """
    Load a Darwin JSON file and validate it against its schema.

    Parameters
    ----------
    path : Path
        Path of the JSON file to validate.

    Returns
    -------
    List
        All validation errors found (empty when the file is valid).
    """
    contents, _unused_version = load_data_from_file(path)
    return validate_data_against_schema(contents)
def validate_data_against_schema(data: dict) -> List:
    """
    Validate parsed Darwin JSON data against its declared (or default) schema.

    Parameters
    ----------
    data : dict
        The parsed Darwin JSON payload.

    Returns
    -------
    List
        All validation errors found (empty when the data is valid).

    Raises
    ------
    MissingSchema
        If the schema cannot be retrieved or no schema applies to this data.
    """
    try:
        schema = _get_schema(data)
    except requests.exceptions.RequestException as e:
        # Chain the original network error so the root cause is preserved.
        raise MissingSchema(f"Error retrieving schema from url: {e}") from e
    if not schema:
        raise MissingSchema("Schema not found")
    validator = validators.Draft202012Validator(schema)
    errors = list(validator.iter_errors(data))
    return errors
def attempt_decode(path: Path) -> dict:
    """
    Decode a JSON file, trying the platform default encoding first and then a
    fixed list of common encodings.

    Parameters
    ----------
    path : Path
        Path of the JSON file.

    Returns
    -------
    dict
        The decoded JSON payload.

    Raises
    ------
    UnrecognizableFileEncoding
        If the file cannot be decoded with any attempted encoding.
    """
    encodings = ["utf-8", "utf-16", "utf-32", "ascii"]
    # encoding=None means the platform default, tried first as before.
    for encoding in [None] + encodings:
        try:
            with path.open(encoding=encoding) as infile:
                return json.loads(infile.read())
        except Exception:
            continue
    raise UnrecognizableFileEncoding(
        f"Unable to load file {path} with any encodings: {encodings}"
    )
def load_data_from_file(path: Path) -> Tuple[dict, dt.AnnotationFileVersion]:
    """
    Decode a Darwin JSON file and extract its format version.

    Parameters
    ----------
    path : Path
        Path of the JSON file.

    Returns
    -------
    Tuple[dict, dt.AnnotationFileVersion]
        The decoded payload and its parsed format version.
    """
    contents = attempt_decode(path)
    return contents, _parse_version(contents)
def parse_darwin_json(
    path: Path, count: Optional[int] = None
) -> Optional[dt.AnnotationFile]:
    """
    Parse a file in V7's proprietary Darwin JSON format. Works for images, split
    frame videos (treated as images) and playback videos, in both the 1.0 and
    2.0 layouts.

    Parameters
    ----------
    path : Path
        Path to the file to parse.
    count : Optional[int]
        Optional fallback sequence number, used when the image carries none.

    Returns
    -------
    Optional[dt.AnnotationFile]
        An AnnotationFile with the information from the parsed JSON file, or
        None if the JSON contains no "annotations" key.

    Raises
    ------
    OutdatedDarwinJSONFormat
        If the given darwin video JSON file is missing the 'width' and 'height'
        keys in the 'image' dictionary.
    """
    path = Path(path)
    data, version = load_data_from_file(path)
    if "annotations" not in data:
        return None
    if version.major == 2:
        return _parse_darwin_v2(path, data)
    # Darwin JSON 1.0: frame metadata on "image" means a playback video.
    image_data = data["image"]
    if "fps" in image_data or "frame_count" in image_data:
        return _parse_darwin_video(path, data, count)
    return _parse_darwin_image(path, data, count)
def stream_darwin_json(path: Path) -> PersistentStreamingJSONObject:
    """
    Open a Darwin JSON file as a persistent lazy stream, allowing large files
    to be processed without loading them entirely into memory.

    Parameters
    ----------
    path : Path
        Path to the file to parse.

    Returns
    -------
    PersistentStreamingJSONObject
        A stream of the JSON file.
    """
    with path.open() as json_file:
        return json_stream.load(json_file, persistent=True)
def get_image_path_from_stream(
    darwin_json: PersistentStreamingJSONObject,
    images_dir: Path,
    with_folders: bool,
    annotation_filepath: Path,
) -> Path:
    """
    Returns the path to the image file associated with the given darwin json file.
    Compatible with Darwin JSON V2, as well as releases in folders and flat structures.

    Parameters
    ----------
    darwin_json : PersistentStreamingJSONObject
        A stream of the JSON file.
    images_dir : Path
        Path to the directory containing the images.
    with_folders: bool
        Flag to determine if the release was pulled with or without folders.
    annotation_filepath : Path
        Path of the annotation file itself, used as a fallback to fully re-parse
        the JSON when streaming access fails.

    Returns
    -------
    Path
        Path to the image file.
    """
    try:
        if not with_folders:
            return images_dir / Path(darwin_json["item"]["name"])
        else:
            return (
                images_dir
                / (Path(darwin_json["item"]["path"].lstrip("/\\")))
                / Path(darwin_json["item"]["name"])
            )
    except OSError:
        # Streaming access failed (e.g. the stream's underlying file handle is
        # no longer readable) — load in the JSON as normal instead.
        darwin_json = parse_darwin_json(path=annotation_filepath)
        # NOTE(review): parse_darwin_json may return None (when the JSON has no
        # "annotations" key), which would raise AttributeError below — confirm
        # callers only pass files with annotations.
        if not with_folders:
            return images_dir / Path(darwin_json.filename)
        else:
            return images_dir / Path(darwin_json.full_path.lstrip("/\\"))
def is_stream_list_empty(json_list: PersistentStreamingJSONList) -> bool:
    """Return True when the streaming JSON list has no first element."""
    try:
        _ = json_list[0]
    except IndexError:
        return True
    return False
def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile:
    """
    Build an ``AnnotationFile`` from a Darwin JSON 2.0 payload.

    Parameters
    ----------
    path : Path
        Path of the JSON file the data was loaded from.
    data : Dict[str, Any]
        The parsed Darwin JSON 2.0 dictionary; must contain an "item" key.

    Returns
    -------
    dt.AnnotationFile
        The parsed file; media metadata comes from the first slot when any exist.
    """
    item = data["item"]
    item_source = item.get("source_info", {})
    # Parse every slot, discarding falsy results.
    slots: List[dt.Slot] = list(
        filter(None, map(_parse_darwin_slot, item.get("slots", [])))
    )
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }
    if len(slots) == 0:
        # No slots: build a file without media metadata (dimensions/URLs unknown).
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {})
            .get("dataset", {})
            .get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            is_video=False,
            image_width=None,
            image_height=None,
            image_url=None,
            image_thumbnail_url=None,
            workview_url=item_source.get("workview_url", None),
            seq=0,
            frame_urls=None,
            remote_path=item["path"],
            slots=slots,
        )
    else:
        # Only the first slot's media metadata is surfaced on the file itself.
        slot = slots[0]
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {})
            .get("dataset", {})
            .get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            # Treated as video when frame data (urls or manifest) is present.
            is_video=slot.frame_urls is not None or slot.frame_manifest is not None,
            image_width=slot.width,
            image_height=slot.height,
            image_url=(
                None
                if len(slot.source_files or []) == 0
                else slot.source_files[0]["url"]
            ),
            image_thumbnail_url=slot.thumbnail_url,
            workview_url=item_source.get("workview_url", None),
            seq=0,
            frame_urls=slot.frame_urls,
            remote_path=item["path"],
            slots=slots,
            frame_count=slot.frame_count,
        )
    return annotation_file
def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
    """Build a ``dt.Slot`` from one raw slot entry of a Darwin JSON 2.0 item."""
    read = data.get
    return dt.Slot(
        name=data["slot_name"],
        type=data["type"],
        width=read("width"),
        height=read("height"),
        source_files=read("source_files", []),
        thumbnail_url=read("thumbnail_url"),
        frame_count=read("frame_count"),
        frame_urls=read("frame_urls"),
        fps=read("fps"),
        metadata=read("metadata"),
        segments=read("segments", []),
        frame_manifest=read("frame_manifests"),
    )
def _parse_darwin_image(
    path: Path, data: Dict[str, Any], count: Optional[int]
) -> dt.AnnotationFile:
    """
    Build an ``AnnotationFile`` from a Darwin JSON 1.0 image payload.

    Parameters
    ----------
    path : Path
        Path of the JSON file the data was loaded from.
    data : Dict[str, Any]
        Parsed Darwin JSON 1.0 dictionary with an "image" key.
    count : Optional[int]
        Fallback sequence number used when the image carries no "seq" value.

    Returns
    -------
    dt.AnnotationFile
        The parsed annotation file, with a single synthesized image slot appended.
    """
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }
    # JSON 1.0 has no slots; synthesize one unnamed slot from the "image" block.
    slot = dt.Slot(
        name=None,
        type="image",
        source_files=[
            {
                "url": data["image"].get("url"),
                "file_name": _get_local_filename(data["image"]),
            }
        ],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        metadata=data["image"].get("metadata"),
    )
    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=False,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=None,
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file
def _parse_darwin_video(
    path: Path, data: Dict[str, Any], count: Optional[int]
) -> dt.AnnotationFile:
    """
    Build an ``AnnotationFile`` from a Darwin JSON 1.0 video payload.

    Parameters
    ----------
    path : Path
        Path of the JSON file the data was loaded from.
    data : Dict[str, Any]
        Parsed Darwin JSON 1.0 dictionary whose "image" key describes the video.
    count : Optional[int]
        Fallback sequence number used when the payload carries no "seq" value.

    Returns
    -------
    dt.AnnotationFile
        The parsed annotation file, with a single synthesized video slot appended.

    Raises
    ------
    OutdatedDarwinJSONFormat
        If the 'image' dictionary lacks 'width'/'height' (older export format).
    """
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }
    if "width" not in data["image"] or "height" not in data["image"]:
        raise OutdatedDarwinJSONFormat(
            "Missing width/height in video, please re-export"
        )
    # JSON 1.0 has no slots; synthesize one unnamed video slot from "image".
    slot = dt.Slot(
        name=None,
        type="video",
        source_files=[
            {
                "url": data["image"].get("url"),
                "file_name": _get_local_filename(data["image"]),
            }
        ],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        frame_count=data["image"].get("frame_count"),
        frame_urls=data["image"].get("frame_urls"),
        fps=data["image"].get("fps"),
        metadata=data["image"].get("metadata"),
    )
    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=True,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=data["image"].get("frame_urls"),
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file
def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotation]:
    """
    Convert one raw Darwin annotation dict into a ``dt.Annotation``.

    Dispatches on which geometry key is present (polygon, complex_polygon,
    bounding_box, tag, line, keypoint, ellipse, cuboid, skeleton, table,
    simple_table, string, graph, mask, raster_layer), supporting both the
    Darwin JSON 1.0 and 2.0 polygon layouts, then attaches any sub-annotations
    (instance_id, attributes, text, etc.). Returns ``None`` (with a printed
    warning) for unrecognised annotation types.
    """
    slot_names = parse_slot_names(annotation)
    name: str = annotation["name"]
    main_annotation: Optional[dt.Annotation] = None
    # Darwin JSON 2.0 representation of complex polygons
    if (
        "polygon" in annotation
        and "paths" in annotation["polygon"]
        and len(annotation["polygon"]["paths"]) > 1
    ):
        bounding_box = annotation.get("bounding_box")
        paths = annotation["polygon"]["paths"]
        main_annotation = dt.make_complex_polygon(
            name, paths, bounding_box, slot_names=slot_names
        )
    # Darwin JSON 2.0 representation of simple polygons
    elif (
        "polygon" in annotation
        and "paths" in annotation["polygon"]
        and len(annotation["polygon"]["paths"]) == 1
    ):
        bounding_box = annotation.get("bounding_box")
        paths = annotation["polygon"]["paths"]
        main_annotation = dt.make_polygon(
            name, paths[0], bounding_box, slot_names=slot_names
        )
    # Darwin JSON 1.0 representation of complex and simple polygons
    elif "polygon" in annotation:
        bounding_box = annotation.get("bounding_box")
        if "additional_paths" in annotation["polygon"]:
            paths = [annotation["polygon"]["path"]] + annotation["polygon"][
                "additional_paths"
            ]
            main_annotation = dt.make_complex_polygon(
                name, paths, bounding_box, slot_names=slot_names
            )
        else:
            main_annotation = dt.make_polygon(
                name, annotation["polygon"]["path"], bounding_box, slot_names=slot_names
            )
    # Darwin JSON 1.0 representation of complex polygons
    elif "complex_polygon" in annotation:
        bounding_box = annotation.get("bounding_box")
        # "path" may be a single path or already a list of paths; normalize.
        if isinstance(annotation["complex_polygon"]["path"][0], list):
            paths = annotation["complex_polygon"]["path"]
        else:
            paths = [annotation["complex_polygon"]["path"]]
        if "additional_paths" in annotation["complex_polygon"]:
            paths.extend(annotation["complex_polygon"]["additional_paths"])
        main_annotation = dt.make_complex_polygon(
            name, paths, bounding_box, slot_names=slot_names
        )
    elif "bounding_box" in annotation:
        bounding_box = annotation["bounding_box"]
        main_annotation = dt.make_bounding_box(
            name,
            bounding_box["x"],
            bounding_box["y"],
            bounding_box["w"],
            bounding_box["h"],
            slot_names=slot_names,
        )
    elif "tag" in annotation:
        main_annotation = dt.make_tag(name, slot_names=slot_names)
    elif "line" in annotation:
        main_annotation = dt.make_line(
            name, annotation["line"]["path"], slot_names=slot_names
        )
    elif "keypoint" in annotation:
        main_annotation = dt.make_keypoint(
            name,
            annotation["keypoint"]["x"],
            annotation["keypoint"]["y"],
            slot_names=slot_names,
        )
    elif "ellipse" in annotation:
        main_annotation = dt.make_ellipse(
            name, annotation["ellipse"], slot_names=slot_names
        )
    elif "cuboid" in annotation:
        main_annotation = dt.make_cuboid(
            name, annotation["cuboid"], slot_names=slot_names
        )
    elif "skeleton" in annotation:
        main_annotation = dt.make_skeleton(
            name, annotation["skeleton"]["nodes"], slot_names=slot_names
        )
    elif "table" in annotation:
        main_annotation = dt.make_table(
            name,
            annotation["table"]["bounding_box"],
            annotation["table"]["cells"],
            slot_names=slot_names,
        )
    elif "simple_table" in annotation:
        main_annotation = dt.make_simple_table(
            name,
            annotation["simple_table"]["bounding_box"],
            annotation["simple_table"]["col_offsets"],
            annotation["simple_table"]["row_offsets"],
            slot_names=slot_names,
        )
    elif "string" in annotation:
        main_annotation = dt.make_string(
            name, annotation["string"]["sources"], slot_names=slot_names
        )
    elif "graph" in annotation:
        main_annotation = dt.make_graph(
            name,
            annotation["graph"]["nodes"],
            annotation["graph"]["edges"],
            slot_names=slot_names,
        )
    elif "mask" in annotation:
        main_annotation = dt.make_mask(name, slot_names=slot_names)
    elif "raster_layer" in annotation:
        raster_layer = annotation["raster_layer"]
        main_annotation = dt.make_raster_layer(
            name,
            raster_layer["mask_annotation_ids_mapping"],
            raster_layer["total_pixels"],
            raster_layer["dense_rle"],
            slot_names=slot_names,
        )
    if not main_annotation:
        print(f"[WARNING] Unsupported annotation type: '{annotation.keys()}'")
        return None
    # Attach optional sub-annotations and metadata to the parsed annotation.
    if "id" in annotation:
        main_annotation.id = annotation["id"]
    if "instance_id" in annotation:
        main_annotation.subs.append(
            dt.make_instance_id(annotation["instance_id"]["value"])
        )
    if "attributes" in annotation:
        main_annotation.subs.append(dt.make_attributes(annotation["attributes"]))
    if "text" in annotation:
        main_annotation.subs.append(dt.make_text(annotation["text"]["text"]))
    if "inference" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("inference", annotation["inference"])
        )
    if "directional_vector" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("directional_vector", annotation["directional_vector"])
        )
    if "measures" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("measures", annotation["measures"])
        )
    if "auto_annotate" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("auto_annotate", annotation["auto_annotate"])
        )
    # Explicit None checks: a present-but-null key is treated as absent.
    if annotation.get("annotators") is not None:
        main_annotation.annotators = _parse_annotators(annotation["annotators"])
    if annotation.get("reviewers") is not None:
        main_annotation.reviewers = _parse_annotators(annotation["reviewers"])
    if "properties" in annotation:
        main_annotation.properties = _parse_properties(annotation["properties"])
    return main_annotation
def _parse_darwin_video_annotation(annotation: dict) -> Optional[dt.VideoAnnotation]:
    """
    Parse a Darwin video annotation (frames/sections) into a ``dt.VideoAnnotation``.

    Parameters
    ----------
    annotation : dict
        Raw video annotation with per-frame data under "frames" (JSON 1.0)
        and/or "sections" (JSON 2.0).

    Returns
    -------
    Optional[dt.VideoAnnotation]
        The parsed video annotation, or None when there are no frames or any
        single frame fails to parse.
    """
    name = annotation["name"]
    frame_annotations = {}
    keyframes: Dict[int, bool] = {}
    # Merge the 1.0 ("frames") and 2.0 ("sections") per-frame dictionaries.
    frames = {**annotation.get("frames", {}), **annotation.get("sections", {})}
    for f, frame in frames.items():
        frame_annotations[int(f)] = _parse_darwin_annotation(
            {**frame, **{"name": name, "id": annotation.get("id", None)}}
        )
        keyframes[int(f)] = frame.get("keyframe", False)
    # Bail out if there were no frames or any frame was unparseable.
    if not frame_annotations or None in frame_annotations.values():
        return None
    main_annotation = dt.make_video_annotation(
        frame_annotations,
        keyframes,
        annotation.get("ranges", annotation.get("segments", [])),
        annotation.get("interpolated", False),
        slot_names=parse_slot_names(annotation),
        properties=_parse_properties(annotation.get("properties", [])),
        hidden_areas=annotation.get("hidden_areas", []),
    )
    if "id" in annotation:
        main_annotation.id = annotation["id"]
    # Explicit None checks (consistent with _parse_darwin_annotation) so a
    # present-but-null "annotators" key never reaches _parse_annotators.
    if annotation.get("annotators") is not None:
        main_annotation.annotators = _parse_annotators(annotation["annotators"])
    if annotation.get("reviewers") is not None:
        main_annotation.reviewers = _parse_annotators(annotation["reviewers"])
    return main_annotation
def _parse_darwin_raster_annotation(annotation: dict) -> Optional[dt.Annotation]:
    """
    Parse a raster-layer annotation dict into a ``dt.Annotation``.

    Parameters
    ----------
    annotation : dict
        Raw annotation with "id", "name" and a "raster_layer" payload carrying
        "dense_rle", "mask_annotation_ids_mapping" and "total_pixels".

    Returns
    -------
    Optional[dt.Annotation]
        The parsed raster-layer annotation.

    Raises
    ------
    ValueError
        If any required field is missing (or falsy — see note below).
    """
    if not annotation.get("raster_layer"):
        raise ValueError("Raster annotation must have a 'raster_layer' field")
    id: Optional[str] = annotation.get("id")
    name: Optional[str] = annotation.get("name")
    raster_layer: Optional[dt.JSONFreeForm] = annotation.get("raster_layer")
    slot_names: Optional[List[str]] = parse_slot_names(annotation)
    if not id or not name or not raster_layer:
        raise ValueError(
            "Raster annotation must have an 'id', 'name' and 'raster_layer' field"
        )
    dense_rle, mask_annotation_ids_mapping, total_pixels = (
        raster_layer.get("dense_rle", None),
        raster_layer.get("mask_annotation_ids_mapping", None),
        raster_layer.get("total_pixels", None),
    )
    # NOTE(review): truthiness also rejects present-but-falsy values (e.g.
    # total_pixels == 0 or an empty dense_rle) — confirm that is intended.
    if not dense_rle or not mask_annotation_ids_mapping or not total_pixels:
        raise ValueError(
            "Raster annotation must have a 'dense_rle', 'mask_annotation_ids_mapping' and 'total_pixels' field"
        )
    new_annotation = dt.Annotation(
        dt.AnnotationClass(name, "raster_layer"),
        {
            "dense_rle": dense_rle,
            "mask_annotation_ids_mapping": mask_annotation_ids_mapping,
            "total_pixels": total_pixels,
        },
        slot_names=slot_names or [],
        id=id,
    )
    return new_annotation
def _parse_darwin_mask_annotation(annotation: dict) -> Optional[dt.Annotation]:
    """
    Parse a mask annotation dict into a ``dt.Annotation``.

    Raises
    ------
    ValueError
        If "id", "name" or "mask" is missing, or "sparse_rle" is non-null.
    """
    annotation_id: Optional[str] = annotation.get("id")
    class_name: Optional[str] = annotation.get("name")
    mask_payload: Optional[dt.JSONFreeForm] = annotation.get("mask")
    slots: Optional[List[str]] = parse_slot_names(annotation)
    if not annotation_id or not class_name or mask_payload is None:
        raise ValueError("Mask annotation must have an 'id', 'name' and 'mask' field")
    if mask_payload.get("sparse_rle") is not None:
        raise ValueError("Mask annotation field 'sparse_rle' must contain a null value")
    return dt.Annotation(
        dt.AnnotationClass(class_name, "mask"),
        mask_payload,
        slot_names=slots or [],
        id=annotation_id,
    )
def _parse_annotators(annotators: List[Dict[str, Any]]) -> List[dt.AnnotationAuthor]:
if not (hasattr(annotators, "full_name") or not hasattr(annotators, "email")):
raise AttributeError(
"JSON file must contain annotators with 'full_name' and 'email' fields"
)