-
Notifications
You must be signed in to change notification settings - Fork 40
/
importer.py
1346 lines (1154 loc) · 51.4 KB
/
importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from collections import defaultdict
from logging import getLogger
from multiprocessing import cpu_count
from pathlib import Path
from time import perf_counter
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
from darwin.datatypes import AnnotationFile, Property, parse_property_classes
from darwin.future.data_objects.properties import (
FullProperty,
PropertyType,
PropertyValue,
SelectedProperty,
)
from darwin.item import DatasetItem
from darwin.path_utils import is_properties_enabled, parse_metadata
# Module-local alias for values whose type is not modelled; used in the
# signatures of the lookup-table helpers below.
Unknown = Any  # type: ignore
from tqdm import tqdm
if TYPE_CHECKING:
from darwin.client import Client
from darwin.dataset.remote_dataset import RemoteDataset
import deprecation
from rich.console import Console
from rich.progress import track
from rich.theme import Theme
import darwin.datatypes as dt
from darwin.datatypes import PathLike
from darwin.exceptions import IncompatibleOptions, RequestEntitySizeExceeded
from darwin.utils import secure_continue_request
from darwin.utils.flatten_list import flatten_list
from darwin.version import __version__
# Module-level logger, named after this module (find_and_parse shadows it
# with a local of the same name).
logger = getLogger(__name__)

# mpire is an optional dependency: the multiprocessing parse path is only
# offered when it is importable.
try:
    from mpire import WorkerPool

    MPIRE_AVAILABLE = True
except ImportError:
    MPIRE_AVAILABLE = False
# Classes missing import support on backend side
UNSUPPORTED_CLASSES = ["string", "graph"]

# Classes that are defined on team level automatically and available in all datasets
GLOBAL_CLASSES = ["__raster_layer__"]

# Details text attached to the ``deprecation.deprecated`` decorator on the
# helpers below that are scheduled to become private.
DEPRECATION_MESSAGE = """
This function is going to be turned into private. This means that breaking
changes in its interface and implementation are to be expected. We encourage using ``import_annotations``
instead of calling this low-level function directly.
"""
@deprecation.deprecated(  # type:ignore
    deprecated_in="0.7.12",
    removed_in="0.8.0",
    current_version=__version__,
    details=DEPRECATION_MESSAGE,
)
def build_main_annotations_lookup_table(
    annotation_classes: List[Dict[str, Unknown]]
) -> Dict[str, Unknown]:
    """
    Build a nested lookup of annotation type -> class name -> class id.

    Only annotation types from the known "main" set are indexed; any other
    type carried by a class entry is ignored.
    """
    MAIN_ANNOTATION_TYPES = [
        "bounding_box",
        "cuboid",
        "ellipse",
        "keypoint",
        "line",
        "link",
        "polygon",
        "skeleton",
        "tag",
        "string",
        "table",
        "simple_table",
        "graph",
        "mask",
        "raster_layer",
    ]
    lookup: Dict[str, Unknown] = {}
    for annotation_class in annotation_classes:
        class_name = annotation_class["name"]
        class_id = annotation_class["id"]
        for candidate_type in annotation_class["annotation_types"]:
            if candidate_type not in MAIN_ANNOTATION_TYPES:
                continue
            lookup.setdefault(candidate_type, {})[class_name] = class_id
    return lookup
