
Create a pytorch dataloader with 2 data sources #5220

Open
1 task done
rvandeghen opened this issue Dec 5, 2023 · 5 comments
Assignees
Labels
question Further information is requested

Comments

@rvandeghen

Describe the question.

Hi,

I would like to know how I can create 2 pipelines with 2 different sources and use them in the same dataloader with DALIGenericIterator, or something else more appropriate.

Thanks

Check for duplicates

  • I have searched the open bugs/issues and have found no duplicates for this bug report
@rvandeghen rvandeghen added the question Further information is requested label Dec 5, 2023
@stiepan
Member

stiepan commented Dec 5, 2023

Hi @rvandeghen,

Could you tell us more on the use case at hand (what data you'd like to read, what framework you're using)?

In principle, you can have a single pipeline with multiple readers/external sources that produces multiple outputs and wrap the pipeline in a DALIGenericIterator. There's an option to pass multiple pipelines to a single DALIGenericIterator, but that's aimed for sharding/data-parallelism, so typically that would be instances of the same pipeline.

@rvandeghen
Author

Hi @stiepan,

I'm using PyTorch, and I would like to have data coming from both videos and images. Currently I use them separately, so I have 2 independent pipelines with 2 different dataloaders, but I would like to merge them into the same dataloader and use them in the same training.
If possible, I would like the option to choose whether a batch is created from samples of both sources or from a single source only.

If you need more information about the use case, let me know.

@stiepan
Member

stiepan commented Dec 5, 2023

Thanks for the details @rvandeghen.

I would like to merge them in the same dataloader and use them in the same training.

In the simple case when you need images and videos you could just read them both and return from the same pipeline.

If possible, I would like to have the opportunity to chose if a batch is created from samples of the 2 sources or from the same source only.

This may be tricky. With the exception of if/else blocks (with conditional execution enabled), DALI requires the batch size to stay the same across a single iteration in the pipeline. So, something like reading a batch of videos and a batch of images at once and merging them into a single batch won't work (as the output batch would be bigger). With the conditionals, you could create an output batch that picks some samples from one batch and others from another batch (as long as the two source batch sizes and the output batch size stay the same), but that's also subject to some restrictions (i.e. the dimensionality and data type of the samples in a batch must match).

Could you tell us more about how you mix the modalities when feeding them into the network? If you switch after a number of iterations, maybe a custom Python iterator that wraps two DALIGenericIterators and switches from time to time which one to call would be a solution?
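Such a switching wrapper could be sketched as follows. This is a minimal, hypothetical example: `SwitchingIterator` and `switch_every` are invented names, and plain lists stand in for the two DALIGenericIterator instances, which just need to yield one batch per `__next__` call.

```python
import itertools


class SwitchingIterator:
    """Alternates between two batch iterators every `switch_every` iterations.

    Hypothetical sketch: in practice `iter_a` and `iter_b` would be
    DALIGenericIterator instances (or wrappers like ImageDataset and
    VideoDataset above), each yielding one batch per call.
    """

    def __init__(self, iter_a, iter_b, switch_every=10):
        self.iterators = [iter(iter_a), iter(iter_b)]
        self.switch_every = switch_every
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        # Pick which source to query for this iteration; the other
        # iterator is left untouched, so its reading is effectively delayed.
        which = (self.count // self.switch_every) % 2
        self.count += 1
        return next(self.iterators[which])


# Demonstration with plain lists standing in for the two DALI iterators:
images = [f"img_batch_{i}" for i in range(4)]
videos = [f"vid_batch_{i}" for i in range(4)]
mixed = list(itertools.islice(SwitchingIterator(images, videos, switch_every=2), 6))
# mixed -> ['img_batch_0', 'img_batch_1', 'vid_batch_0', 'vid_batch_1',
#           'img_batch_2', 'img_batch_3']
```

The training loop then consumes `mixed` batches exactly as it would consume a single dataloader, as long as the model is agnostic to which modality a given batch came from.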

Do you use DALI's fn.readers or external_source?
Could you provide us with some sample code/simplified example of the two pipelines?

@rvandeghen
Author

@stiepan
Here are some code snippets that we currently use.

For the images, we have a pipeline similar to

@pipeline_def
def create_image_reader_pipeline(files, shard_id=0, num_shards=1):
    files, _ = fn.readers.file(files=files,
                               shard_id=shard_id,
                               num_shards=num_shards,
                               random_shuffle=True,
                               name="Reader",
                              )
    images = fn.decoders.image(files, device="mixed", hw_decoder_load=0.75)

    # do some data augmentations...

    return images

and its corresponding dataloader

class ImageDataset(DALIGenericIterator):
    def __init__(self, *kargs, **kvargs):
        super().__init__(*kargs, **kvargs)

    def __next__(self):
        out = super().__next__()
        out = out[0]["data"]

        return out

For the videos, our pipeline follows the same idea

@pipeline_def
def video_pipe(files, sequence_length, step, stride, random_shuffle=True,
               seed=0, shard_id=0, num_shards=1):

    videos = fn.readers.video(
        filenames=files,
        sequence_length=sequence_length,
        step=step,
        stride=stride,
        shard_id=shard_id,
        num_shards=num_shards,
        device="gpu",
        random_shuffle=random_shuffle,
        file_list_include_preceding_frame=True,
        skip_vfr_check=True,
        dtype=types.FLOAT,
        seed=seed,
        name="Reader",
    )

    # do some data augmentations...

    return videos

and the dataloader

class VideoDataset(pytorch.DALIGenericIterator):
    def __init__(self, *kargs, **kvargs):
        super().__init__(*kargs, **kvargs)

    def __next__(self):
        out = super().__next__()
        out = out[0]["data"]

        B, F, C, H, W = out.size()
        out = out.view(B*F, C, H, W)
        return out

Note that for this simple case, our effective batch size is the batch size multiplied by the sequence length, so after the reshape the batches have the same dimensionality as those from the image pipeline.

We consider that our model is agnostic to the modality of data.

@stiepan
Member

stiepan commented Dec 11, 2023

Hi @rvandeghen,

Thanks for the details of your use case. As I mentioned before, unfortunately, the pipeline (and the provided iterator) are constrained so that the batch size of inputs and outputs must be equal, so currently there's no way to merge two different batches into a single, bigger one. Maybe even more importantly, if we have two readers in the pipeline, both of them will produce a full batch in each iteration, so there's no way to delay the progress of reading images relative to videos (or the other way round) within a single pipeline.

Given those constraints, I think the best option on the table is to use the two pipelines separately and implement a Python iterator that takes the two pipeline instances in its constructor and implements its own logic for deciding which pipeline should be queried for the next iteration and for combining the outputs in PyTorch.
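One way such a per-iteration policy could look (a sketch only: `MixedLoader` and `video_ratio` are hypothetical names, and plain iterables stand in for the two pipeline-backed iterators):

```python
class MixedLoader:
    """Queries one of two wrapped loaders per iteration.

    Hypothetical sketch: `image_loader` and `video_loader` would be the
    ImageDataset / VideoDataset iterators above; `video_ratio` is an
    assumed knob deciding how often the video pipeline is queried.
    """

    def __init__(self, image_loader, video_loader, video_ratio=0.5):
        self.loaders = (iter(image_loader), iter(video_loader))
        self.video_ratio = video_ratio
        self._acc = 0.0  # fractional accumulator for deterministic scheduling

    def __iter__(self):
        return self

    def __next__(self):
        # Deterministically interleave so that video batches appear with
        # frequency ~video_ratio (random.random() < video_ratio would be
        # a stochastic alternative).
        self._acc += self.video_ratio
        if self._acc >= 1.0:
            self._acc -= 1.0
            return next(self.loaders[1])  # video batch
        return next(self.loaders[0])      # image batch


# Demonstration with plain lists standing in for the two loaders;
# with video_ratio=0.25 every fourth batch comes from the video side:
batches = MixedLoader(["img"] * 6, ["vid"] * 2, video_ratio=0.25)
print([next(batches) for _ in range(8)])
# -> ['img', 'img', 'img', 'vid', 'img', 'img', 'img', 'vid']
```

Since each underlying loader is only advanced when it is actually queried, this sidesteps the limitation above that two readers inside one pipeline always progress in lockstep.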

The closest thing to what you would like that is possible with DALI right now is, I think, reading two full batches of images and videos and mixing them into two batches with the samples shuffled. I am not sure if that's of any help in your case, but just for completeness, here's a code sample that does that:

import numpy as np
from nvidia.dali import types, fn, pipeline_def

num_frames = 16
size = (223, 367)

def load_images():
    images, _ = fn.readers.file(name="Reader", file_root=data_path, random_shuffle=True)
    images = fn.decoders.image(images, device="mixed")
    images = fn.resize(images, size=size)
    return images


def load_video():
    video = fn.readers.video_resize(
        filenames=vid_filenames,
        sequence_length=num_frames,
        resize_x=size[1],
        resize_y=size[0],
        file_list_include_preceding_frame=True,
        device='gpu')
    return video


def first_batch(sample_info):
    sample_idx = sample_info.idx_in_epoch
    epoch_idx = sample_info.epoch_idx
    iteration = sample_info.iteration
    # some predicate to decide if we pick the image or the video into the
    # first batch
    return np.array(sample_idx % 2)


@pipeline_def(enable_conditionals=True, batch_size=8, num_threads=4, device_id=0, seed=42)
def pipeline():
    image = load_images() # HWC
    video = load_video() # FHWC
    image = fn.expand_dims(image, axes=[0], new_axis_names="F")  # HWC -> FHWC with single frame
    image_or_vid = fn.external_source(first_batch, batch=False)
    if image_or_vid:
        first = image
        second = video
    else:
        first = video
        second = image
    return first, second

p = pipeline()
p.build()
first, second = p.run()
