Added support for audio/video support in hub.ingest #2072

Open · wants to merge 9 commits into base: main
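For context, a minimal usage sketch of the entry point this PR extends. The call shape follows the diff below; the source folder and destination are hypothetical, and the `"auto"` behavior is as described in the updated docstrings:

```python
import deeplake

# Hypothetical folder of mp3 files grouped into one subdirectory per class.
# With sample_compression="auto", the most common extension (mp3) routes
# ingestion to AudioClassification and creates an `audios` tensor.
ds = deeplake.ingest_classification(
    src="./audio_dataset",    # hypothetical source directory
    dest="mem://audio_ds",    # in-memory destination, for testing only
    sample_compression="auto",
    progressbar=False,
    summary=False,
)
print(ds["audios"].meta.sample_compression)  # expected: "mp3"
```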
40 changes: 36 additions & 4 deletions deeplake/api/dataset.py
@@ -6,6 +6,11 @@
from typing import Dict, Optional, Union, List

from deeplake.auto.unstructured.kaggle import download_kaggle_dataset
from deeplake.auto.unstructured.image_classification import (
    ImageClassification,
    AudioClassification,
    VideoClassification,
)
from deeplake.auto.unstructured.coco.coco import CocoDataset
from deeplake.auto.unstructured.yolo.yolo import YoloDataset
@@ -48,6 +53,13 @@
TokenPermissionError,
UnsupportedParameterException,
)
from deeplake.compression import (
    IMAGE_COMPRESSIONS,
    VIDEO_COMPRESSIONS,
    AUDIO_COMPRESSIONS,
    BYTE_COMPRESSIONS,
    COMPRESSION_ALIASES,
)
from deeplake.util.storage import (
get_storage_and_cache_chain,
storage_provider_from_path,
@@ -57,6 +69,13 @@
from deeplake.util.cache_chain import generate_chain
from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject

# Compression families used to route "auto"-detected extensions to the
# matching Classification subclass. DICOM ("dcm") is read-only in Deep Lake
# (see READONLY_COMPRESSIONS), so it is excluded from ingestable image formats.
_image_compressions = IMAGE_COMPRESSIONS[:] + BYTE_COMPRESSIONS + list(COMPRESSION_ALIASES)
_image_compressions.remove("dcm")
_video_compressions = VIDEO_COMPRESSIONS
_audio_compressions = AUDIO_COMPRESSIONS


class dataset:
@staticmethod
@@ -1273,6 +1292,7 @@ def ingest_yolo(
def ingest_classification(
src: Union[str, pathlib.Path],
dest: Union[str, pathlib.Path],
sample_compression: str = "auto",
image_params: Optional[Dict] = None,
label_params: Optional[Dict] = None,
dest_creds: Optional[Dict] = None,
@@ -1293,6 +1313,7 @@ def ingest_classification(
- an s3 path of the form ``s3://bucketname/path/to/dataset``. Credentials are required in either the environment or passed to the creds argument.
- a local file system path of the form ``./path/to/dataset`` or ``~/path/to/dataset`` or ``path/to/dataset``.
- a memory path of the form ``mem://path/to/dataset`` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
sample_compression (str): Compression to use for the data tensor (``images``, ``audios``, or ``videos``). If ``sample_compression`` is "auto", the compression is determined by the most common file extension in the directory.
image_params (Optional[Dict]): A dictionary containing parameters for the images tensor.
label_params (Optional[Dict]): A dictionary containing parameters for the labels tensor.
dest_creds (Optional[Dict]): A dictionary containing credentials used to access the destination path of the dataset.
@@ -1359,6 +1380,7 @@ def ingest_classification(
dest,
"ingest_classification",
{
"sample_Compression": sample_compression,
"Progressbar": progressbar,
"Summary": summary,
},
@@ -1385,6 +1407,9 @@ def ingest_classification(
if not os.path.isdir(src):
    raise InvalidPathException(src)

if sample_compression == "auto":
    sample_compression = get_most_common_extension(src)
    if sample_compression is None:
        # Assumption: reuse the pre-existing failure path for undetectable extensions.
        raise InvalidFileExtension(src)

if image_params is None:
    image_params = {}
if label_params is None:
@@ -1397,7 +1422,12 @@
image_params["sample_compression"] = images_compression

# Route to the matching Classification subclass based on the detected compression.
if sample_compression in _image_compressions:
    unstructured = ImageClassification(source=src, htype="image")  # type: ignore
elif sample_compression in _audio_compressions:
    unstructured = AudioClassification(source=src, htype="audio")  # type: ignore
elif sample_compression in _video_compressions:
    unstructured = VideoClassification(source=src, htype="video")  # type: ignore
else:
    # Assumption: fail loudly rather than leave `unstructured` unbound.
    raise InvalidFileExtension(src)

ds = deeplake.empty(
dest, creds=dest_creds, token=token, verbose=False, **dataset_kwargs
@@ -1411,6 +1441,7 @@ def ingest_classification(
ds, # type: ignore
progressbar=progressbar,
generate_summary=summary,
tensor_args={"sample_compression": sample_compression},
image_tensor_args=image_params,
label_tensor_args=label_params,
num_workers=num_workers,
@@ -1425,7 +1456,7 @@ def ingest_kaggle(
src: Union[str, pathlib.Path],
dest: Union[str, pathlib.Path],
exist_ok: bool = False,
images_compression: str = "auto",
sample_compression: str = "auto",
dest_creds: Optional[Dict] = None,
kaggle_credentials: Optional[dict] = None,
progressbar: bool = True,
@@ -1444,7 +1475,7 @@ def ingest_kaggle(
- a local file system path of the form ``./path/to/dataset`` or ``~/path/to/dataset`` or ``path/to/dataset``.
- a memory path of the form ``mem://path/to/dataset`` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
exist_ok (bool): If the kaggle dataset was already downloaded and ``exist_ok`` is ``True``, ingestion will proceed without error.
sample_compression (str): Compression to use for the data tensor (``images``, ``audios``, or ``videos``). If ``sample_compression`` is "auto", the compression is determined by the most common file extension in the directory.
dest_creds (Optional[Dict]): A dictionary containing credentials used to access the destination path of the dataset.
kaggle_credentials (dict): A dictionary containing kaggle credentials {"username":"YOUR_USERNAME", "key": "YOUR_KEY"}. If ``None``, environment variables/the kaggle.json file will be used if available.
progressbar (bool): Enables or disables ingestion progress bar. Set to ``True`` by default.
@@ -1468,7 +1499,7 @@ def ingest_kaggle(
dest,
"ingest_kaggle",
{
"Images_Compression": images_compression,
"sample_Compression": sample_compression,
"Exist_Ok": exist_ok,
"Progressbar": progressbar,
"Summary": summary,
@@ -1490,6 +1521,7 @@
ds = deeplake.ingest_classification(
src=src,
dest=dest,
sample_compression=sample_compression,
image_params={"sample_compression": images_compression},
dest_creds=dest_creds,
progressbar=progressbar,
57 changes: 56 additions & 1 deletion deeplake/auto/tests/test_ingestion.py
@@ -119,7 +119,62 @@ def test_image_classification_sets(memory_ds: Dataset):
assert ds["train/labels"].info.class_names == ["class0", "class1", "class2"]


def test_audio(memory_ds: Dataset):
    path = get_dummy_data_path("tests_auto/audio_classification")
    src = "test_auto/invalid_path"
    ds = deeplake.ingest_classification(
        src=path, dest=memory_ds.path, progressbar=False, summary=False, overwrite=False
    )

    with pytest.raises(InvalidPathException):
        deeplake.ingest_classification(
            src=src,
            dest=memory_ds.path,
            progressbar=False,
            summary=False,
            overwrite=False,
        )

    with pytest.raises(SamePathException):
        deeplake.ingest_classification(
            src=path, dest=path, progressbar=False, summary=False, overwrite=False
        )

Review comment (aadityasinha-dotcom): In line 183 it says memory_path is not defined.
Review comment (aadityasinha-dotcom): Maybe because of the undeclared variable.

assert ds["audios"].meta.sample_compression == "mp3"
assert list(ds.tensors.keys()) == ["audios", "labels"]
assert ds["audios"].numpy().shape == (0,)
assert ds["audios"].numpy().shape == (0,)
assert ds["labels"].info.class_names == ("class0", "class1", "class2")


def test_video(memory_ds: Dataset):
    path = get_dummy_data_path("tests_auto/video_classification")
    src = "test_auto/invalid_path"
    ds = deeplake.ingest_classification(
        src=path, dest=memory_ds.path, progressbar=False, summary=False, overwrite=False
    )

    with pytest.raises(InvalidPathException):
        deeplake.ingest_classification(
            src=src,
            dest=memory_ds.path,
            progressbar=False,
            summary=False,
            overwrite=False,
        )

    with pytest.raises(SamePathException):
        deeplake.ingest_classification(
            src=path, dest=path, progressbar=False, summary=False, overwrite=False
        )

assert ds["videos"].meta.sample_compression == "mp4"
assert list(ds.tensors.keys()) == ["videos", "labels"]
aadityasinha-dotcom marked this conversation as resolved.
Show resolved Hide resolved
assert ds["videos"].numpy().shape == (0,)
assert ds["videos"].numpy().shape == (0,)
assert ds["labels"].info.class_names == ("class0", "class1", "class2")


def test_ingestion_exception(memory_ds: Dataset):
path = get_dummy_data_path("tests_auto/image_classification_with_sets")
with pytest.raises(InvalidPathException):
deeplake.ingest_classification(
78 changes: 71 additions & 7 deletions deeplake/auto/unstructured/image_classification.py
@@ -17,7 +17,6 @@

import deeplake

LABELS_TENSOR_NAME = "labels"


@@ -43,8 +42,8 @@ def _set_name_from_path(path: Path) -> str:
return path.parts[-3]


class Classification(UnstructuredDataset):
    def __init__(self, source: str, htype: str):
"""Convert an unstructured dataset to a structured dataset.

Note:
@@ -70,6 +69,7 @@ def __init__(self, source: str):
f"No files found in {self.source}. Please ensure that the source path is correct."
)

self.htype = htype
self.set_names = self.get_set_names()
self.class_names = self.get_class_names()

@@ -94,6 +94,7 @@ def structure( # type: ignore
ds: Dataset,
progressbar: bool = True,
generate_summary: bool = True,
tensor_args: dict = {},
shuffle: bool = True,
image_tensor_args: dict = {},
label_tensor_args: dict = {},
@@ -105,6 +106,7 @@
ds (Dataset): A Deep Lake dataset object.
progressbar (bool): Defines if the method uses a progress bar. Defaults to True.
generate_summary (bool): Defines if the method generates ingestion summary. Defaults to True.
tensor_args (dict): Defines parameters (such as ``sample_compression``) for the tensor created for the ingested samples.
shuffle (bool): Defines if the file paths should be shuffled prior to ingestion. Defaults to True.
image_tensor_args (dict): Defines the parameters for the images tensor.
label_tensor_args (dict): Defines the parameters for the class_labels tensor.
@@ -115,7 +117,7 @@

"""

tensor_map = {}
labels_tensor_map = {}

use_set_prefix = len(self.set_names) > 1
@@ -124,6 +126,9 @@
if not use_set_prefix:
    set_name = ""

tensor_name = os.path.join(set_name, self.htype + "s")
labels_tensor_name = os.path.join(set_name, LABELS_TENSOR_NAME)
tensor_map[set_name] = tensor_name.replace("\\", "/")
# Assumption: the labels map must be filled here; the append loop below
# reads labels_tensor_map[set_name], and this diff collapses the nearby lines.
labels_tensor_map[set_name] = labels_tensor_name.replace("\\", "/")
@@ -135,9 +140,9 @@

# TODO: infer sample_compression
ds.create_tensor(
    tensor_name.replace("\\", "/"),
    htype=self.htype,
    **tensor_args,
)
ds.create_tensor(
labels_tensor_name.replace("\\", "/"),
@@ -146,6 +151,65 @@
**label_tensor_args,
)

paths = self._abs_file_paths
# Honor the `shuffle` parameter documented above (rshuffle lines appear
# displaced at the end of this file's diff).
if shuffle:
    rshuffle(paths)
skipped_files: list = []

iterator = tqdm(
    paths,
    desc='Ingesting "%s" (%i files skipped)'
    % (self.source.name, len(skipped_files)),
    total=len(paths),
    disable=not progressbar,
)

with ds, iterator:
    for file_path in iterator:
        # `sample` may now be an image, audio clip, or video clip.
        sample = deeplake.read(file_path)

        class_name = _class_name_from_path(file_path)

        label = np.uint32(self.class_names.index(class_name))

        set_name = _set_name_from_path(file_path) if use_set_prefix else ""

        # TODO: try to get all len(shape)s to match.
        # if appending fails because of a shape mismatch, expand dims (might also fail)
        try:
            ds[tensor_map[set_name]].append(sample)

        except TensorInvalidSampleShapeError:
            arr = sample.array
            reshaped = np.expand_dims(arr, -1)
            ds[tensor_map[set_name]].append(reshaped)

        except Exception:
            skipped_files.append(file_path.name)
            iterator.set_description(
                'Ingesting "%s" (%i files skipped)'
                % (self.source.name, len(skipped_files))
            )
            continue

        ds[labels_tensor_map[set_name]].append(label)

if generate_summary:
    ingestion_summary(str(self.source), skipped_files)
return ds


class ImageClassification(Classification):
    def __init__(self, source: str, htype: str):
        super().__init__(source, htype)


class AudioClassification(Classification):
    def __init__(self, source: str, htype: str):
        super().__init__(source, htype)


class VideoClassification(Classification):
    def __init__(self, source: str, htype: str):
        super().__init__(source, htype)
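For reference, a hedged sketch of how `ingest_classification` (dataset.py above) drives these subclasses; the paths are hypothetical, and `tensor_args` mirrors the keyword this PR adds to `structure`:

```python
import deeplake

# Sketch only: mirrors the routing added in deeplake/api/dataset.py.
unstructured = AudioClassification(source="./audio_dataset", htype="audio")
ds = deeplake.empty("mem://audio_ds")
unstructured.structure(
    ds,
    progressbar=False,
    generate_summary=False,
    tensor_args={"sample_compression": "mp3"},
)
```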
32 changes: 32 additions & 0 deletions deeplake/compression.py
@@ -70,6 +70,38 @@
)

VIDEO_COMPRESSIONS = ["mp4", "mkv", "avi"]

VIDEO_COMPRESSION_EXT_DICT = {
"mp4": [".mp4"],
"mkv": [".mkv"],
"avi": [".avi"],
}

VIDEO_COMPRESSION_EXTENSIONS = list(
set(itertools.chain(*VIDEO_COMPRESSION_EXT_DICT.values()))
)

AUDIO_COMPRESSIONS = ["mp3", "flac", "wav"]


AUDIO_COMPRESSION_EXT_DICT = {
"mp3": [".mp3"],
"flac": [".flac"],
"wav": [".wav"],
}

AUDIO_COMPRESSION_EXTENSIONS = list(
set(itertools.chain(*AUDIO_COMPRESSION_EXT_DICT.values()))
)

COMPRESSION_EXTENSIONS = list(
IMAGE_COMPRESSION_EXTENSIONS
+ VIDEO_COMPRESSION_EXTENSIONS
+ AUDIO_COMPRESSION_EXTENSIONS
)


READONLY_COMPRESSIONS = ["mpo", "fli", "dcm", *AUDIO_COMPRESSIONS, *VIDEO_COMPRESSIONS]
NIFTI_COMPRESSIONS = ["nii", "nii.gz"]
POINT_CLOUD_COMPRESSIONS = ["las"]
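These extension tables are what an `"auto"` lookup can match against. `get_most_common_extension`, called from dataset.py above, is not part of this diff; a minimal sketch of such a resolver, assuming only the constants defined in this file:

```python
import os
from collections import Counter

def most_common_extension(directory):
    # Tally supported extensions under `directory` and return the most
    # common one without its leading dot, or None when nothing matches.
    counts = Counter(
        os.path.splitext(name)[1].lower()
        for _, _, files in os.walk(directory)
        for name in files
        if os.path.splitext(name)[1].lower() in COMPRESSION_EXTENSIONS
    )
    return counts.most_common(1)[0][0].lstrip(".") if counts else None
```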
8 changes: 4 additions & 4 deletions deeplake/core/compression.py
@@ -867,9 +867,9 @@ def _open_video(file: Union[str, bytes, memoryview]):
raise ModuleNotFoundError(
"PyAV is not installed. Run `pip install deeplake[video]`."
)
# Assumes `Path` is imported from pathlib at the top of this module.
if isinstance(file, (str, Path)):
    container = av.open(
        str(file), options={"protocol_whitelist": "file,http,https,tcp,tls,subfile"}
    )
else:
    container = av.open(BytesIO(file))
@@ -1042,9 +1042,9 @@ def _open_audio(file: Union[str, bytes, memoryview]):
raise ModuleNotFoundError(
"PyAV is not installed. Please run `pip install deeplake[audio]`"
)
if isinstance(file, (str, Path)):
    container = av.open(
        str(file), options={"protocol_whitelist": "file,http,https,tcp,tls,subfile"}
    )
else:
    container = av.open(BytesIO(file))
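Accepting `pathlib.Path` matters here because the new `Classification.structure` loop hands `deeplake.read` the `Path` objects it collects from disk. A small illustration; the file path is hypothetical:

```python
from pathlib import Path

import deeplake

# With this change, a Path works the same as a str for PyAV-backed media.
sample = deeplake.read(Path("data/clip.mp4"))  # hypothetical file
```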
6 binary files not shown (new dummy audio/video test data).