@deprecation.deprecated(  # type:ignore
    deprecated_in="0.7.12",
    removed_in="0.8.0",
    current_version=__version__,
    details=DEPRECATION_MESSAGE,
)
def find_and_parse(  # noqa: C901
    importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]],
    file_paths: List[PathLike],
    console: Optional[Console] = None,
    use_multi_cpu: bool = True,
    cpu_limit: int = 1,
) -> Optional[Iterable[dt.AnnotationFile]]:
    """
    Collect every file under ``file_paths`` and parse each one with ``importer``.

    Parameters
    ----------
    importer : Callable
        Parser invoked once per file; may return a single ``AnnotationFile``,
        a list of them, or ``None``.
    file_paths : List[PathLike]
        Files and/or directories (walked recursively) to parse.
    console : Optional[Console]
        When given, progress messages are printed to this rich console;
        otherwise they go to the module logger.
    use_multi_cpu : bool
        Parse with an mpire worker pool when mpire is installed and
        ``cpu_limit`` > 1.
    cpu_limit : int
        Number of worker processes used on the multiprocessing path.

    Returns
    -------
    Optional[Iterable[dt.AnnotationFile]]
        Flat list of parsed files with ``None`` results removed, or ``None``
        when parsing was interrupted or raised.
    """
    is_console = console is not None
    logger = getLogger(__name__)

    # BUG FIX: the previous implementation created a fresh generator for every
    # message, so it reported the absolute perf_counter() start value instead
    # of the elapsed time. Capture a single start timestamp and diff from it.
    start = perf_counter()

    def maybe_console(*args: Union[str, int, float]) -> None:
        message = " ".join(
            [f"[{perf_counter() - start} seconds elapsed]", *(str(arg) for arg in args)]
        )
        if console is not None:
            console.print(message)
        else:
            # BUG FIX: previously the extra args were forwarded to logger.info
            # as %-style formatting arguments for a format-free message, which
            # made the logging module raise internally and drop the record.
            logger.info(message)

    maybe_console("Parsing files... ")
    files: List[Path] = _get_files_for_parsing(file_paths)
    maybe_console(f"Found {len(files)} files")

    if use_multi_cpu and MPIRE_AVAILABLE and cpu_limit > 1:
        maybe_console(f"Using multiprocessing with {cpu_limit} workers")
        try:
            with WorkerPool(cpu_limit) as pool:
                parsed_files = pool.map(importer, files, progress_bar=is_console)
        except KeyboardInterrupt:
            maybe_console("Keyboard interrupt. Stopping.")
            return None
        except Exception as e:
            maybe_console(f"Error: {e}")
            return None
    else:
        maybe_console("Using single CPU")
        parsed_files = list(map(importer, tqdm(files) if is_console else files))

    maybe_console("Finished.")

    # Sometimes we have a list of lists of AnnotationFile, sometimes we have a
    # list of AnnotationFile. We flatten the list of lists.
    if isinstance(parsed_files, list):
        # BUG FIX: guard against an empty result before peeking at element 0
        # (previously raised IndexError when no files were found).
        if parsed_files and isinstance(parsed_files[0], list):
            parsed_files = [item for sublist in parsed_files for item in sublist]
    else:
        parsed_files = [parsed_files]

    parsed_files = [f for f in parsed_files if f is not None]
    return parsed_files
def _get_files_for_parsing(file_paths: List[PathLike]) -> List[Path]:
packed_files = [
filepath.glob("**/*") if filepath.is_dir() else [filepath]
for filepath in map(Path, file_paths)
]
return [file for files in packed_files for file in files]
@deprecation.deprecated(  # type:ignore
    deprecated_in="0.7.12",
    removed_in="0.8.0",
    current_version=__version__,
    details=DEPRECATION_MESSAGE,
)
def build_attribute_lookup(dataset: "RemoteDataset") -> Dict[str, Unknown]:
    """
    Fetch the dataset's remote attributes and index them as
    class id -> attribute name -> attribute id.
    """
    attributes: List[Dict[str, Unknown]] = dataset.fetch_remote_attributes()
    lookup: Dict[str, Unknown] = {}
    for attribute in attributes:
        per_class = lookup.setdefault(attribute["class_id"], {})
        per_class[attribute["name"]] = attribute["id"]
    return lookup
@deprecation.deprecated(  # type:ignore
    deprecated_in="0.7.12",
    removed_in="0.8.0",
    current_version=__version__,
    details=DEPRECATION_MESSAGE,
)
def get_remote_files(
    dataset: "RemoteDataset", filenames: List[str], chunk_size: int = 100
) -> Dict[str, Tuple[int, str]]:
    """
    Fetches remote files from the datasets in chunks; by default 100 filenames at a time.

    The result maps each remote file's full path to a two-element tuple of:
    - file ID
    - the name of the first slot of the item

    Fetching slot name is necessary here to avoid double-trip to Api downstream for remote files.
    """
    remote_files = {}
    for start in range(0, len(filenames), chunk_size):
        batch = filenames[start : start + chunk_size]
        listing = dataset.fetch_remote_files(
            {"types": "image,playback_video,video_frame", "filenames": batch}
        )
        for remote_file in listing:
            remote_files[remote_file.full_path] = (
                remote_file.id,
                _get_slot_name(remote_file),
            )
    return remote_files
