import itertools
import multiprocessing as mp
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Dict, Generator, Iterator, List, Optional, Set, Tuple, Union
import numpy as np
from PIL import Image as PILImage
from rich.live import Live
from rich.progress import ProgressBar, track
import darwin.datatypes as dt
from darwin.datatypes import PathLike
from darwin.exceptions import NotFound
from darwin.importer.formats.darwin import parse_path
from darwin.utils import (
SUPPORTED_EXTENSIONS,
SUPPORTED_IMAGE_EXTENSIONS,
SUPPORTED_VIDEO_EXTENSIONS,
attempt_decode,
get_image_path_from_stream,
is_unix_like_os,
parse_darwin_json,
)
from darwin.utils.utils import stream_darwin_json
# E.g.: {"partition" => {"class_name" => 123}}
AnnotationDistribution = Dict[str, Counter]
def get_release_path(dataset_path: Path, release_name: Optional[str] = None) -> Path:
"""
Given a dataset path and a release name, returns the path to the release.
Parameters
----------
dataset_path : Path
Path to the location of the dataset on the file system.
release_name : Optional[str], default: None
Version of the dataset.
Returns
-------
Path
Path to the location of the dataset release on the file system.
Raises
------
NotFound
If no dataset is found in the location provided by ``dataset_path``.
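    Examples
    --------
    A minimal sketch, assuming a hypothetical dataset pulled to ``./my_dataset``:
    >>> get_release_path(Path("./my_dataset"))  # doctest: +SKIP
    PosixPath('my_dataset/releases/latest')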
"""
assert dataset_path is not None
if not release_name:
release_name = "latest"
release_path: Path = dataset_path / "releases" / release_name
if not release_path.exists():
raise NotFound(
f"Local copy of release {release_name} not found: "
f"Pull this release from Darwin using 'darwin dataset pull {dataset_path.name}:{release_name}' "
f"or use a different release."
)
return release_path
def extract_classes(
annotations_path: Path, annotation_type: Union[str, List[str]]
) -> Tuple[Dict[str, Set[int]], Dict[int, Set[str]]]:
"""
    Given the GT as JSON files, extracts all classes and maps image indices to classes.
    Parameters
    ----------
    annotations_path : Path
        Path to the JSON files with the GT information of each image.
annotation_type : Union[str, List[str]]
Type(s) of annotation to use to extract the GT information.
Returns
-------
Tuple[Dict[str, Set[int]], Dict[int, Set[str]]]
        A tuple whose first element is a dictionary mapping each class found in the
        GT to the set of file indices that contain it, and whose second element is a
        dictionary mapping each image index to the set of classes contained in that image.
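    Examples
    --------
    A minimal sketch; the release path and class names below are hypothetical:
    >>> classes, indices_to_classes = extract_classes(
    ...     Path("./releases/latest/annotations"), "polygon"
    ... )  # doctest: +SKIP
    >>> dict(classes)  # doctest: +SKIP
    {'dog': {0, 2}, 'cat': {1}}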
"""
if isinstance(annotation_type, str):
annotation_types_to_load = [annotation_type]
else:
annotation_types_to_load = annotation_type
for atype in annotation_types_to_load:
assert atype in ["bounding_box", "polygon", "tag"]
classes: Dict[str, Set[int]] = defaultdict(set)
indices_to_classes: Dict[int, Set[str]] = defaultdict(set)
for i, file_name in enumerate(sorted(annotations_path.glob("**/*.json"))):
annotation_file = parse_path(file_name)
if not annotation_file:
continue
for annotation in annotation_file.annotations:
if (
annotation.annotation_class.annotation_type
not in annotation_types_to_load
):
continue
class_name = annotation.annotation_class.name
indices_to_classes[i].add(class_name)
classes[class_name].add(i)
return classes, indices_to_classes
def make_class_lists(release_path: Path) -> None:
"""
Support function to extract classes and save the output to file.
Parameters
----------
release_path : Path
Path to the location of the dataset on the file system.
"""
assert release_path is not None
if isinstance(release_path, str):
release_path = Path(release_path)
annotations_path = release_path / "annotations"
assert annotations_path.exists()
lists_path = release_path / "lists"
lists_path.mkdir(exist_ok=True)
for annotation_type in ["tag", "polygon", "bounding_box"]:
fname = lists_path / f"classes_{annotation_type}.txt"
classes, _ = extract_classes(annotations_path, annotation_type=annotation_type)
classes_names = list(classes.keys())
if len(classes_names) > 0:
classes_names.sort()
with open(str(fname), "w") as f:
f.write("\n".join(classes_names))
def get_classes_from_file(path: Path) -> List[str]:
"""Helper function to read class names from a file."""
if path.exists():
return path.read_text().splitlines()
return []
def available_annotation_types(release_path: Path) -> List[str]:
"""Returns a list of available annotation types based on the existing files."""
files = [p.name for p in release_path.glob("lists/classes_*.txt")]
return [f[len("classes_") : -len(".txt")] for f in files]
def get_classes(
dataset_path: PathLike,
release_name: Optional[str] = None,
annotation_type: Union[str, List[str]] = "polygon",
remove_background: bool = True,
) -> List[str]:
"""
Given a dataset and an annotation_type returns the list of classes.
Parameters
----------
dataset_path : PathLike
Path to the location of the dataset on the file system.
release_name : Optional[str], default: None
Version of the dataset.
    annotation_type : Union[str, List[str]], default: "polygon"
        The type(s) of annotation classes ``[tag, polygon, bounding_box]``.
    remove_background : bool, default: True
        Removes the background class (if it exists) from the list of classes.
Returns
-------
List[str]
        List of classes in the dataset for the given annotation type(s).
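    Examples
    --------
    A minimal sketch with a hypothetical dataset path and classes:
    >>> get_classes("./my_dataset", annotation_type="polygon")  # doctest: +SKIP
    ['cat', 'dog']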
"""
assert dataset_path is not None
dataset_path = Path(dataset_path)
release_path = get_release_path(dataset_path, release_name)
if isinstance(annotation_type, str):
annotation_types_to_load = [annotation_type]
else:
annotation_types_to_load = annotation_type
classes = [] # Use a list to maintain order
for atype in annotation_types_to_load:
classes_file_path = release_path / f"lists/classes_{atype}.txt"
class_per_annotations = get_classes_from_file(classes_file_path)
if (
remove_background
and class_per_annotations
and class_per_annotations[0] == "__background__"
):
class_per_annotations = class_per_annotations[1:]
for cls in class_per_annotations:
if cls not in classes: # Only add if it's not already in the list
classes.append(cls)
available_types = available_annotation_types(release_path)
assert (
len(classes) > 0
), f"No classes found for {annotation_type}. Supported types are: {', '.join(available_types)}"
return classes
def _f(x: Any) -> Any:
"""Support function for ``pool.map()`` in ``_exhaust_generator()``."""
if callable(x):
return x()
return x
def exhaust_generator(
progress: Generator,
count: int,
multi_processed: bool,
worker_count: Optional[int] = None,
) -> Tuple[List[Dict[str, Any]], List[Exception]]:
"""
Exhausts the generator passed as parameter. Can be done multi processed if desired.
Creates and returns a coco record from the given annotation.
Uses ``BoxMode.XYXY_ABS`` from ``detectron2.structures`` if available, defaults to ``box_mode = 0``
otherwise.
Parameters
----------
annotation_path : Path
``Path`` to the annotation file.
annotation_type : str = "polygon"
Type of the annotation we want to retrieve.
image_path : Optional[Path], default: None
``Path`` to the image the annotation refers to.
image_id : Optional[Union[str, int]], default: None
Id of the image the annotation refers to.
classes : Optional[List[str]], default: None
Classes of the annotation.
Returns
-------
Dict[str, Any]
A coco record with the following keys:
.. code-block:: python
{
"height": 100,
"width": 100,
"file_name": "a file name",
"image_id": 1,
"annotations": [ ... ]
}
"""
successes = []
errors = []
if multi_processed:
progress_bar: ProgressBar = ProgressBar(total=count)
responses = []
def update(*a):
progress_bar.completed += 1
if worker_count is None:
worker_count = mp.cpu_count()
with Live(progress_bar):
with mp.Pool(worker_count) as pool:
for f in progress:
responses.append(pool.apply_async(_f, args=(f,), callback=update))
pool.close()
pool.join()
for response in responses:
try:
successes.append(response.get())
except Exception as e:
errors.append(e)
else:
for f in track(progress, total=count, description="Progress"):
try:
successes.append(_f(f))
except Exception as e:
errors.append(e)
return successes, errors
def get_coco_format_record(
annotation_path: Path,
annotation_type: str = "polygon",
image_path: Optional[Path] = None,
image_id: Optional[Union[str, int]] = None,
classes: Optional[List[str]] = None,
) -> Dict[str, Any]:
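    """
    Creates and returns a coco record from the given annotation.
    Uses ``BoxMode.XYXY_ABS`` from ``detectron2.structures`` if available, defaults to
    ``box_mode = 0`` otherwise.
    Parameters
    ----------
    annotation_path : Path
        ``Path`` to the annotation file.
    annotation_type : str, default: "polygon"
        Type of the annotation we want to retrieve.
    image_path : Optional[Path], default: None
        ``Path`` to the image the annotation refers to.
    image_id : Optional[Union[str, int]], default: None
        Id of the image the annotation refers to.
    classes : Optional[List[str]], default: None
        Classes of the annotation.
    Returns
    -------
    Dict[str, Any]
        A coco record with the following keys:
        .. code-block:: python
            {
                "height": 100,
                "width": 100,
                "file_name": "a file name",
                "image_id": 1,
                "annotations": [ ... ]
            }
    """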
assert annotation_type in ["tag", "polygon", "bounding_box"]
try:
from detectron2.structures import BoxMode
box_mode = BoxMode.XYXY_ABS
except ImportError:
box_mode = 0
data = parse_darwin_json(annotation_path)
record: Dict[str, Any] = {}
if image_path is not None:
record["file_name"] = str(image_path)
if image_id is not None:
record["image_id"] = image_id
record["height"] = data.image_height
record["width"] = data.image_width
objs = []
for obj in data.annotations:
if annotation_type != obj.annotation_class.annotation_type:
if (
annotation_type not in obj.data
): # Allows training object detection with bboxes
continue
if annotation_type == "polygon":
new_obj = create_polygon_object(obj, box_mode, classes)
elif annotation_type == "bounding_box":
new_obj = create_bbox_object(obj, box_mode, classes)
else:
continue
objs.append(new_obj)
record["annotations"] = objs
return record
def create_polygon_object(obj, box_mode, classes=None):
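    """
    Builds a COCO-style object (segmentation, XYXY bounding box, category) from a
    Darwin polygon annotation. Raises ``ValueError`` when no polygon path is found.
    """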
if "paths" in obj.data:
paths = obj.data["paths"]
elif "path" in obj.data:
paths = [obj.data["path"]]
else:
raise ValueError("polygon path not found")
all_px, all_py = [], []
segmentation = []
for path in paths:
if len(path) < 3:
continue
px, py = [], []
for point in path:
px.append(point["x"])
py.append(point["y"])
poly = list(zip(px, py))
segmentation.append(list(itertools.chain.from_iterable(poly)))
all_px.extend(px)
all_py.extend(py)
new_obj = {
"segmentation": segmentation,
"bbox": [np.min(all_px), np.min(all_py), np.max(all_px), np.max(all_py)],
"bbox_mode": box_mode,
"category_id": (
classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name
),
"iscrowd": 0,
}
return new_obj
def create_bbox_object(obj, box_mode, classes=None):
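    """
    Builds a COCO-style object from a Darwin bounding-box annotation, converting the
    ``x, y, w, h`` box to absolute ``[x1, y1, x2, y2]`` coordinates.
    """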
bbox = obj.data["bounding_box"]
new_obj = {
"bbox": [bbox["x"], bbox["y"], bbox["x"] + bbox["w"], bbox["y"] + bbox["h"]],
"bbox_mode": box_mode,
"category_id": (
classes.index(obj.annotation_class.name)
if classes
else obj.annotation_class.name
),
"iscrowd": 0,
}
return new_obj
def get_annotations(
dataset_path: PathLike,
partition: Optional[str] = None,
split: Optional[str] = "default",
split_type: Optional[str] = None,
annotation_type: str = "polygon",
release_name: Optional[str] = None,
annotation_format: Optional[str] = "coco",
ignore_inconsistent_examples: bool = False,
) -> Iterator[Dict[str, Any]]:
"""
    Yields the annotations of a given dataset and split, one dictionary per example.
Parameters
----------
dataset_path : PathLike
Path to the location of the dataset on the file system.
partition : Optional[str], default: None
Selects one of the partitions ``[train, val, test]``.
split : Optional[str], default: "default"
Selects the split that defines the percentages used (use 'default' to select the default split).
split_type : Optional[str], default: None
Heuristic used to do the split ``[random, stratified, None]``.
annotation_type : str, default: "polygon"
The type of annotation classes ``[tag, bounding_box, polygon]``.
release_name : Optional[str], default: None
Version of the dataset.
annotation_format : Optional[str], default: "coco"
Re-formatting of the annotation when loaded ``[coco, darwin]``.
ignore_inconsistent_examples : bool, default: False
        Ignore examples for which we have annotations but the image is missing,
        or for which more than one image exists for the same annotation.
        If set to ``True``, those examples are filtered out of the dataset.
        If set to ``False``, an error is raised as soon as such an example is found.
Returns
-------
Iterator[Dict[str, Any]]
        An iterator over the annotations of the dataset, one dictionary per example.
Raises
------
ValueError
- If the ``partition`` given is not valid.
- If the ``split_type`` given is not valid.
- If the ``annotation_type`` given is not valid.
- If an annotation has no corresponding image.
- If an image is present with multiple extensions.
FileNotFoundError
If no dataset in ``dataset_path`` is found.
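    Examples
    --------
    A minimal sketch with a hypothetical dataset path:
    >>> for record in get_annotations("./my_dataset", partition="train"):  # doctest: +SKIP
    ...     print(record["file_name"], len(record["annotations"]))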
"""
assert dataset_path is not None
dataset_path = Path(dataset_path)
release_path: Path = get_release_path(dataset_path, release_name)
annotations_dir = release_path / "annotations"
assert annotations_dir.exists()
images_dir = dataset_path / "images"
assert images_dir.exists()
_validate_inputs(partition, split_type, annotation_type)
classes = get_classes(
dataset_path,
release_name,
annotation_type=annotation_type,
remove_background=True,
)
if partition:
stems = _get_stems_from_split(
release_path, split, split_type, annotation_type, partition
)
else:
stems = (e.stem for e in annotations_dir.glob("**/*.json"))
(
images_paths,
annotations_paths,
invalid_annotation_paths,
) = _map_annotations_to_images(
stems, annotations_dir, images_dir, ignore_inconsistent_examples
)
print(f"Found {len(invalid_annotation_paths)} invalid annotations")
for p in invalid_annotation_paths:
print(p)
if len(images_paths) == 0:
raise ValueError(
f"Could not find any {SUPPORTED_EXTENSIONS} file"
f" in {dataset_path / 'images'}"
)
assert len(images_paths) == len(annotations_paths)
yield from _load_and_format_annotations(
images_paths, annotations_paths, annotation_format, annotation_type, classes
)
def _validate_inputs(partition, split_type, annotation_type):
"""
Validates the input parameters for partition, split_type, and annotation_type.
Args:
partition (str, None): Dataset partition. Should be 'train', 'val', 'test' or None.
split_type (str, None): Type of dataset split. Can be 'random', 'stratified' or None.
annotation_type (str): Type of annotations. Can be 'tag', 'polygon', or 'bounding_box'.
Raises:
ValueError: If the input parameters do not match the expected values.
"""
if partition not in ["train", "val", "test", None]:
raise ValueError("partition should be either 'train', 'val', 'test', or None")
if split_type not in ["random", "stratified", None]:
raise ValueError("split_type should be either 'random', 'stratified', or None")
if annotation_type not in ["tag", "polygon", "bounding_box"]:
raise ValueError(
"annotation_type should be either 'tag', 'bounding_box', or 'polygon'"
)
def _get_stems_from_split(release_path, split, split_type, annotation_type, partition):
"""
Determines the file stems based on the dataset split and other parameters.
Args:
release_path (Path): Path to the dataset release.
split (str): Dataset split identifier.
split_type (str, None): Type of dataset split. Can be 'random', 'stratified' or None.
annotation_type (str): Type of annotations. Can be 'tag', 'polygon', or 'bounding_box'.
partition (str, None): Dataset partition. Should be 'train', 'val', 'test' or None.
Returns:
Generator[str]: File stems for the dataset.
Raises:
ValueError: If the split_type is invalid.
FileNotFoundError: If the dataset partition file is not found.
"""
if split_type is None:
split_file = f"{partition}.txt"
elif split_type == "random":
split_file = f"{split_type}_{partition}.txt"
elif split_type == "stratified":
split_file = f"{split_type}_{annotation_type}_{partition}.txt"
else:
raise ValueError(f"Invalid split_type ({split_type})")
split_path: Path = release_path / "lists" / str(split) / split_file
if split_path.is_file():
return (e.rstrip("\n\r") for e in split_path.open())
else:
        raise FileNotFoundError(
            "Could not find a dataset partition. "
            "To split the dataset you can use 'split_dataset' from darwin.dataset.split_manager"
        )
def _map_annotations_to_images(
stems, annotations_dir, images_dir, ignore_inconsistent_examples
):
"""
Maps annotations to their corresponding images based on the file stems.
Args:
stems (List[str]): List of file stems.
annotations_dir (Path): Directory containing annotation files.
images_dir (Path): Directory containing image files.
ignore_inconsistent_examples (bool): Flag to determine if inconsistent examples should be ignored.
Returns:
Tuple[List[Path], List[Path], List[Path]]: Lists of paths for images, annotations, and invalid annotations respectively.
Raises:
ValueError: If there are inconsistencies with the annotations and images.
"""
    images_paths = []
    annotations_paths = []
    invalid_annotation_paths = []
    # Restrict the mapping to the requested stems (e.g. the selected partition).
    # Split lists may contain either plain stems or stems relative to the
    # annotations directory, so both forms are matched.
    stems = set(stems)
    for annotation_path in annotations_dir.glob("**/*.json"):
        relative_stem = str(annotation_path.relative_to(annotations_dir).with_suffix(""))
        if annotation_path.stem not in stems and relative_stem not in stems:
            continue
        darwin_json = stream_darwin_json(annotation_path)
        image_path = get_image_path_from_stream(darwin_json, images_dir)
        if image_path.exists():
            images_paths.append(image_path)
            annotations_paths.append(annotation_path)
        elif ignore_inconsistent_examples:
            invalid_annotation_paths.append(annotation_path)
        else:
            raise ValueError(
                f"Annotation ({annotation_path}) does not have a corresponding image"
            )
    return images_paths, annotations_paths, invalid_annotation_paths
def _load_and_format_annotations(
images_paths, annotations_paths, annotation_format, annotation_type, classes
):
"""
Loads and formats annotations based on the specified format and type.
Args:
images_paths (List[Path]): List of paths to image files.
annotations_paths (List[Path]): List of paths to annotation files.
annotation_format (str): Desired output format for annotations. Can be 'coco' or 'darwin'.
annotation_type (str): Type of annotations. Can be 'tag', 'polygon', or 'bounding_box'.
classes (List[str]): List of class names.
Yields:
Dict: Formatted annotation record.
Notes:
- If the annotation format is 'coco', video annotations cannot be loaded and will be skipped.
"""
if annotation_format == "coco":
images_ids = list(range(len(images_paths)))
for annotation_path, image_path, image_id in zip(
annotations_paths, images_paths, images_ids
):
if image_path.suffix.lower() in SUPPORTED_VIDEO_EXTENSIONS:
print(
f"[WARNING] Cannot load video annotation into COCO format. Skipping {image_path}"
)
continue
yield get_coco_format_record(
annotation_path=annotation_path,
annotation_type=annotation_type,
image_path=image_path,
image_id=image_id,
classes=classes,
)
elif annotation_format == "darwin":
for annotation_path in annotations_paths:
record = attempt_decode(annotation_path)
yield record
def load_pil_image(path: Path, to_rgb: Optional[bool] = True) -> PILImage.Image:
"""
Loads a PIL image and converts it into RGB (optional).
Parameters
----------
path : Path
Path to the image file.
to_rgb : Optional[bool], default: True
Converts the image to RGB.
Returns
-------
PILImage.Image
The loaded image.
"""
pic = PILImage.open(path)
if to_rgb:
pic = convert_to_rgb(pic)
return pic
def convert_to_rgb(pic: PILImage.Image) -> PILImage.Image:
"""
Converts a PIL image to RGB.
Parameters
----------
pic : PILImage.Image
The image to convert.
Returns
-------
    PILImage.Image
        The image converted to RGB, with values between 0 and 255.
Raises
------
TypeError
If the image given via ``pic`` has an unsupported type.
"""
if pic.mode == "RGB":
pass
elif pic.mode in ("CMYK", "RGBA", "P"):
pic = pic.convert("RGB")
elif pic.mode == "I":
img = (np.divide(np.array(pic, np.int32), 2**16 - 1) * 255).astype(np.uint8)
pic = PILImage.fromarray(np.stack((img, img, img), axis=2))
elif pic.mode == "I;16":
img = (np.divide(np.array(pic, np.int16), 2**8 - 1) * 255).astype(np.uint8)
pic = PILImage.fromarray(np.stack((img, img, img), axis=2))
elif pic.mode == "L":
img = np.array(pic).astype(np.uint8)
pic = PILImage.fromarray(np.stack((img, img, img), axis=2))
elif pic.mode == "1":
pic = pic.convert("L")
img = np.array(pic).astype(np.uint8)
pic = PILImage.fromarray(np.stack((img, img, img), axis=2))
else:
raise TypeError(f"unsupported image type {pic.mode}")
return pic
def compute_max_density(annotations_dir: Path) -> int:
"""
Calculates the maximum density of all of the annotations in the given folder.
Density is calculated as the number of polygons / complex_polygons present in an annotation
file.
Parameters
----------
annotations_dir : Path
Directory where the annotations are present.
Returns
-------
int
The maximum density.
"""
max_density = 0
for annotation_path in annotations_dir.glob("**/*.json"):
annotation_density = 0
darwin_json = parse_darwin_json(annotation_path)
for annotation in darwin_json.annotations:
if "path" not in annotation.data and "paths" not in annotation.data:
continue
annotation_density += 1
if annotation_density > max_density:
max_density = annotation_density
return max_density
def compute_distributions(
annotations_dir: Path,
split_path: Path,
partitions: List[str] = ["train", "val", "test"],
annotation_types: List[str] = ["polygon"],
) -> Dict[str, AnnotationDistribution]:
"""
Builds and returns the following dictionaries:
- class_distribution: count of all files where at least one instance of a given class exists for each partition
    - instance_distribution: count of all instances of a given class that exist in each partition
    Note that this function is meant to be used after a dataset has been split with the "stratified"
    strategy; it falls back to the "random" split files when the stratified ones are missing.
Parameters
----------
annotations_dir : Path
Directory where the annotations are.
split_path : Path
Path to the split.
partitions : List[str], default: ["train", "val", "test"]
Partitions to use.
annotation_types : List[str], default: ["polygon"]
Annotation types to consider.
Returns
-------
Dict[str, AnnotationDistribution]
- class_distribution: count of all files where at least one instance of a given class exists for each partition
        - instance_distribution: count of all instances of a given class that exist in each partition
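        For example (hypothetical class names and counts)::
            {
                "class": {"train": Counter({"dog": 4, "cat": 2}), ...},
                "instance": {"train": Counter({"dog": 9, "cat": 2}), ...},
            }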
"""
class_distribution: AnnotationDistribution = {
partition: Counter() for partition in partitions
}
instance_distribution: AnnotationDistribution = {
partition: Counter() for partition in partitions
}
for partition in partitions:
for annotation_type in annotation_types:
split_file: Path = (
split_path / f"stratified_{annotation_type}_{partition}.txt"
)
if not split_file.exists():
split_file = split_path / f"random_{partition}.txt"
stems: List[str] = [e.rstrip("\n\r") for e in split_file.open()]
for stem in stems:
annotation_path: Path = annotations_dir / f"{stem}.json"
annotation_file: Optional[dt.AnnotationFile] = parse_path(
annotation_path
)
if annotation_file is None:
continue
annotation_class_names: List[str] = [
annotation.annotation_class.name
for annotation in annotation_file.annotations
]
class_distribution[partition] += Counter(set(annotation_class_names))
instance_distribution[partition] += Counter(annotation_class_names)
return {"class": class_distribution, "instance": instance_distribution}
# https://github.com/python/cpython/blob/main/Lib/pathlib.py#L812
# TODO implemented here because it's not supported in Python < 3.9
def is_relative_to(path: Path, *other) -> bool:
"""
Returns ``True`` if the path is relative to another path or ``False`` otherwise.
It also returns ``False`` in the event of an exception, making ``False`` the default value.
Parameters
----------
path : Path
The path to evaluate.
other : Path
The other path to compare against.
Returns
--------
bool
``True`` if the path is relative to ``other`` or ``False`` otherwise.
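    Examples
    --------
    For example:
    >>> is_relative_to(Path("/etc/passwd"), Path("/etc"))
    True
    >>> is_relative_to(Path("/etc/passwd"), Path("/usr"))
    False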
"""
try:
path.relative_to(*other)
return True
except ValueError:
return False
def sanitize_filename(filename: str) -> str:
"""
    Sanitizes the given filename, removing/replacing forbidden characters.
Parameters
----------
filename : str
The filename to sanitize.
Returns
-------
str
The sanitized filename.
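    Examples
    --------
    For example (``:`` is additionally replaced on non-Unix systems):
    >>> sanitize_filename('a<file>name?.png')
    'a_file_name_.png'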
"""
chars = ["<", ">", '"', "/", "\\", "|", "?", "*"]
if not is_unix_like_os():
chars.append(":")
for char in chars:
filename = filename.replace(char, "_")
return filename
def get_external_file_type(storage_key: str) -> Optional[str]:
"""
Returns the type of file given a storage key.
Parameters
----------
storage_key : str
The storage key to get the type of file from.
Returns
-------
Optional[str]
The type of file, or ``None`` if the file type is not supported.
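    Examples
    --------
    For example:
    >>> get_external_file_type("scans/brain.dcm")
    'dicom'
    >>> get_external_file_type("docs/report.pdf")
    'pdf'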
"""
for extension in SUPPORTED_IMAGE_EXTENSIONS:
if storage_key.endswith(extension):
return "image"
if storage_key.endswith(".pdf"):
return "pdf"
if storage_key.endswith(".dcm"):
return "dicom"
for extension in SUPPORTED_VIDEO_EXTENSIONS:
if storage_key.endswith(extension):
return "video"
return None
def parse_external_file_path(storage_key: str, preserve_folders: bool) -> str:
"""
Returns the Darwin dataset path given a storage key.
Parameters
----------
storage_key : str
The storage key to parse.
preserve_folders : bool
Whether to preserve folders or place the file in the Dataset root.
Returns
-------
str
The parsed external file path.
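    Examples
    --------
    For example:
    >>> parse_external_file_path("folder/subfolder/file.png", preserve_folders=True)
    '/folder/subfolder'
    >>> parse_external_file_path("folder/subfolder/file.png", preserve_folders=False)
    '/'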
"""
if not preserve_folders:
return "/"
return "/" + "/".join(storage_key.split("/")[:-1])
def get_external_file_name(storage_key: str) -> str:
"""
Returns the name of the file given a storage key.
Parameters
----------
storage_key : str
The storage key to get the file name from.
Returns
-------
str
The name of the file.
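    Examples
    --------
    For example:
    >>> get_external_file_name("folder/subfolder/file.png")
    'file.png'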
"""
if "/" not in storage_key:
return storage_key
return storage_key.split("/")[-1]
def chunk_items(items: List[Any], chunk_size: int = 500) -> Iterator[List[Any]]:
"""
Splits the list of items into chunks of specified size.
Parameters
----------
items : List[Any]
The list of items to split.
chunk_size : int, default: 500
The size of each chunk.
Returns
-------
Iterator[List[Any]]
        An iterator that yields lists of items, each of length at most ``chunk_size``.
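    Examples
    --------
    For example:
    >>> list(chunk_items([1, 2, 3, 4, 5], chunk_size=2))
    [[1, 2], [3, 4], [5]]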
"""
return (items[i : i + chunk_size] for i in range(0, len(items), chunk_size))