Dataloader improvements #2806

Draft · wants to merge 36 commits into base: main

Commits (36)
c2e89b1
dataloader changed initial
activesoull Mar 20, 2024
8bc1311
Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl…
activesoull Mar 20, 2024
34d6b34
Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl…
activesoull Mar 21, 2024
170e1e1
shuffling fix
activesoull Mar 21, 2024
09206a6
test changes
activesoull Mar 21, 2024
f6a69d1
linter changes
activesoull Mar 21, 2024
0ca7b91
added dicom and nifti support
activesoull Mar 22, 2024
55c7458
handle decode method data if data tensors are object dtype case
activesoull Mar 22, 2024
e685347
Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl…
activesoull Mar 22, 2024
a940dff
added missing arguments
activesoull Mar 25, 2024
a6662c2
bumb libdeeplake to 0.0.110
activesoull Mar 25, 2024
060c39d
fix dataset slicing logic
activesoull Mar 26, 2024
3ae90ec
black v 24.3.0 fixes
activesoull Mar 26, 2024
99849fd
bumb libdeeplake to 0.0.111
activesoull Mar 26, 2024
f6818f3
bumb libdeeplake to 0.0.112
activesoull Mar 27, 2024
fe28310
Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl…
activesoull Mar 27, 2024
3d9f8c6
tmp-ssh
activesoull Mar 28, 2024
129119d
revert
activesoull Mar 29, 2024
65dd060
windows fix
activesoull Mar 29, 2024
7e07ac0
debug
activesoull Mar 29, 2024
9b2932e
remove pin_memory_device
activesoull Apr 1, 2024
d4f6152
Merge branch 'main' into torch_dl
activesoull Apr 1, 2024
0d3da6a
bumb libdeeplake to 0.0.113
activesoull Apr 1, 2024
ab4df01
ssh
activesoull Apr 1, 2024
cde1beb
merge with main
activesoull Apr 1, 2024
a5406a1
Bump
activesoull Apr 1, 2024
90a9046
Merge branch 'fix-mypy' into torch_dl
khustup2 Apr 1, 2024
e1eab17
bumb libdeeplake to 0.0.115
activesoull Apr 2, 2024
1cbfbf1
set endpoint
activesoull Apr 2, 2024
4a53cf1
set endpoint
activesoull Apr 2, 2024
d9857ef
bumb libdeeplake to 0.0.116
activesoull Apr 2, 2024
e5f37a6
added MacOS environment variable exception
activesoull Apr 10, 2024
8ee120a
adjust with 3.9 changes
activesoull Apr 18, 2024
dac271c
Merge branch 'torch_dl' of github.com:activeloopai/deeplake into torc…
activesoull Apr 18, 2024
ea21124
set origin path too
activesoull Apr 24, 2024
0e567a4
Merge branch 'main' of github.com:activeloopai/deeplake into torch_dl
activesoull Apr 24, 2024
50 changes: 48 additions & 2 deletions deeplake/core/dataset/indra_dataset_view.py
@@ -60,6 +60,40 @@ def __init__(
except:
pass

def __getstate__(self) -> Dict[str, Any]:
keys = [
"path",
"_read_only",
"group_index",
"storage",
"_token",
"verbose",
"enabled_tensors",
"index"
]

state = {k: getattr(self, k) for k in keys}
return state

def __setstate__(self, state):
from indra import api # type: ignore

d: Dict[str, Any] = {}
self.storage = state["storage"]
d["indra_ds"] = api.load_from_storage(self.storage.core)
d["group_index"] = state["group_index"]
d["enabled_tensors"] = state["enabled_tensors"]
d["verbose"] = state["verbose"]
d["_token"] = state["_token"]
self.__dict__.update(d)
self._view_base = None
self._view_entry = None
self._read_only = state["_read_only"]
self._locked_out = False
self._query_string = None
index = state["index"]
self.indra_ds = self[list(index.values[0].value)].indra_ds

@property
def meta(self):
return DatasetMeta()
@@ -97,6 +131,10 @@ def commit_id(self) -> str:
def libdeeplake_dataset(self):
return self.indra_ds

@libdeeplake_dataset.setter
def libdeeplake_dataset(self, new_indra_ds):
self.indra_ds = new_indra_ds

def merge(self, *args, **kwargs):
raise InvalidOperationError(
"merge", "merge method cannot be called on a Dataset view."
@@ -188,23 +226,31 @@ def __getitem__(
)
for x in item
]
return IndraDatasetView(
ret = IndraDatasetView(
indra_ds=self.indra_ds,
enabled_tensors=enabled_tensors,
)
if hasattr(self, "_tql_query"):
ret._tql_query = self._tql_query
return ret
elif isinstance(item, tuple) and len(item) and isinstance(item[0], str):
ret = self
for x in item:
ret = self[x]
return ret
else:
return IndraDatasetView(
ret = IndraDatasetView(
indra_ds=self.indra_ds[item],
)
if hasattr(self, "_tql_query"):
ret._tql_query = self._tql_query
return ret
else:
raise InvalidKeyTypeError(item)

raise AttributeError("Dataset has no attribute - {item}")


def __getattr__(self, key):
try:
ret = self.__getitem__(key)
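Note: the __getstate__/__setstate__ pair above is what lets an IndraDatasetView travel across process boundaries, e.g. to dataloader worker processes. A minimal sketch of the round trip this enables, assuming the dataset path and query are placeholders and that the call actually yields an IndraDatasetView-backed view:

import pickle

import deeplake

ds = deeplake.load("hub://org/dataset")  # placeholder path
view = ds.query("SELECT * WHERE labels == 0")  # assumption: yields an IndraDatasetView-backed view

# __getstate__ keeps only lightweight fields (path, storage, index, ...);
# __setstate__ reloads the underlying indra dataset from storage and re-applies the index.
restored = pickle.loads(pickle.dumps(view))
print(len(restored))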
11 changes: 8 additions & 3 deletions deeplake/core/io.py
@@ -293,21 +293,26 @@ def __init__(
self.tensors = tensors
self.pad_tensors = pad_tensors
self.decode_method = decode_method
jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors(
self.dataset, tensors, verbose
)
(
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
medical_tensors,
) = check_tensors(self.dataset, tensors, verbose)
(
raw_tensors,
pil_compressed_tensors,
json_tensors,
list_tensors,
data_tensors,
medical_tensors,
) = validate_decode_method(
self.decode_method,
tensors,
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
medical_tensors,
)
sample_info_tensors, tensor_info_tensors = find_additional_tensors_and_info(
dataset, data_tensors
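Note: with medical_tensors now threaded through check_tensors and validate_decode_method, dicom/nifti tensors get their own decode path in the loader. A hedged usage sketch only; the dataset path, the tensor name "ct", and the "numpy" decode value are assumptions, not taken from this diff:

import deeplake

ds = deeplake.load("hub://org/medical-dataset")  # placeholder path

loader = (
    ds.dataloader()
    .batch(4)
    .pytorch(decode_method={"ct": "numpy"}, num_workers=2)
)

for batch in loader:
    # "ct" is assumed to be a dicom/nifti tensor; each sample is decoded into
    # an array instead of being returned as raw bytes.
    print(batch["ct"].shape)
    break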
4 changes: 3 additions & 1 deletion deeplake/core/storage/gcs.py
@@ -528,5 +528,7 @@ def get_object_from_full_url(self, url: str):

def get_creds(self):
d = self.scoped_credentials.get_token_info()
d["expiration"] = self.expiration or ""
d["expiration"] = (
self.expiration if hasattr(self, "expiration") and self.expiration else ""
)
return d
14 changes: 11 additions & 3 deletions deeplake/enterprise/convert_to_libdeeplake.py
@@ -8,6 +8,7 @@
from deeplake.core.storage.azure import AzureProvider
from deeplake.util.remove_cache import get_base_storage
from deeplake.util.exceptions import EmptyTokenException
from deeplake.core.dataset.indra_dataset_view import IndraDatasetView

from deeplake.util.dataset import try_flushing # type: ignore
import importlib
@@ -65,6 +66,7 @@ def _get_indra_ds_from_azure_provider(
storage = IndraProvider(
path,
read_only=provider.read_only,
origin_path=provider.root,
token=token,
account_name=account_name,
account_key=account_key,
@@ -168,7 +170,7 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset):
token = (
hub2_dataset.client.get_token()
if (hub2_dataset.token is None or hub2_dataset._token == "")
and hub2_dataset.client
and hasattr(hub2_dataset, "client") and hub2_dataset.client
else hub2_dataset.token
)
if token is None or token == "":
@@ -247,5 +249,11 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset):
if slice_ != slice(None):
if isinstance(slice_, tuple):
slice_ = list(slice_)
libdeeplake_dataset = libdeeplake_dataset[slice_]
return libdeeplake_dataset
from deeplake.core.index import Index
try:
idx = Index(libdeeplake_dataset.indexes)
except:
idx = Index(slice(0, len(libdeeplake_dataset)))
if isinstance(slice_, slice) or (list(slice_) != list(idx.values[0].value)):
libdeeplake_dataset = libdeeplake_dataset[slice_]
return libdeeplake_dataset
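Note: the new block compares the requested slice against the indexes the libdeeplake dataset already carries, and only re-slices when they differ. A standalone sketch of the comparison idea in plain Python (not the deeplake API):

def needs_reslice(requested, already_applied):
    # `requested` is a slice or a list of row indices; `already_applied` is the
    # index list the underlying dataset currently exposes.
    if isinstance(requested, slice):
        return True  # slices are always forwarded, mirroring the isinstance check above
    return list(requested) != list(already_applied)

print(needs_reslice([0, 1, 2], [0, 1, 2]))  # False: already applied, skip re-slicing
print(needs_reslice([2, 0, 1], [0, 1, 2]))  # True: order differs, apply the slice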
85 changes: 64 additions & 21 deletions deeplake/enterprise/dataloader.py
@@ -1,9 +1,10 @@
from typing import Callable, Dict, List, Optional, Union
import deeplake
from deeplake.enterprise.convert_to_libdeeplake import dataset_to_libdeeplake

from deeplake.enterprise.dummy_dataloader import DummyDataloader # type: ignore
from deeplake.util.scheduling import create_fetching_schedule, find_primary_tensor
from deeplake.core.seed import DeeplakeRandom
from deeplake.util.exceptions import EmptyTensorError, MacOSEnvironmentError
from deeplake.enterprise.util import (
handle_mode,
raise_indra_installation_error,
@@ -22,6 +23,8 @@
from deeplake.util.dataset import map_tensor_keys
from functools import partial
import importlib
import os
import sys

try:
from torch.utils.data.dataloader import DataLoader, _InfiniteConstantSampler
@@ -112,6 +115,7 @@ def __init__(
_ignore_errors=False,
_verbose=False,
_offset=None,
_pin_memory=False,
**kwargs,
):
import_indra_loader()
@@ -137,6 +141,7 @@
self._ignore_errors = _ignore_errors
self._verbose = _verbose
self._offset = _offset
self._pin_memory = _pin_memory
for k, v in kwargs.items():
setattr(self, k, v)

@@ -343,11 +348,6 @@ def shuffle(self, shuffle: bool = True, buffer_size: int = 2048):
all_vars = self.__dict__.copy()
all_vars["_shuffle"] = shuffle
all_vars["_buffer_size"] = buffer_size
if shuffle:
schedule = create_fetching_schedule(self.dataset, self._primary_tensor_name)
if schedule is not None:
ds = self.dataset.no_view_dataset # type: ignore
all_vars["dataset"] = ds[schedule]
all_vars["_dataloader"] = None
return self.__class__(**all_vars)

@@ -479,6 +479,7 @@ def pytorch(
return_index: bool = True,
decode_method: Optional[Dict[str, str]] = None,
persistent_workers: bool = False,
pin_memory: bool = False,
):
"""Returns a :class:`DeepLakeDataLoader` object.

@@ -492,6 +493,7 @@
distributed (bool): Used for DDP training. Distributes different sections of the dataset to different ranks. Defaults to ``False``.
return_index (bool): Used to identify whether the loader needs to return the sample index or not. Defaults to ``True``.
persistent_workers (bool): If ``True``, the data loader will not shutdown the worker processes after a dataset has been consumed once. Defaults to ``False``.
pin_memory (bool): If ``True``, the data loader will copy Tensors into device/CUDA pinned memory before returning them. Defaults to ``False``.
decode_method (Dict[str, str], Optional): A dictionary of decode methods for each tensor. Defaults to ``None``.


@@ -548,6 +550,7 @@ def pytorch(
all_vars["_mode"] = mode
all_vars["_persistent_workers"] = persistent_workers
all_vars["_dataloader"] = None
all_vars["_pin_memory"] = pin_memory
if distributed:
all_vars["_world_size"] = torch.distributed.get_world_size()
return self.__class__(**all_vars)
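Note: the new pin_memory flag is stored on the returned loader and later forwarded to the underlying indra loader through LoaderMetaInfo. A minimal usage sketch, assuming a placeholder dataset path:

import deeplake

ds = deeplake.load("hub://org/dataset")  # placeholder path

# pin_memory=True asks for batches in page-locked host memory, which can make
# host-to-GPU copies faster; it defaults to False as documented above.
loader = (
    ds.dataloader()
    .batch(32)
    .shuffle()
    .pytorch(num_workers=4, pin_memory=True)
)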
@@ -734,13 +737,13 @@ def __create_dummy_dataloader(

def __get_indra_dataloader(
self,
dataset,
indra_dataset,
deeplake_dataset,
tensors: Optional[List[str]] = None,
raw_tensors: Optional[List[str]] = None,
pil_compressed_tensors: Optional[List[str]] = None,
json_tensors: Optional[List[str]] = None,
list_tensors: Optional[List[str]] = None,
medical_tensors: Optional[List[str]] = None,
htype_dict: Optional[dict] = None,
ndim_dict: Optional[dict] = None,
tensor_info_dict: Optional[dict] = None,
@@ -767,26 +770,27 @@
pil_compressed_tensors=pil_compressed_tensors or [],
json_tensors=json_tensors or [],
list_tensors=list_tensors or [],
medical_tensors=medical_tensors or [],
)

loader_meta = LoaderMetaInfo(
context=self.multiprocessing_context,
distributed=self._distributed,
mode=self._mode,
upcast=self._mode == "pytorch"
and self.__is_upcast_needed(
dataset, tensors
deeplake_dataset, tensors
), # upcast to handle unsupported dtypes,
return_index=self._return_index,
verbose=self._verbose,
ignore_errors=self._ignore_errors,
prefetch_factor=self._prefetch_factor,
offset=self._offset,
primary_tensor=self._primary_tensor_name,
worker_init_fn=self.worker_init_fn,
pin_memory=self.pin_memory,
)

return INDRA_LOADER( # type: ignore [misc]
indra_dataset,
deeplake_dataset=deeplake_dataset,
batch_size=self._batch_size,
num_threads=num_threads,
shuffle=self._shuffle,
@@ -801,30 +805,62 @@ def __iter__(self):
info=info,
)

def _fill_sample_info_tensors(
self,
dataset,
sample_info_tensors,
json_tensors,
list_tensors,
):
for tensor_name in sample_info_tensors:
tensor = dataset._get_tensor_from_root(tensor_name)
if len(tensor) == 0:
raise EmptyTensorError(
f" the dataset has an empty tensor {tensor_name}, pytorch dataloader can't be created."
f" Please either populate the tensor or pass tensors argument to .pytorch that excludes this"
f" tensor."
)
meta = tensor.meta
if meta.htype == "json":
json_tensors.append(tensor_name)
elif meta.htype == "list":
list_tensors.append(tensor_name)
elif meta.htype == "tag":
list_tensors.append(tensor_name)

def __iter__(self):
if self._dataloader is None:
dataset = self.dataset
tensors = self._tensors or map_tensor_keys(dataset, None)

jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors(
dataset, tensors
)
(
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
medical_tensors,
) = check_tensors(dataset, tensors)
(
raw_tensors,
pil_compressed_tensors,
json_tensors,
list_tensors,
data_tensors,
medical_tensors,
) = validate_decode_method(
self._decode_method,
tensors,
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
medical_tensors,
)
sample_info_tensors, tensor_info_tensors = find_additional_tensors_and_info(
dataset, data_tensors
)
self._fill_sample_info_tensors(
dataset, sample_info_tensors, json_tensors, list_tensors
)

tensors.extend(sample_info_tensors)
htype_dict, ndim_dict, tensor_info_dict = get_htype_ndim_tensor_info_dicts(
dataset, data_tensors, tensor_info_tensors
@@ -837,31 +873,38 @@ def __iter__(self):
pil_compressed_tensors=pil_compressed_tensors,
)
else:
if not hasattr(self, "_indra_dataset"):
indra_dataset = dataset_to_libdeeplake(dataset)
else:
indra_dataset = self._indra_dataset

self._dataloader = self.__get_indra_dataloader(
dataset,
indra_dataset,
tensors=tensors,
raw_tensors=raw_tensors,
pil_compressed_tensors=pil_compressed_tensors,
json_tensors=json_tensors,
list_tensors=list_tensors,
medical_tensors=medical_tensors,
htype_dict=htype_dict,
ndim_dict=ndim_dict,
tensor_info_dict=tensor_info_dict,
)

dataset_read(self.dataset)

self._check_environment()
if self._iterator is not None:
self._iterator = iter(self._dataloader)

return self

def _check_environment(self):
if sys.platform == "darwin":
import multiprocessing as mp

if mp.get_start_method() == "fork":
env_vars = os.environ
no_proxy = env_vars.get("NO_PROXY", "")
init_check = env_vars.get("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "")
if no_proxy != "*" or init_check != "YES":
raise MacOSEnvironmentError

def __setattr__(self, attr, val):
if (
attr == "_iterator"
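Note: _check_environment raises MacOSEnvironmentError when the fork start method is used on macOS without the opt-in environment variables. A hedged sketch of how a caller could satisfy the check before iterating the loader; the values mirror the check above, and whether this is the recommended setup is an assumption:

import multiprocessing as mp
import os
import sys

if sys.platform == "darwin":
    # Either provide the values _check_environment looks for...
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
    os.environ["NO_PROXY"] = "*"
    # ...or avoid fork altogether so the check does not trigger:
    # mp.set_start_method("spawn", force=True)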