def _get_slot_name(remote_file: DatasetItem) -> str:
slot = next(iter(remote_file.slots), {"slot_name": "0"})
if slot:
return slot["slot_name"]
else:
return "0"
def _resolve_annotation_classes(
local_annotation_classes: List[dt.AnnotationClass],
classes_in_dataset: Dict[str, Unknown],
classes_in_team: Dict[str, Unknown],
) -> Tuple[Set[dt.AnnotationClass], Set[dt.AnnotationClass]]:
local_classes_not_in_dataset: Set[dt.AnnotationClass] = set()
local_classes_not_in_team: Set[dt.AnnotationClass] = set()
for local_cls in local_annotation_classes:
local_annotation_type = (
local_cls.annotation_internal_type or local_cls.annotation_type
)
# Only add the new class if it doesn't exist remotely already
if (
local_annotation_type in classes_in_dataset
and local_cls.name in classes_in_dataset[local_annotation_type]
):
continue
# Only add the new class if it's not included in the list of the missing classes already
if local_cls.name in [
missing_class.name for missing_class in local_classes_not_in_dataset
]:
continue
if local_cls.name in [
missing_class.name for missing_class in local_classes_not_in_team
]:
continue
if (
local_annotation_type in classes_in_team
and local_cls.name in classes_in_team[local_annotation_type]
):
local_classes_not_in_dataset.add(local_cls)
else:
local_classes_not_in_team.add(local_cls)
return local_classes_not_in_dataset, local_classes_not_in_team
def _get_team_properties_annotation_lookup(client):
# get team properties -> List[FullProperty]
team_properties = client.get_team_properties()
# (property-name, annotation_class_id): FullProperty object
team_properties_annotation_lookup: Dict[
Tuple[str, Optional[int]], FullProperty
] = {}
for prop in team_properties:
team_properties_annotation_lookup[(prop.name, prop.annotation_class_id)] = prop
return team_properties_annotation_lookup
def _update_payload_with_properties(
annotations: List[Dict[str, Unknown]],
annotation_id_property_map: Dict[str, Dict[str, Dict[str, Set[str]]]],
) -> None:
"""
Updates the annotations with the properties that were created/updated during the import.
"""
if not annotation_id_property_map:
return
for annotation in annotations:
annotation_id = annotation["id"]
if annotation_id_property_map.get(annotation_id):
_map = {}
for _frame_index, _property_map in annotation_id_property_map[
annotation_id
].items():
_map[_frame_index] = {}
for prop_id, prop_val_set in dict(_property_map).items():
prop_val_list = list(prop_val_set)
_map[_frame_index][prop_id] = prop_val_list
annotation["annotation_properties"] = dict(_map)
def _import_properties(
    metadata_path: Union[Path, bool],
    client: "Client",
    annotations: List[dt.Annotation],
    annotation_class_ids_map: Dict[Tuple[str, str], str],
) -> Dict[str, Dict[str, Dict[str, Set[str]]]]:
    """
    Creates/Updates missing/mismatched properties from annotation & metadata.json file to team-properties.
    As the properties are created/updated, the annotation_id_property_map is updated with the new/old property ids.
    ^ This is used in the import-annotations payload later on.

    Args:
        metadata_path (Union[Path, bool]): Path object to .v7/metadata.json file.
            A non-Path value (``False``) means properties are disabled and the
            function returns an empty map immediately.
        client (Client): Darwin Client object
        annotations (List[dt.Annotation]): List of annotations
        annotation_class_ids_map (Dict[Tuple[str, str], str]): Dict of annotation class names/types to annotation class ids

    Raises:
        ValueError: raise error if annotation class not present in metadata
        ValueError: raise error if annotation-property not present in metadata
        ValueError: raise error if property value is missing for a property that requires a value
        ValueError: raise error if property value/type is different in m_prop (.v7/metadata.json) options

    Returns:
        Dict[str, Dict[str, Dict[str, Set[str]]]]: Dict of annotation.id to frame_index -> property id -> property val ids
    """
    annotation_property_map: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}
    if not isinstance(metadata_path, Path):
        # No properties to import
        return {}

    # parse metadata.json file -> list[PropertyClass]
    metadata = parse_metadata(metadata_path)
    metadata_property_classes = parse_property_classes(metadata)

    # get team properties
    team_properties_annotation_lookup = _get_team_properties_annotation_lookup(client)

    # (annotation-cls-name, annotation-cls-name): PropertyClass object
    metadata_classes_lookup: Set[Tuple[str, str]] = set()
    # (annotation-cls-name, property-name): Property object
    metadata_cls_prop_lookup: Dict[Tuple[str, str], Property] = {}
    # (annotation-cls-id, property-name): Property object
    metadata_cls_id_prop_lookup: Dict[Tuple[int, str], Property] = {}
    for _cls in metadata_property_classes:
        metadata_classes_lookup.add((_cls.name, _cls.type))
        for _prop in _cls.properties or []:
            metadata_cls_prop_lookup[(_cls.name, _prop.name)] = _prop

    # (annotation-id): dt.Annotation object
    annotation_id_map: Dict[str, dt.Annotation] = {}

    create_properties: List[FullProperty] = []
    update_properties: List[FullProperty] = []
    for annotation in annotations:
        annotation_name = annotation.annotation_class.name
        # NOTE(review): the duplicated assignment target below looks accidental
        # but is harmless; kept as-is.
        annotation_type = annotation_type = (
            annotation.annotation_class.annotation_internal_type
            or annotation.annotation_class.annotation_type
        )
        annotation_name_type = (annotation_name, annotation_type)
        # Classes without a remote id cannot carry properties; skip them.
        if annotation_name_type not in annotation_class_ids_map:
            continue
        annotation_class_id = int(annotation_class_ids_map[annotation_name_type])
        # Annotations without an id cannot be mapped back later; skip them.
        if not annotation.id:
            continue
        annotation_id = annotation.id
        if annotation_id not in annotation_property_map:
            annotation_property_map[annotation_id] = defaultdict(
                lambda: defaultdict(set)
            )
        annotation_id_map[annotation_id] = annotation

        # raise error if annotation class not present in metadata
        if annotation_name_type not in metadata_classes_lookup:
            raise ValueError(
                f"Annotation: '{annotation_name}' not found in {metadata_path}"
            )

        # loop on annotation properties and check if they exist in metadata & team
        for a_prop in annotation.properties or []:
            a_prop: SelectedProperty

            # raise error if annotation-property not present in metadata
            if (annotation_name, a_prop.name) not in metadata_cls_prop_lookup:
                raise ValueError(
                    f"Annotation: '{annotation_name}' -> Property '{a_prop.name}' not found in {metadata_path}"
                )

            # get metadata property
            m_prop: Property = metadata_cls_prop_lookup[(annotation_name, a_prop.name)]

            # update metadata-property lookup
            metadata_cls_id_prop_lookup[(annotation_class_id, a_prop.name)] = m_prop

            # get metadata property type
            m_prop_type: PropertyType = m_prop.type

            # get metadata property options
            m_prop_options: List[Dict[str, str]] = m_prop.property_values or []

            # check if property value is missing for a property that requires a value
            if m_prop.required and not a_prop.value:
                raise ValueError(
                    f"Annotation: '{annotation_name}' -> Property '{a_prop.name}' requires a value!"
                )

            # check if property and annotation class exists in team
            if (
                a_prop.name,
                annotation_class_id,
            ) not in team_properties_annotation_lookup:
                # check if fullproperty exists in create_properties
                for full_property in create_properties:
                    if (
                        full_property.name == a_prop.name
                        and full_property.annotation_class_id == annotation_class_id
                    ):
                        # make sure property_values is not None
                        if full_property.property_values is None:
                            full_property.property_values = []

                        property_values = full_property.property_values
                        if a_prop.value is None:
                            # skip creating property if property value is None
                            continue
                        # find property value in m_prop (.v7/metadata.json) options
                        for m_prop_option in m_prop_options:
                            if m_prop_option.get("value") == a_prop.value:
                                # check if property value exists in property_values
                                for prop_val in property_values:
                                    if prop_val.value == a_prop.value:
                                        break
                                else:
                                    # update property_values with new value
                                    full_property.property_values.append(
                                        PropertyValue(
                                            value=m_prop_option.get("value"),  # type: ignore
                                            color=m_prop_option.get("color"),  # type: ignore
                                        )
                                    )
                                break
                        break
                else:
                    # no pending FullProperty matched (for/else): build a new one
                    property_values = []
                    if a_prop.value is None:
                        # skip creating property if property value is None
                        continue
                    # find property value in m_prop (.v7/metadata.json) options
                    for m_prop_option in m_prop_options:
                        if m_prop_option.get("value") == a_prop.value:
                            property_values.append(
                                PropertyValue(
                                    value=m_prop_option.get("value"),  # type: ignore
                                    color=m_prop_option.get("color"),  # type: ignore
                                )
                            )
                            break
                    # if it doesn't exist, create it
                    full_property = FullProperty(
                        name=a_prop.name,
                        type=m_prop_type,  # type from .v7/metadata.json
                        required=m_prop.required,  # required from .v7/metadata.json
                        description=m_prop.description
                        or "property-created-during-annotation-import",
                        slug=client.default_team,
                        annotation_class_id=int(annotation_class_id),
                        property_values=property_values,
                    )
                    create_properties.append(full_property)
                continue

            # check if property value is different in m_prop (.v7/metadata.json) options
            for m_prop_option in m_prop_options:
                if m_prop_option.get("value") == a_prop.value:
                    break
            else:
                if a_prop.value:
                    raise ValueError(
                        f"Annotation: '{annotation_name}' -> Property '{a_prop.value}' not found in .v7/metadata.json, found: {m_prop.property_values}"
                    )

            # get team property
            t_prop: FullProperty = team_properties_annotation_lookup[
                (a_prop.name, annotation_class_id)
            ]

            if a_prop.value is None:
                # if property value is None, update annotation_property_map with empty set
                assert t_prop.id is not None
                annotation_property_map[annotation_id][str(a_prop.frame_index)][
                    t_prop.id
                ] = set()
                continue

            # check if property value is different in t_prop (team) options
            for t_prop_val in t_prop.property_values or []:
                if t_prop_val.value == a_prop.value:
                    break
            else:
                # if it is, update it
                full_property = FullProperty(
                    id=t_prop.id,
                    name=a_prop.name,
                    type=m_prop_type,
                    required=m_prop.required,
                    description=m_prop.description
                    or "property-updated-during-annotation-import",
                    slug=client.default_team,
                    annotation_class_id=int(annotation_class_id),
                    property_values=[
                        PropertyValue(
                            value=a_prop.value,
                            # NOTE(review): m_prop_option leaks here from the
                            # metadata-options loop above — verify this is the
                            # intended matching option.
                            color=m_prop_option.get("color"),  # type: ignore
                        )
                    ],
                )
                update_properties.append(full_property)
                continue

            assert t_prop.id is not None
            assert t_prop_val.id is not None
            annotation_property_map[annotation_id][str(a_prop.frame_index)][
                t_prop.id
            ].add(t_prop_val.id)

    console = Console(theme=_console_theme())

    # Push the pending creations to the backend.
    created_properties = []
    if create_properties:
        console.print(f"Creating {len(create_properties)} properties", style="info")
        for full_property in create_properties:
            console.print(
                f"Creating property {full_property.name} ({full_property.type})",
                style="info",
            )
            prop = client.create_property(
                team_slug=full_property.slug, params=full_property
            )
            created_properties.append(prop)

    # Push the pending updates to the backend.
    updated_properties = []
    if update_properties:
        console.print(f"Updating {len(update_properties)} properties", style="info")
        for full_property in update_properties:
            console.print(
                f"Updating property {full_property.name} ({full_property.type})",
                style="info",
            )
            prop = client.update_property(
                team_slug=full_property.slug, params=full_property
            )
            updated_properties.append(prop)

    # get latest team properties
    team_properties_annotation_lookup = _get_team_properties_annotation_lookup(client)

    # loop over metadata_cls_id_prop_lookup, and update additional metadata property values
    for (annotation_class_id, prop_name), m_prop in metadata_cls_id_prop_lookup.items():
        # does the annotation-property exist in the team? if not, skip
        if (prop_name, annotation_class_id) not in team_properties_annotation_lookup:
            continue

        # get metadata property values
        m_prop_values = {
            m_prop_val["value"]: m_prop_val
            for m_prop_val in m_prop.property_values or []
            if m_prop_val["value"]
        }

        # get team property
        t_prop: FullProperty = team_properties_annotation_lookup[
            (prop_name, annotation_class_id)
        ]

        # get team property values
        t_prop_values = [prop_val.value for prop_val in t_prop.property_values or []]

        # get diff of metadata property values and team property values
        extra_values = set(m_prop_values.keys()) - set(t_prop_values)

        # if there are extra values in metadata, create a new FullProperty with the extra values
        if extra_values:
            extra_property_values = [
                PropertyValue(
                    value=m_prop_values[extra_value].get("value"),  # type: ignore
                    color=m_prop_values[extra_value].get("color"),  # type: ignore
                )
                for extra_value in extra_values
            ]
            full_property = FullProperty(
                id=t_prop.id,
                name=t_prop.name,
                type=t_prop.type,
                required=t_prop.required,
                description=t_prop.description,
                slug=client.default_team,
                annotation_class_id=t_prop.annotation_class_id,
                property_values=extra_property_values,
            )
            console.print(
                f"Updating property {full_property.name} ({full_property.type}) with extra metadata values {extra_values}",
                style="info",
            )
            prop = client.update_property(
                team_slug=full_property.slug, params=full_property
            )

    # update annotation_property_map with property ids from created_properties & updated_properties
    for annotation_id, _ in annotation_property_map.items():
        if not annotation_id_map.get(annotation_id):
            continue
        annotation = annotation_id_map[annotation_id]

        for a_prop in annotation.properties or []:
            frame_index = str(a_prop.frame_index)

            for prop in created_properties + updated_properties:
                if prop.name == a_prop.name:
                    if a_prop.value is None:
                        if not annotation_property_map[annotation_id][frame_index][
                            prop.id
                        ]:
                            annotation_property_map[annotation_id][frame_index][
                                prop.id
                            ] = set()
                        break

                    # find the property-id and property-value-id in the response
                    for prop_val in prop.property_values or []:
                        if prop_val.value == a_prop.value:
                            annotation_property_map[annotation_id][frame_index][
                                prop.id
                            ].add(prop_val.id)
                            break
                    break

    return annotation_property_map
