# type: ignore
import os
import uuid
import json
import posixpath
from logging import warning
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Set
from functools import partial
import pathlib
import numpy as np
from time import time, sleep
from jwt import DecodeError
from tqdm import tqdm
import deeplake
from deeplake.client.config import DEEPLAKE_AUTH_TOKEN
from deeplake.core import index_maintenance
from deeplake.core.index.index import IndexEntry
from deeplake.core.link_creds import LinkCreds
from deeplake.core.sample import Sample
from deeplake.core.linked_sample import LinkedSample
from deeplake.util.connect_dataset import connect_dataset_entry
from deeplake.util.downsample import validate_downsampling
from deeplake.util.version_control import (
save_version_info,
integrity_check,
save_commit_info,
rebuild_version_info,
_squash_main,
)
from deeplake.util.invalid_view_op import invalid_view_op
from deeplake.util.spinner import spinner
from deeplake.util.iteration_warning import (
suppress_iteration_warning,
check_if_iteration,
)
from deeplake.util.tensor_db import parse_runtime_parameters
from deeplake.api.info import load_info
from deeplake.client.log import logger
from deeplake.client.client import DeepLakeBackendClient
from deeplake.constants import (
FIRST_COMMIT_ID,
DEFAULT_MEMORY_CACHE_SIZE,
DEFAULT_LOCAL_CACHE_SIZE,
MB,
SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
DEFAULT_READONLY,
ENV_HUB_DEV_USERNAME,
QUERY_MESSAGE_MAX_SIZE,
_INDEX_OPERATION_MAPPING,
)
from deeplake.core.fast_forwarding import ffw_dataset_meta
from deeplake.core.index import Index
from deeplake.core.lock import lock_dataset, unlock_dataset, Lock
from deeplake.core.meta.dataset_meta import DatasetMeta
from deeplake.core.storage import (
LRUCache,
S3Provider,
GCSProvider,
MemoryProvider,
LocalProvider,
AzureProvider,
)
from deeplake.core.tensor import Tensor, create_tensor, delete_tensor
from deeplake.core.version_control.commit_node import CommitNode # type: ignore
from deeplake.core.version_control.dataset_diff import load_dataset_diff
from deeplake.htype import (
HTYPE_CONFIGURATIONS,
UNSPECIFIED,
verify_htype_key_value,
)
from deeplake.compression import COMPRESSION_ALIASES
from deeplake.integrations import dataset_to_tensorflow
from deeplake.util.bugout_reporter import deeplake_reporter, feature_report_path
from deeplake.util.dataset import try_flushing
from deeplake.util.cache_chain import generate_chain
from deeplake.util.hash import hash_inputs
from deeplake.util.htype import parse_complex_htype
from deeplake.util.link import save_link_creds
from deeplake.util.merge import merge
from deeplake.util.notebook import is_colab
from deeplake.util.path import (
convert_pathlib_to_string_if_needed,
)
from deeplake.util.scheduling import create_random_split_views
from deeplake.util.logging import log_visualizer_link
from deeplake.util.warnings import always_warn
from deeplake.util.exceptions import (
CouldNotCreateNewDatasetException,
InvalidKeyTypeError,
MemoryDatasetCanNotBePickledError,
PathNotEmptyException,
TensorAlreadyExistsError,
TensorDoesNotExistError,
TensorGroupDoesNotExistError,
InvalidTensorNameError,
InvalidTensorGroupNameError,
LockedException,
TensorGroupAlreadyExistsError,
ReadOnlyModeError,
RenameError,
EmptyCommitError,
DatasetViewSavingError,
SampleAppendingError,
DatasetTooLargeToDelete,
TensorTooLargeToDelete,
GroupInfoNotSupportedError,
TokenPermissionError,
CheckoutError,
DatasetCorruptError,
BadRequestException,
SampleAppendError,
SampleExtendError,
DatasetHandlerError,
)
from deeplake.util.keys import (
dataset_exists,
get_dataset_info_key,
get_dataset_meta_key,
tensor_exists,
get_queries_key,
get_queries_lock_key,
get_sample_id_tensor_key,
get_sample_info_tensor_key,
get_sample_shape_tensor_key,
get_downsampled_tensor_key,
filter_name,
get_dataset_linked_creds_key,
get_tensor_meta_key,
get_chunk_id_encoder_key,
get_tensor_tile_encoder_key,
get_creds_encoder_key,
)
from deeplake.util.path import get_path_from_storage, relpath
from deeplake.util.remove_cache import get_base_storage
from deeplake.util.diff import get_all_changes_string, get_changes_and_messages
from deeplake.util.version_control import (
auto_checkout,
checkout,
delete_branch,
commit,
current_commit_has_change,
load_meta,
warn_node_checkout,
load_version_info,
replace_head,
reset_and_checkout,
)
from deeplake.util.pretty_print import summary_dataset
from deeplake.core.dataset.view_entry import ViewEntry
from deeplake.core.dataset.invalid_view import InvalidView
from deeplake.hooks import dataset_read
from collections import defaultdict
from itertools import chain
import warnings
import jwt
_LOCKABLE_STORAGES = {S3Provider, GCSProvider, AzureProvider, LocalProvider}
def _load_tensor_metas(dataset):
meta_keys = [
get_tensor_meta_key(key, dataset.version_state["commit_id"])
for key in dataset.meta.tensors
]
for _ in dataset.storage.get_items(meta_keys):
pass
dataset._tensors() # access all tensors to set chunk engines
class Dataset:
def __init__(
self,
storage: LRUCache,
index: Optional[Index] = None,
group_index: str = "",
read_only: Optional[bool] = None,
public: Optional[bool] = False,
token: Optional[str] = None,
org_id: Optional[str] = None,
verbose: bool = True,
version_state: Optional[Dict[str, Any]] = None,
path: Optional[Union[str, pathlib.Path]] = None,
address: Optional[str] = None,
is_iteration: bool = False,
link_creds=None,
pad_tensors: bool = False,
lock_enabled: bool = True,
lock_timeout: Optional[int] = 0,
enabled_tensors: Optional[List[str]] = None,
view_base: Optional["Dataset"] = None,
libdeeplake_dataset=None,
index_params: Optional[Dict[str, Union[int, str]]] = None,
**kwargs,
):
"""Initializes a new or existing dataset.
Args:
storage (LRUCache): The storage provider used to access the dataset.
index (Index, Optional): The Index object restricting the view of this dataset's tensors.
group_index (str): Name of the group this dataset instance represents.
read_only (bool, Optional): Opens dataset in read-only mode if this is passed as ``True``. Defaults to ``False``.
Datasets stored on Deep Lake cloud that your account does not have write access to will automatically open in read-only mode.
public (bool, Optional): Applied only if storage is Deep Lake cloud storage and a new Dataset is being created. Defines if the dataset will have public access.
token (str, Optional): Activeloop token, used for fetching credentials for Deep Lake datasets. This is optional; tokens are normally autogenerated.
org_id (str, Optional): Organization id to be used for enabling high-performance features. Only applicable for local datasets.
verbose (bool): If ``True``, logs will be printed. Defaults to True.
version_state (Dict[str, Any], Optional): The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
path (str, pathlib.Path): The path to the dataset.
address (Optional[str]): The version address of the dataset.
is_iteration (bool): If this Dataset is being used as an iterator.
link_creds (LinkCreds, Optional): The LinkCreds object used to access tensors that have external data linked to them.
pad_tensors (bool): If ``True``, shorter tensors will be padded to the length of the longest tensor.
lock_enabled (bool): Whether the dataset should be locked for writing. Only applicable for S3, Deep Lake storage and GCS datasets. No effect if ``read_only=True``.
lock_timeout (int): How many seconds to wait for a lock before throwing an exception. If None, wait indefinitely.
enabled_tensors (List[str], Optional): List of tensors that are enabled in this view. By default all tensors are enabled.
view_base (Optional["Dataset"]): Base dataset of this view.
libdeeplake_dataset: The libdeeplake dataset object corresponding to this dataset.
index_params (Dict[str, Union[int, str]], Optional): VDB index parameters. Defaults to ``None``.
**kwargs: Passing subclass variables through without errors.
Raises:
ValueError: If an existing local path is given, it must be a directory.
ImproperDatasetInitialization: Exactly one argument out of 'path' and 'storage' needs to be specified.
This is raised if none of them is specified or more than one is specified.
InvalidHubPathException: If a Deep Lake cloud path (path starting with `hub://`) is specified and it isn't of the form `hub://username/datasetname`.
AuthorizationException: If a Deep Lake cloud path (path starting with `hub://`) is specified and the user doesn't have access to the dataset.
PathNotEmptyException: If the path to the dataset doesn't contain a Deep Lake dataset and is also not empty.
LockedException: If read_only is False but the dataset is locked for writing by another machine.
ReadOnlyModeError: If read_only is False but write access is not available.
"""
d: Dict[str, Any] = {}
d["_client"] = d["ds_name"] = None
# uniquely identifies dataset
d["path"] = convert_pathlib_to_string_if_needed(path) or get_path_from_storage(
storage
)
d["storage"] = storage
d["_read_only_error"] = read_only is False
d["base_storage"] = get_base_storage(storage)
d["_read_only"] = d["base_storage"].read_only
d["_locked_out"] = False # User requested write access but was denied
d["is_iteration"] = is_iteration
d["is_first_load"] = version_state is None
d["_is_filtered_view"] = False
d["index"] = index or Index()
d["group_index"] = group_index
d["_token"] = token
d["org_id"] = org_id
d["public"] = public
d["verbose"] = verbose
d["version_state"] = version_state or {}
d["link_creds"] = link_creds
d["enabled_tensors"] = enabled_tensors
d["libdeeplake_dataset"] = libdeeplake_dataset
d["index_params"] = index_params
d["_info"] = None
d["_ds_diff"] = None
d["_view_id"] = str(uuid.uuid4())
d["_view_base"] = view_base
d["_view_use_parent_commit"] = False
d["_update_hooks"] = {}
d["_commit_hooks"] = {}
d["_checkout_hooks"] = {}
d["_parent_dataset"] = None
d["_pad_tensors"] = pad_tensors
d["_locking_enabled"] = lock_enabled
d["_lock_timeout"] = lock_timeout
d["_temp_tensors"] = []
d["_vc_info_updated"] = True
d["_query_string"] = None
dct = self.__dict__
dct.update(d)
try:
self._set_derived_attributes(address=address)
except LockedException:
raise LockedException(
"This dataset cannot be open for writing as it is locked by another machine. Try loading the dataset with `read_only=True`."
)
except ReadOnlyModeError as e:
raise ReadOnlyModeError(
"This dataset cannot be open for writing as you don't have permissions. Try loading the dataset with `read_only=True."
)
dct["enabled_tensors"] = (
set(self._resolve_tensor_list(enabled_tensors, root=True))
if enabled_tensors
else None
)
self._first_load_init()
self._initial_autoflush: List[bool] = (
[]
) # This is a stack to support nested with contexts
self._indexing_history: List[int] = []
if not self.read_only:
for temp_tensor in self._temp_tensors:
self._delete_tensor(temp_tensor, large_ok=True)
self._temp_tensors = []
if self.is_first_load:
_load_tensor_metas(self)
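# Usage sketch (illustrative, not part of this module's execution): Dataset
# objects are normally obtained through the top-level API rather than by
# calling this constructor directly. The path below is a hypothetical example.
#
#   import deeplake
#   ds = deeplake.dataset("path/to/dataset")                  # create or load
#   ds_ro = deeplake.load("path/to/dataset", read_only=True)  # open existing, read-only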
def _lock_lost_handler(self):
"""This is called when lock is acquired but lost later on due to slow update."""
self.read_only = True
if self.verbose:
always_warn(
"Unable to update dataset lock as another machine has locked it for writing or deleted / overwriten it. Switching to read only mode."
)
self._locked_out = True
def __enter__(self):
self._initial_autoflush.append(self.storage.autoflush)
self.storage.autoflush = False
return self
def __exit__(self, exc_type, exc_val, exc_tb):
autoflush = self._initial_autoflush.pop()
if not self._read_only and autoflush:
if self._vc_info_updated:
self._flush_vc_info()
spinner(self.storage.flush)()
self.storage.autoflush = autoflush
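# Usage sketch (illustrative; "images" and "samples" are hypothetical): using the
# dataset as a context manager disables autoflush on entry, so many small writes
# are batched and flushed once when the outermost context exits.
#
#   with ds:
#       for sample in samples:
#           ds.images.append(sample)
#   # cache is flushed here if autoflush was enabled before entering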
def maybe_flush(self):
if not self._read_only:
if self.storage.autoflush:
if self._vc_info_updated:
self._flush_vc_info()
self.storage.flush()
@property
def username(self) -> str:
if not self.token:
return "public"
try:
return jwt.decode(self.token, options={"verify_signature": False})["id"]
except DecodeError:
return "public"
@property
def num_samples(self) -> int:
"""Returns the length of the smallest tensor.
Ignores any applied indexing and returns the total length.
"""
return min(
map(
len,
filter(
lambda t: t.key not in self.meta.hidden_tensors,
self.version_state["full_tensors"].values(),
),
),
default=0,
)
@property
def meta(self) -> DatasetMeta:
"""Returns the metadata of the dataset."""
return self.version_state["meta"]
@property
def client(self):
"""Returns the client of the dataset."""
return self._client
def __len__(self, warn: bool = True):
"""Returns the length of the smallest tensor."""
tensor_lengths = [len(tensor) for tensor in self.tensors.values()]
pad_tensors = self._pad_tensors
if (
warn
and not pad_tensors
and min(tensor_lengths, default=0) != max(tensor_lengths, default=0)
):
warning(
"The length of tensors in the dataset is different. The len(ds) returns the length of the "
"smallest tensor in the dataset. If you want the length of the longest tensor in the dataset use "
"ds.max_len."
)
length_fn = max if pad_tensors else min
return length_fn(tensor_lengths, default=0)
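# Illustrative summary of the length semantics above (behavior follows the code;
# no new API is assumed):
#
#   len(ds)     # length of the smallest tensor (largest if pad_tensors=True);
#               # warns when tensor lengths differ
#   ds.max_len  # length of the longest tensor
#   ds.min_len  # length of the shortest tensor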
@property
def max_len(self):
"""Return the maximum length of the tensor."""
return max([len(tensor) for tensor in self.tensors.values()])
@property
def min_len(self):
"""Return the minimum length of the tensor."""
return min([len(tensor) for tensor in self.tensors.values()])
def __getstate__(self) -> Dict[str, Any]:
"""Returns a dict that can be pickled and used to restore this dataset.
Note:
Pickling a dataset does not copy the dataset; it only saves attributes that can be used to restore the dataset.
If you pickle a local dataset and try to access it on a machine that does not have the data present, the dataset will not work.
"""
if self.path.startswith("mem://"):
raise MemoryDatasetCanNotBePickledError
keys = [
"path",
"_read_only",
"index",
"group_index",
"public",
"storage",
"_token",
"verbose",
"version_state",
"org_id",
"ds_name",
"_is_filtered_view",
"_view_id",
"_view_use_parent_commit",
"_parent_dataset",
"_pad_tensors",
"_locking_enabled",
"_lock_timeout",
"enabled_tensors",
"is_iteration",
]
state = {k: getattr(self, k) for k in keys}
state["link_creds"] = self.link_creds
return state
def __setstate__(self, state: Dict[str, Any]):
"""Restores dataset from a pickled state.
Args:
state (dict): The pickled state used to restore the dataset.
"""
state["is_first_load"] = True
state["_info"] = None
state["_read_only_error"] = False
state["_initial_autoflush"] = []
state["_ds_diff"] = None
state["_view_base"] = None
state["_update_hooks"] = {}
state["_commit_hooks"] = {}
state["_checkout_hooks"] = {}
state["_client"] = state["org_id"] = state["ds_name"] = None
state["_temp_tensors"] = []
state["libdeeplake_dataset"] = None
state["_vc_info_updated"] = False
state["_locked_out"] = False
self.__dict__.update(state)
self.__dict__["base_storage"] = get_base_storage(self.storage)
# clear cache while restoring
self.storage.clear_cache_without_flush()
self._set_derived_attributes(verbose=False)
self._indexing_history = []
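# Usage sketch (illustrative): pickling stores only the attributes needed to
# re-open the dataset, not the data itself, so the unpickling machine must still
# be able to reach the underlying storage. Memory ("mem://") datasets cannot be
# pickled.
#
#   import pickle
#   payload = pickle.dumps(ds)
#   ds_restored = pickle.loads(payload)  # re-derives storage and state via __setstate__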
def _reload_version_state(self):
version_state = self.version_state
# share version state if at HEAD
if (
not self._view_use_parent_commit
and self._view_base
and version_state["commit_node"].is_head_node
):
uid = self._view_id
def commit_hook():
del self._view_base._commit_hooks[uid]
del self._view_base._checkout_hooks[uid]
del self._view_base._update_hooks[uid]
self._view_use_parent_commit = True
self._reload_version_state()
def checkout_hook():
del self._view_base._commit_hooks[uid]
del self._view_base._checkout_hooks[uid]
del self._view_base._update_hooks[uid]
self.__class__ = InvalidView
self.__init__(reason="checkout")
def update_hook():
del self._view_base._commit_hooks[uid]
del self._view_base._checkout_hooks[uid]
del self._view_base._update_hooks[uid]
self.__class__ = InvalidView
self.__init__(reason="update")
if not self.is_iteration:
self._view_base._commit_hooks[uid] = commit_hook
self._view_base._checkout_hooks[uid] = checkout_hook
self._view_base._update_hooks[uid] = update_hook
return version_state
vs_copy = {}
vs_copy["branch"] = version_state["branch"]
# share branch_commit_map and commit_node_map
vs_copy["branch_commit_map"] = version_state["branch_commit_map"]
vs_copy["commit_node_map"] = version_state["commit_node_map"]
commit_node = version_state["commit_node"]
if self._view_use_parent_commit:
vs_copy["commit_node"] = commit_node.parent
else:
vs_copy["commit_node"] = commit_node
vs_copy["commit_id"] = vs_copy["commit_node"].commit_id
vs_copy["tensor_names"] = version_state["tensor_names"].copy()
vs_copy["meta"] = DatasetMeta()
vs_copy["meta"].__setstate__(version_state["meta"].__getstate__())
self.version_state = vs_copy
vs_copy["full_tensors"] = {
key: Tensor(key, self) for key in version_state["full_tensors"]
}
self._view_base = None
def __getitem__(
self,
item: Union[
str, int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index
],
is_iteration: bool = False,
):
is_iteration = is_iteration or self.is_iteration
if isinstance(item, str):
fullpath = posixpath.join(self.group_index, item)
enabled_tensors = self.enabled_tensors
if enabled_tensors is None or fullpath in enabled_tensors:
tensor = self._get_tensor_from_root(fullpath)
if tensor is not None:
index = self.index
if index.is_trivial() and is_iteration == tensor.is_iteration:
return tensor
return tensor.__getitem__(index, is_iteration=is_iteration)
if self._has_group_in_root(fullpath):
ret = self.__class__(
storage=self.storage,
index=self.index,
group_index=posixpath.join(self.group_index, item),
read_only=self.read_only,
token=self._token,
verbose=False,
version_state=self.version_state,
path=self.path,
is_iteration=is_iteration,
link_creds=self.link_creds,
pad_tensors=self._pad_tensors,
enabled_tensors=self.enabled_tensors,
view_base=self._view_base or self,
libdeeplake_dataset=self.libdeeplake_dataset,
)
elif "/" in item:
splt = posixpath.split(item)
ret = self[splt[0]][splt[1]]
else:
raise TensorDoesNotExistError(item)
elif isinstance(item, (int, slice, list, tuple, Index, type(Ellipsis))):
if (
isinstance(item, list)
and len(item)
and (
isinstance(item[0], str)
or (
isinstance(item[0], (list, tuple))
and len(item[0])
and isinstance(item[0][0], str)
)
)
):
group_index = self.group_index
enabled_tensors = [
posixpath.join(
group_index, (x if isinstance(x, str) else "/".join(x))
)
for x in item
]
for x in enabled_tensors:
enabled_tensors.extend(
self[relpath(x, self.group_index)].meta.links.keys()
)
ret = self.__class__(
storage=self.storage,
index=self.index,
group_index=self.group_index,
read_only=self._read_only,
token=self._token,
verbose=False,
version_state=self.version_state,
path=self.path,
is_iteration=is_iteration,
link_creds=self.link_creds,
pad_tensors=self._pad_tensors,
enabled_tensors=enabled_tensors,
view_base=self._view_base or self,
libdeeplake_dataset=self.libdeeplake_dataset,
)
elif isinstance(item, tuple) and len(item) and isinstance(item[0], str):
ret = self
for x in item:
ret = self[x]
return ret
else:
if not is_iteration and isinstance(item, int):
is_iteration = check_if_iteration(self._indexing_history, item)
if is_iteration and deeplake.constants.SHOW_ITERATION_WARNING:
warnings.warn(
"Indexing by integer in a for loop, like `for i in range(len(ds)): ... ds[i]` can be quite slow. Use `for i, sample in enumerate(ds)` instead."
)
ret = self.__class__(
storage=self.storage,
index=self.index[item],
group_index=self.group_index,
read_only=self._read_only,
token=self._token,
verbose=False,
version_state=self.version_state,
path=self.path,
is_iteration=is_iteration,
link_creds=self.link_creds,
pad_tensors=self._pad_tensors,
enabled_tensors=self.enabled_tensors,
view_base=self._view_base or self,
libdeeplake_dataset=self.libdeeplake_dataset,
)
else:
raise InvalidKeyTypeError(item)
if hasattr(self, "_view_entry"):
ret._view_entry = self._view_entry
return ret
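# Illustrative sketch of the indexing forms handled above ("images" and "labels"
# are hypothetical tensor names):
#
#   ds["images"]               # a tensor, or a group returned as a sub-dataset
#   ds[0], ds[10:20]           # row views across all tensors
#   ds[["images", "labels"]]   # view restricted to the listed tensors (plus their links)
#   ds["images"][5]            # tensor first, then index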
def __setitem__(self, item: str, value: Any):
if not isinstance(item, str):
raise TypeError("Datasets do not support item assignment")
tensor = self[item]
if isinstance(tensor, Dataset):
raise TypeError("Tensor groups do not support item assignment")
tensor.index = Index()
tensor._update(self.index, value)
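# Illustrative sketch ("labels" and new_labels are hypothetical): assigning to a
# string key updates that tensor over the rows selected by this view's index.
#
#   ds[10:20]["labels"] = new_labels   # overwrite rows 10..19 of the "labels" tensor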
@invalid_view_op
def create_tensor(
self,
name: str,
htype: str = UNSPECIFIED,
dtype: Union[str, np.dtype] = UNSPECIFIED,
sample_compression: str = UNSPECIFIED,
chunk_compression: str = UNSPECIFIED,
hidden: bool = False,
create_sample_info_tensor: bool = True,
create_shape_tensor: bool = True,
create_id_tensor: bool = True,
verify: bool = True,
exist_ok: bool = False,
verbose: bool = True,
downsampling: Optional[Tuple[int, int]] = None,
tiling_threshold: Optional[int] = None,
**kwargs,
):
"""Creates a new tensor in the dataset.
Examples:
>>> # create dataset
>>> ds = deeplake.dataset("path/to/dataset")
>>> # create tensors
>>> ds.create_tensor("images", htype="image", sample_compression="jpg")
>>> ds.create_tensor("videos", htype="video", sample_compression="mp4")
>>> ds.create_tensor("data")
>>> ds.create_tensor("point_clouds", htype="point_cloud")
>>> # append data
>>> ds.images.append(np.ones((400, 400, 3), dtype='uint8'))
>>> ds.videos.append(deeplake.read("videos/sample_video.mp4"))
>>> ds.data.append(np.zeros((100, 100, 2)))
Args:
name (str): The name of the tensor to be created.
htype (str):
- The class of data for the tensor.
- The defaults for other parameters are determined in terms of this value.
- For example, ``htype="image"`` would have ``dtype`` default to ``uint8``.
- These defaults can be overridden by explicitly passing any of the other parameters to this function.
- May also modify the defaults for other parameters.
dtype (str): Optionally override this tensor's ``dtype``. All subsequent samples are required to have this ``dtype``.
sample_compression (str): All samples will be compressed in the provided format. If ``None``, samples are uncompressed. For ``link[]`` tensors, ``sample_compression`` is used only for optimizing dataset views.
chunk_compression (str): All chunks will be compressed in the provided format. If ``None``, chunks are uncompressed. For ``link[]`` tensors, ``chunk_compression`` is used only for optimizing dataset views.
hidden (bool): If ``True``, the tensor will be hidden from ds.tensors but can still be accessed via ``ds[tensor_name]``.
create_sample_info_tensor (bool): If ``True``, meta data of individual samples will be saved in a hidden tensor. This data can be accessed via :attr:`tensor[i].sample_info <deeplake.core.tensor.Tensor.sample_info>`.
create_shape_tensor (bool): If ``True``, an associated tensor containing shapes of each sample will be created.
create_id_tensor (bool): If ``True``, an associated tensor containing unique ids for each sample will be created. This is useful for merge operations.
verify (bool): Valid only for link htypes. If ``True``, all links will be verified before they are added to the tensor.
If ``False``, links will be added without verification, but note that ``create_shape_tensor`` and ``create_sample_info_tensor`` will be set to ``False``.
exist_ok (bool): If ``True``, the existing tensor is returned if its configuration matches; if ``False``, an error is raised if the tensor already exists.
verbose (bool): Shows warnings if ``True``.
downsampling (tuple[int, int]): If not ``None``, the tensor will be downsampled by the provided factors. For example, ``(2, 5)`` will downsample the tensor by a factor of 2 in both dimensions and create 5 layers of downsampled tensors.
Only supported for image and mask htypes.
tiling_threshold (int, Optional): In bytes. Tiles large images if their size exceeds this threshold. Set to -1 to disable tiling.
**kwargs:
- ``htype`` defaults can be overridden by passing any of the compatible parameters.
- To see all htypes and their correspondent arguments, check out :ref:`Htypes`.
Returns:
Tensor: The new tensor, which can be accessed by ``dataset[name]`` or ``dataset.name``.
Raises:
TensorAlreadyExistsError: If the tensor already exists and ``exist_ok`` is ``False``.
TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
InvalidTensorNameError: If ``name`` is in dataset attributes.
NotImplementedError: If trying to override ``chunk_compression``.
TensorMetaInvalidHtype: If invalid htype is specified.
ValueError: If an illegal argument is specified.
"""
deeplake_reporter.feature_report(
feature_name="create_tensor",
parameters={
"name": name,
"htype": htype,
"dtype": dtype,
"sample_compression": sample_compression,
"chunk_compression": chunk_compression,
},
)
return self._create_tensor(
name,
htype,
dtype,
sample_compression,
chunk_compression,
hidden,
create_sample_info_tensor,
create_shape_tensor,
create_id_tensor,
verify,
exist_ok,
verbose,
downsampling,
tiling_threshold=tiling_threshold,
**kwargs,
)
@invalid_view_op
def _create_tensor(
self,
name: str,
htype: str = UNSPECIFIED,
dtype: Union[str, np.dtype] = UNSPECIFIED,
sample_compression: str = UNSPECIFIED,
chunk_compression: str = UNSPECIFIED,
hidden: bool = False,
create_sample_info_tensor: bool = True,
create_shape_tensor: bool = True,
create_id_tensor: bool = True,
verify: bool = True,
exist_ok: bool = False,
verbose: bool = True,
downsampling: Optional[Tuple[int, int]] = None,
**kwargs,
):
# if not the head node, checkout to an auto branch that is newly created
auto_checkout(self)
name = filter_name(name, self.group_index)
key = self.version_state["tensor_names"].get(name)
is_sequence, is_link, htype = parse_complex_htype(htype)
if key:
if not exist_ok:
raise TensorAlreadyExistsError(name)
tensor = self.root[key]
current_config = tensor._config
new_config = {
"htype": htype,
"dtype": dtype,
"sample_compression": COMPRESSION_ALIASES.get(
sample_compression, sample_compression
),
"chunk_compression": COMPRESSION_ALIASES.get(
chunk_compression, chunk_compression
),
"hidden": hidden,
"is_link": is_link,
"is_sequence": is_sequence,
}
base_config = HTYPE_CONFIGURATIONS.get(htype, {}).copy()
for key in new_config:
if new_config[key] == UNSPECIFIED:
new_config[key] = base_config.get(key) or UNSPECIFIED
if current_config != new_config:
raise ValueError(
f"Tensor {name} already exists with different configuration. "
f"Current config: {current_config}. "
f"New config: {new_config}"
)
return tensor
elif name in self.version_state["full_tensors"]:
key = f"{name}_{uuid.uuid4().hex[:4]}"
else:
key = name
if name in self._groups:
raise TensorGroupAlreadyExistsError(name)
tensor_name = posixpath.split(name)[1]
if not tensor_name or tensor_name in dir(self):
raise InvalidTensorNameError(tensor_name)
downsampling_factor, number_of_layers = validate_downsampling(downsampling)
kwargs["is_sequence"] = kwargs.get("is_sequence") or is_sequence
kwargs["is_link"] = kwargs.get("is_link") or is_link
kwargs["verify"] = verify
if kwargs["is_link"] and not kwargs["verify"]:
create_shape_tensor = False
create_sample_info_tensor = False
if not self._is_root():
return self.root._create_tensor(
name=key,
htype=htype,
dtype=dtype,
sample_compression=sample_compression,
chunk_compression=chunk_compression,
hidden=hidden,
create_sample_info_tensor=create_sample_info_tensor,
create_shape_tensor=create_shape_tensor,
create_id_tensor=create_id_tensor,
exist_ok=exist_ok,
downsampling=downsampling,
**kwargs,
)
if "/" in name:
self._create_group(posixpath.split(name)[0])
# Separate meta and info
htype_config = HTYPE_CONFIGURATIONS.get(htype, {}).copy()
info_keys = htype_config.pop("_info", [])
info_kwargs = {}
meta_kwargs = {}
for k, v in kwargs.items():
if k in info_keys:
verify_htype_key_value(htype, k, v)
info_kwargs[k] = v
else:
meta_kwargs[k] = v
# Set defaults
for k in info_keys:
if k not in info_kwargs:
if k == "class_names":
info_kwargs[k] = htype_config[k].copy()
else:
info_kwargs[k] = htype_config[k]
create_tensor(
key,
self.storage,
htype=htype,
dtype=dtype,
sample_compression=sample_compression,
chunk_compression=chunk_compression,
version_state=self.version_state,
hidden=hidden,
overwrite=True,
**meta_kwargs,
)
meta: DatasetMeta = self.meta
ffw_dataset_meta(meta)
meta.add_tensor(name, key, hidden=hidden)
tensor = Tensor(key, self) # type: ignore
tensor.meta.name = name
self.version_state["full_tensors"][key] = tensor
self.version_state["tensor_names"][name] = key
if info_kwargs:
tensor.info.update(info_kwargs)
self.storage.maybe_flush()
if create_sample_info_tensor and htype in (
"image",
"audio",
"video",
"dicom",
"point_cloud",
"mesh",
"nifti",
):
self._create_sample_info_tensor(name)
if create_shape_tensor and htype not in ("text", "json", "tag"):
self._create_sample_shape_tensor(name, htype=htype)
if create_id_tensor:
self._create_sample_id_tensor(name)
if downsampling:
downsampling_htypes = {
"image",
"image.rgb",
"image.gray",
"binary_mask",
"segment_mask",
}
if htype not in downsampling_htypes:
warnings.warn(
f"Downsampling is only supported for tensor with htypes {downsampling_htypes}, got {htype}. Skipping downsampling."
)
else:
self._create_downsampled_tensor(
name,
htype,
dtype,
sample_compression,
chunk_compression,
meta_kwargs,
downsampling_factor,
number_of_layers,
)
return tensor
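# Illustrative sketch of the exist_ok behavior implemented above ("labels" is a
# hypothetical tensor name):
#
#   t = ds.create_tensor("labels", htype="class_label")
#   t = ds.create_tensor("labels", htype="class_label", exist_ok=True)   # returns the existing tensor
#   ds.create_tensor("labels", htype="generic", exist_ok=True)           # raises ValueError (config differs)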
def _create_sample_shape_tensor(self, tensor: str, htype: str):
shape_tensor = get_sample_shape_tensor_key(tensor)
self._create_tensor(
shape_tensor,
dtype="int64",
hidden=True,
create_id_tensor=False,
create_sample_info_tensor=False,
create_shape_tensor=False,
max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
)
if htype == "list":
extend_f = "extend_len"
update_f = "update_len"
else:
extend_f = "extend_shape"
update_f = "update_shape"
self._link_tensors(
tensor,
shape_tensor,
extend_f=extend_f,
update_f=update_f,
flatten_sequence=True,
)
def _create_sample_id_tensor(self, tensor: str):
id_tensor = get_sample_id_tensor_key(tensor)
self._create_tensor(
id_tensor,
hidden=True,
create_id_tensor=False,
create_sample_info_tensor=False,
create_shape_tensor=False,
)
self._link_tensors(
tensor,
id_tensor,
extend_f="extend_id",
flatten_sequence=False,
)
def _create_sample_info_tensor(self, tensor: str):
sample_info_tensor = get_sample_info_tensor_key(tensor)
self._create_tensor(
sample_info_tensor,
htype="json",
max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
hidden=True,
create_id_tensor=False,
create_sample_info_tensor=False,
create_shape_tensor=False,
)
self._link_tensors(
tensor,
sample_info_tensor,
"extend_info",
"update_info",
flatten_sequence=True,
)
def _create_downsampled_tensor(
self,
tensor: str,
htype: str,
dtype: Union[str, np.dtype],
sample_compression: str,
chunk_compression: str,
meta_kwargs: Dict[str, Any],
downsampling_factor: int,
number_of_layers: int,
):
downsampled_tensor = get_downsampled_tensor_key(tensor, downsampling_factor)
if number_of_layers == 1:
downsampling = None
else:
downsampling = (downsampling_factor, number_of_layers - 1)
meta_kwargs = meta_kwargs.copy()
meta_kwargs.pop("is_link", None)
new_tensor = self._create_tensor(
downsampled_tensor,
htype=htype,