def import_annotations(  # noqa: C901
    dataset: "RemoteDataset",
    importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]],
    file_paths: List[PathLike],
    append: bool,
    class_prompt: bool = True,
    delete_for_empty: bool = False,
    import_annotators: bool = False,
    import_reviewers: bool = False,
    use_multi_cpu: bool = False,  # Set to False to give time to resolve MP behaviours
    cpu_limit: Optional[int] = None,  # resolved later by _get_multi_cpu_settings
) -> None:
    """
    Imports the given Annotations into the given Dataset.

    Parameters
    ----------
    dataset : RemoteDataset
        Dataset where the Annotations will be imported to.
    importer : Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]]
        Parsing module containing the logic to parse the given Annotation files given in
        ``files_path``. See ``importer/format`` for a list of out of supported parsers.
    file_paths : List[PathLike]
        A list of ``Path``'s or strings containing the Annotations we wish to import.
    append : bool
        If ``True`` appends the given annotations to the datasets. If ``False`` will override them.
        Incompatible with ``delete-for-empty``.
    class_prompt : bool
        If ``False`` classes will be created and added to the datasets without requiring a user's prompt.
    delete_for_empty : bool, default: False
        If ``True`` will use empty annotation files to delete all annotations from the remote file.
        If ``False``, empty annotation files will simply be skipped.
        Only works for V2 datasets.
        Incompatible with ``append``.
    import_annotators : bool, default: False
        If ``True`` it will import the annotators from the files to the dataset, if available.
        If ``False`` it will not import the annotators.
    import_reviewers : bool, default: False
        If ``True`` it will import the reviewers from the files to the dataset, if available.
        If ``False`` it will not import the reviewers.
    use_multi_cpu : bool, default: False
        If ``True`` will use multiple available CPU cores to parse the annotation files.
        If ``False`` will use only the current Python process, which runs in one core.
        Processing using multiple cores is faster, but may slow down a machine also running other processes.
        Processing with one core is slower, but will run well alongside other processes.
    cpu_limit : int, default: 2 less than total cpu count
        The maximum number of CPU cores to use when ``use_multi_cpu`` is ``True``.
        If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores.
        If ``cpu_limit`` is less than 1, it will be set to CPU count - 2.
        If ``cpu_limit`` is omitted, it will be set to CPU count - 2.

    Raises
    -------
    ValueError
        - If ``file_paths`` is not a list.
        - If the application is unable to fetch any remote classes.
        - If the application was unable to find/parse any annotation files.
        - If the application was unable to fetch remote file list.
    IncompatibleOptions
        - If both ``append`` and ``delete_for_empty`` are specified as ``True``.
    """
    console = Console(theme=_console_theme())

    if append and delete_for_empty:
        raise IncompatibleOptions(
            "The options 'append' and 'delete_for_empty' cannot be used together. Use only one of them."
        )

    cpu_limit, use_multi_cpu = _get_multi_cpu_settings(
        cpu_limit, cpu_count(), use_multi_cpu
    )
    if use_multi_cpu:
        console.print(f"Using {cpu_limit} CPUs for parsing...", style="info")
    else:
        console.print("Using 1 CPU for parsing...", style="info")

    if not isinstance(file_paths, list):
        raise ValueError(
            f"file_paths must be a list of 'Path' or 'str'. Current value: {file_paths}"
        )

    console.print("Fetching remote class list...", style="info")
    team_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes(True)
    if not team_classes:
        raise ValueError("Unable to fetch remote class list.")

    # Split the team's classes into those already usable in this dataset
    # (available, or global) and those only defined at team level.
    classes_in_dataset: dt.DictFreeForm = build_main_annotations_lookup_table(
        [
            cls
            for cls in team_classes
            if cls["available"] or cls["name"] in GLOBAL_CLASSES
        ]
    )
    classes_in_team: dt.DictFreeForm = build_main_annotations_lookup_table(
        [
            cls
            for cls in team_classes
            if not cls["available"] and cls["name"] not in GLOBAL_CLASSES
        ]
    )
    attributes = build_attribute_lookup(dataset)

    console.print("Retrieving local annotations ...", style="info")
    local_files = []
    local_files_missing_remotely = []

    # ! Other place we can use multiprocessing - hard to pass in the importer though
    maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = find_and_parse(
        importer, file_paths, console, use_multi_cpu, cpu_limit
    )

    if not maybe_parsed_files:
        raise ValueError("Not able to parse any files.")

    parsed_files: List[AnnotationFile] = flatten_list(list(maybe_parsed_files))

    filenames: List[str] = [
        parsed_file.filename for parsed_file in parsed_files if parsed_file is not None
    ]

    console.print("Fetching remote file list...", style="info")
    # This call will only filter by filename; so can return a superset of matched files across different paths
    # There is logic in this function to then include paths to narrow down to the single correct matching file
    remote_files: Dict[str, Tuple[int, str]] = {}

    # Try to fetch files in large chunks; in case the filenames are too large and exceed the url size
    # retry in smaller chunks
    chunk_size = 100
    while chunk_size > 0:
        try:
            remote_files = get_remote_files(dataset, filenames, chunk_size)
            break
        except RequestEntitySizeExceeded:
            chunk_size -= 8
            if chunk_size <= 0:
                raise ValueError("Unable to fetch remote file list.")

    # Partition local files by whether a matching remote item exists.
    for parsed_file in parsed_files:
        if parsed_file.full_path not in remote_files:
            local_files_missing_remotely.append(parsed_file)
        else:
            local_files.append(parsed_file)

    console.print(
        f"{len(local_files) + len(local_files_missing_remotely)} annotation file(s) found.",
        style="info",
    )
    if local_files_missing_remotely:
        console.print(
            f"{len(local_files_missing_remotely)} file(s) are missing from the dataset",
            style="warning",
        )
        for local_file in local_files_missing_remotely:
            console.print(
                f"\t{local_file.path}: '{local_file.full_path}'", style="warning"
            )

        if class_prompt and not secure_continue_request():
            return

    (
        local_classes_not_in_dataset,
        local_classes_not_in_team,
    ) = _resolve_annotation_classes(
        [
            annotation_class
            for file in local_files
            for annotation_class in file.annotation_classes
        ],
        classes_in_dataset,
        classes_in_team,
    )

    console.print(
        f"{len(local_classes_not_in_team)} classes needs to be created.", style="info"
    )
    console.print(
        f"{len(local_classes_not_in_dataset)} classes needs to be added to {dataset.identifier}",
        style="info",
    )

    # Skeleton classes cannot be created through this flow; abort if any are missing.
    missing_skeletons: List[dt.AnnotationClass] = list(
        filter(_is_skeleton_class, local_classes_not_in_team)
    )
    missing_skeleton_names: str = ", ".join(map(_get_skeleton_name, missing_skeletons))
    if missing_skeletons:
        console.print(
            f"Found missing skeleton classes: {missing_skeleton_names}. Missing Skeleton classes cannot be created. Exiting now.",
            style="error",
        )
        return

    if local_classes_not_in_team:
        console.print("About to create the following classes", style="info")
        for missing_class in local_classes_not_in_team:
            console.print(
                f"\t{missing_class.name}, type: {missing_class.annotation_internal_type or missing_class.annotation_type}",
                style="info",
            )
        if class_prompt and not secure_continue_request():
            return
        for missing_class in local_classes_not_in_team:
            dataset.create_annotation_class(
                missing_class.name,
                missing_class.annotation_internal_type or missing_class.annotation_type,
            )
    if local_classes_not_in_dataset:
        console.print(
            f"About to add the following classes to {dataset.identifier}", style="info"
        )
        for cls in local_classes_not_in_dataset:
            dataset.add_annotation_class(cls)

    # Refetch classes to update mappings
    if local_classes_not_in_team or local_classes_not_in_dataset:
        maybe_remote_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes()
        if not maybe_remote_classes:
            raise ValueError("Unable to fetch remote classes.")
        remote_classes = build_main_annotations_lookup_table(maybe_remote_classes)
    else:
        remote_classes = build_main_annotations_lookup_table(team_classes)

    if delete_for_empty:
        console.print(
            "Importing annotations...\nEmpty annotation file(s) will clear all existing annotations in matching remote files.",
            style="info",
        )
    else:
        console.print(
            "Importing annotations...\nEmpty annotations will be skipped, if you want to delete annotations rerun with '--delete-for-empty'.",
            style="info",
        )

    # Need to re parse the files since we didn't save the annotations in memory
    for local_path in set(local_file.path for local_file in local_files):  # noqa: C401
        imported_files: Union[
            List[dt.AnnotationFile], dt.AnnotationFile, None
        ] = importer(local_path)
        if imported_files is None:
            parsed_files = []
        elif not isinstance(imported_files, List):
            parsed_files = [imported_files]
        else:
            parsed_files = imported_files

        # remove files missing on the server
        missing_files = [
            missing_file.full_path for missing_file in local_files_missing_remotely
        ]
        parsed_files = [
            parsed_file
            for parsed_file in parsed_files
            if parsed_file.full_path not in missing_files
        ]

        files_to_not_track = [
            file_to_track
            for file_to_track in parsed_files
            if not file_to_track.annotations and (not delete_for_empty)
        ]

        for file in files_to_not_track:
            console.print(
                f"{file.filename} has no annotations. Skipping upload...",
                style="warning",
            )

        files_to_track = [
            file for file in parsed_files if file not in files_to_not_track
        ]
        if files_to_track:
            _warn_unsupported_annotations(files_to_track)
            for parsed_file in track(files_to_track):
                image_id, default_slot_name = remote_files[parsed_file.full_path]
                # We need to check if name is not-None as Darwin JSON 1.0
                # defaults to name=None
                if parsed_file.slots and parsed_file.slots[0].name:
                    default_slot_name = parsed_file.slots[0].name
                metadata_path = is_properties_enabled(parsed_file.path)
                # Per-file upload is delegated to the module's private
                # _import_annotations helper (defined elsewhere in this file).
                errors, _ = _import_annotations(
                    dataset.client,
                    image_id,
                    remote_classes,
                    attributes,
                    parsed_file.annotations,  # type: ignore
                    default_slot_name,
                    dataset,
                    append,
                    delete_for_empty,
                    import_annotators,
                    import_reviewers,
                    metadata_path,
                )
                if errors:
                    console.print(
                        f"Errors importing {parsed_file.filename}", style="error"
                    )
                    for error in errors:
                        console.print(f"\t{error}", style="error")
def _get_multi_cpu_settings(
cpu_limit: Optional[int], cpu_count: int, use_multi_cpu: bool
) -> Tuple[int, bool]:
if cpu_limit == 1 or cpu_count == 1 or not use_multi_cpu:
return 1, False
if cpu_limit is None:
return max([cpu_count - 2, 2]), True
return cpu_limit if cpu_limit <= cpu_count else cpu_count, True
def _warn_unsupported_annotations(parsed_files: List[AnnotationFile]) -> None:
    """Print a console warning for annotations whose class type cannot be imported.

    Annotations whose class type appears in UNSUPPORTED_CLASSES are counted per
    file and reported; the importer skips them.
    """
    console = Console(theme=_console_theme())
    for parsed_file in parsed_files:
        skipped_annotations = [
            annotation
            for annotation in parsed_file.annotations
            if annotation.annotation_class.annotation_type in UNSUPPORTED_CLASSES
        ]
        if skipped_annotations:
            types = {
                c.annotation_class.annotation_type for c in skipped_annotations
            }  # noqa: C417
            console.print(
                f"Import of annotation class types '{', '.join(types)}' is not yet supported. Skipping {len(skipped_annotations)} "
                # BUG FIX: this literal was missing its f-prefix, so the text
                # "{parsed_file.full_path}" was printed verbatim.
                + f"annotations from '{parsed_file.full_path}'.\n",
                style="warning",
            )
def _is_skeleton_class(the_class: dt.AnnotationClass) -> bool:
return (
the_class.annotation_internal_type or the_class.annotation_type
) == "skeleton"