
Parallel queue #311 (Draft)

dvolgyes wants to merge 4 commits into main
Conversation

dvolgyes
Contributor

The code for #301. It is way uglier than it should be, but maybe it is easier to
understand this way. Tests, type hints, etc.: I will fix those once we agree on the details.
But it should work now.

In my examples the network processes a patch as fast as I can extract one,
so that will be a fundamental limit. But if I put a time.sleep(1) into my training loop,
then I see that the queue preloads the volumes.

Recommendation: set the subject size to a small value, e.g. 3, set the number of patches to a large number, like 64,
and put a time.sleep(1) into the training loop. Hopefully this will create conditions where you can study how it works.
(Assuming you have enough memory for that.)
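
A minimal sketch of that setup using the standard tio.Queue API (the parallel queue in this branch may take different arguments; the sizes and the synthetic data are only illustrative):

import time

import torch
import torchio as tio

# Illustrative subjects; replace with your own data.
subjects = [
    tio.Subject(image=tio.ScalarImage(tensor=torch.rand(1, 64, 64, 64)))
    for _ in range(10)
]
dataset = tio.SubjectsDataset(subjects)
sampler = tio.data.UniformSampler(patch_size=32)
queue = tio.Queue(
    dataset,
    max_length=64,          # many patches kept in the queue
    samples_per_volume=16,  # patches extracted per subject
    sampler=sampler,
    num_workers=2,          # subjects are loaded and transformed in workers
)
loader = torch.utils.data.DataLoader(queue, batch_size=4, num_workers=0)

for batch in loader:
    time.sleep(1)  # artificial delay so the queue has time to preload volumes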

@sarthakpati
Contributor

Any idea when this will get merged?

@fepegar
Owner

fepegar commented Dec 15, 2020

@sarthakpati, according to the discussion in #301, it's not clear that this implementation would generally be preferred, and I believe it's now in a prototyping state. You can always check out this branch and use it, if you like. I'll convert this PR to a draft, so it's clear that it's not just waiting to be reviewed.

@fepegar fepegar marked this pull request as draft December 15, 2020 13:12
@hsyang1222
Contributor

hsyang1222 commented Apr 3, 2023

@fepegar, would you consider providing a parallel queue?
Although torchio offers huge convenience and efficiency, we still have to spend a lot of time preprocessing medical images. This preprocessing time grows rapidly as datasets become higher quality, more diverse, and larger. The idea of training on GPUs while parallelizing preprocessing on CPUs used to feel experimental, but it increasingly seems to be a necessary methodology to remove the data-loading bottleneck.

I'm thinking of keeping the current code interface, but offering this as an option in a parallel environment.

@sarthakpati
Contributor

@sarthakpati, according to the discussion in #301, it's not clear that this implementation would generally be preferred, and I believe it's now in a prototyping state. You can always check out this branch and use it, if you like. I'll convert this PR to a draft, so it's clear that it's not just waiting to be reviewed.

Thanks! I will try this out and will be following the discussion closely. Cool work!

@fepegar
Owner

fepegar commented Apr 5, 2023

@fepegar, would you consider providing a parallel queue? Although torchio offers huge convenience and efficiency, we still have to spend a lot of time preprocessing medical images. This preprocessing time grows rapidly as datasets become higher quality, more diverse, and larger. The idea of training on GPUs while parallelizing preprocessing on CPUs used to feel experimental, but it increasingly seems to be a necessary methodology to remove the data-loading bottleneck.

I'm thinking of keeping the current code interface, but offering this as an option in a parallel environment.

Unfortunately, I can't "provide" a parallel queue because I don't have the resources to work on it. I think nowadays the data loading tools available in modern versions of PyTorch would allow for a very nice implementation, but I really don't have the bandwidth. You're welcome to contribute!

@hsyang1222
Contributor

@fepegar I have some possible improvements to make the queue more parallel. Could you tell me whether my understanding of the queue is correct, and whether my improvement ideas would be useful?

Understanding how the queue works
(1) In the queue, subjects are already loaded in parallel by worker processes via the PyTorch dataloader. If you set num_workers >= 1, the dataloader prefetches data (its prefetch_factor argument), so fetching data from main storage (e.g. an SSD) is already non-blocking.
(2) The subjects fetched by the dataloader are moved to the main process when the queue calls fill(); after that they are transformed and cropped to the patch size.

I think this can be improved in two ways.
(a) Handle the transform process inside the dataloader that prefetches the subject. This would reduce the time spent blocked while creating patches, because when fill() is called, it would receive subjects that have already been transformed.
(b) A typical transform consists of a pretransform (such as resample) and an augmentation (such as random noise). The augmentation produces random results and must be executed every time, while the pretransform produces the same result every time, so it can be cached. In general, resampling takes long, so it is much better to save its result to a file and then load it, even on a hard disk. I want to automate this process: split the transform into a pretransform and an augmentation, and cache the pretransform results (see the sketch after this list).
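
A hedged sketch of the split described in (b), using existing torchio transforms (the exact composition is only illustrative):

import torchio as tio

# Deterministic "pretransform": same output for the same input, so it can be cached.
pretransform = tio.Compose([
    tio.Resample(1.0),
    tio.RescaleIntensity(out_min_max=(0, 1)),
])

# Random "augmentation": must be re-applied at every iteration, so it cannot be cached.
augmentation = tio.Compose([
    tio.RandomAffine(),
    tio.RandomNoise(),
])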

@fepegar
Owner

fepegar commented Apr 16, 2023

Handle the transform process inside the dataloader that prefetches the subject.

If I understood correctly, I think this is already the case. The data loader gets images from the dataset, where they are loaded and transformed in the subprocesses:

def __getitem__(self, index: int) -> Subject:
    try:
        index = int(index)
    except (RuntimeError, TypeError):
        message = (
            f'Index "{index}" must be int or compatible dtype,'
            f' but an object of type "{type(index)}" was passed'
        )
        raise ValueError(message)
    subject = self._subjects[index]
    subject = copy.deepcopy(subject)  # cheap since images not loaded yet
    if self.load_getitem:
        subject.load()
    # Apply transform (this is usually the bottleneck)
    if self._transform is not None:
        subject = self._transform(subject)
    return subject
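
As a hedged usage sketch (subjects and transform are placeholders, not names from this repository): with num_workers > 0 in the data loader, each __getitem__ call above, and therefore the transform, runs in a worker subprocess rather than in the main process:

import torch
import torchio as tio

# subjects and transform are assumed to be defined elsewhere.
dataset = tio.SubjectsDataset(subjects, transform=transform)
loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    num_workers=4,                   # loading + transforming happen in the workers
    collate_fn=lambda batch: batch,  # keep the Subjects as a plain list
)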

A typical transform consists of a pretransform (such as resample) and an augmentation (such as random noise). The augmentation produces random results and must be executed every time, while the pretransform produces the same result every time, so it can be cached. In general, resampling takes long, so it is much better to save its result to a file and then load it, even on a hard disk. I want to automate this process: split the transform into a pretransform and an augmentation, and cache the pretransform results.

This is indeed interesting. I think it's related to MONAI's CacheDataset. I wonder what the gains would be. I guess if there is a Resample transform used for preprocessing, caching could help quite a lot.
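
For reference, a rough sketch of how MONAI's CacheDataset expresses this (the file path is a placeholder): it caches the output of the deterministic transforms that precede the first random transform, so only the random part runs every epoch.

from monai.data import CacheDataset
from monai.transforms import Compose, LoadImaged, Spacingd, RandGaussianNoised

transform = Compose([
    LoadImaged(keys='image'),
    Spacingd(keys='image', pixdim=(1.0, 1.0, 1.0)),  # deterministic: cached once
    RandGaussianNoised(keys='image'),                # random: applied every epoch
])
data = [{'image': 'subject_001.nii.gz'}]  # placeholder path
cached_dataset = CacheDataset(data=data, transform=transform, cache_rate=1.0)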

@dvolgyes
Contributor Author

Hi,

I wrote this patch a long time ago, and as @fepegar said, it was not clear whether it would improve speed.
Unfortunately, I have no time to fix it, but if someone picks it up, here are my experiences from the past years:

  • A parallel data loader is nice, but quite tricky. Usually the transforms make it complicated, and CPU-based transforms are slow. And you can't use GPU-based transforms in worker processes.
  • Disk access is a huge bottleneck; it would be preferable to use a hierarchical structure which can read subsets properly, and not the DICOM format. Remote sensing uses tiled datasets, which is reasonably implementable in arbitrary dimensions. (You select a grid size, e.g. 64**3, that becomes the underlying storage layout, and a slicing operator intelligently reads only the required blocks. E.g. x[58:66, ...] would read two blocks along the first dimension but not the rest.) Current medical formats usually read the whole volume into memory first, which is quite expensive. So pre-processing a volume into a tiled volume is probably the most fruitful approach. I would probably try tensorstore before implementing anything on my own (a minimal sketch follows the link):
    https://google.github.io/tensorstore/
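
A hedged sketch of that tiled layout with tensorstore's zarr driver (shape, chunk size, and path are illustrative):

import numpy as np
import tensorstore as ts

# Store a volume as 64**3 chunks; slicing then touches only the overlapping chunks.
spec = {
    'driver': 'zarr',
    'kvstore': {'driver': 'file', 'path': '/tmp/volume.zarr'},
    'metadata': {'shape': [128, 128, 128], 'chunks': [64, 64, 64], 'dtype': '<f4'},
    'create': True,
    'delete_existing': True,
}
store = ts.open(spec).result()
store[...] = np.random.rand(128, 128, 128).astype(np.float32)

# Reads only the two chunks along the first axis that overlap [58:66).
slab = store[58:66, :, :].read().result()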

Static transforms could be applied before the dataset, and non-static ones afterwards, something like:

noncached_dataset = Dataset(..., transform=cached_transforms)
dataset = CachedDataset(noncached_dataset, cache_dir=..., expiration_time=..., transforms=...)

where __getitem__(self, idx) would be something like this:

def __getitem__(self, idx):
    if idx in self.cache:
        data = self.cache[idx]
    else:
        # The cached (static) transform was already applied in the original dataset
        data = self.original_dataset[idx]
        self.cache[idx] = data
    if self.transform:
        # Non-cacheable (random) transforms run every time
        data = self.transform(data)
    return data

So what do you need? First, even if you have N worker processes, only one should create the cache.
The cache should ideally even be shared between multiple trainings (imagine a server with 4 GPUs running parallel trainings).

However, this design has a serious limitation: CUDA access. Workers don't have CUDA access.
So it is probably better to use no worker processes and give CUDA access to the dataset object.
In this case, the dataset could spawn subprocesses, but probably shouldn't.
After years of maturing the idea, I would probably go for an async data storage where you can issue many async calls, and the storage backend handles parallelization, caching, ordering, etc. (see tensorstore).
The most important part here is the locking mechanism: it is error-prone to implement, and better left to the storage layer than handled with multiprocessing locks.

In remote sensing, there is another aspect: coordinate systems. While this is not directly applicable to medical imaging, the different modalities exhibit similar resampling challenges.
I would recommend taking a look at https://torchgeo.readthedocs.io/en/stable/
Both the coordinate system handling and the spatial samplers are interesting (random, grid, prechipped, etc.).

So, summarizing: I would guess the disk format is the most significant bottleneck, and the only reasonable way, I think, is to read the pixel data from a preprocessed file; as a first attempt I would go for tensorstore.
As for structure: I would go for the async variant of tensorstore and not use any subprocesses. This makes it possible to run CUDA-based transforms asynchronously from the model (just be careful with free memory).
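
A hedged sketch of that async pattern (assuming a chunked store like the one above already exists at the given path): several reads are issued up front, and each result is awaited only when the data is actually needed.

import tensorstore as ts

# Open an existing chunked store (path is a placeholder).
store = ts.open({
    'driver': 'zarr',
    'kvstore': {'driver': 'file', 'path': '/tmp/volume.zarr'},
}).result()

# Issue several asynchronous reads; tensorstore handles parallelism internally.
futures = [store[i * 64:(i + 1) * 64, :, :].read() for i in range(2)]
blocks = [future.result() for future in futures]  # block only when each result is needed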

Ugly but not completely out of the question: most medical datasets are small, so make an in-memory cache, load everything at the beginning of the training, and draw subsamples from it. (Unfortunately, we rarely get more than 100 GB of data, and 100 GB in a server is not too much.)
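
A hedged sketch of that in-memory approach with the standard torchio API (paths are placeholders; load_getitem is the SubjectsDataset flag checked in the __getitem__ snippet quoted earlier; note that the deep copy then duplicates tensors instead of file references):

import torchio as tio

subjects = [
    tio.Subject(image=tio.ScalarImage('subject_001.nii.gz')),  # placeholder path
]
for subject in subjects:
    subject.load()  # read the image data into memory once, up front

# load_getitem=False avoids re-triggering loading in __getitem__.
dataset = tio.SubjectsDataset(subjects, load_getitem=False)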

MONAI & co.: I would assume that everyone has the same problem, so sooner or later someone will solve it. But most frameworks have a relatively clean cut at the dataset level, so it is not unreasonable to combine datasets/loaders/samplers from one framework with another framework's transforms, models, losses, etc.

@hsyang1222
Contributor

Thank you, @fepegar and @dvolgyes, for your kind responses. I mentioned improving the prefetching of the queue (a) and caching the pretransform (b). I'll address those two first, and then talk about a broader speedup (c).

(a) Improving prefetch in the queue
I think fepegar is right about the queue. The dataloader currently uses prefetching to fetch data asynchronously, but I still think it could be improved.
I'll create a separate issue and open a pull request for this.

(b) Caching pretransforms
As fepegar and dvolgyes said, I think a CachedDataset could be helpful. I didn't know about CachedDataset when I started this discussion, so I hadn't considered it up front. Currently I simply (b-1) load the data from disk and create a ScalarImage(tensor=<data from disk>, affine=<affine from disk>) to keep it in memory, (b-2) perform the pretransform, and (b-3) only apply the augmentation during training (sketched below). The speedup was about 5x per iteration.
However, this method was difficult to use in all common situations: memory management was hard, and even a small increase in data pushed things into swap, which was sometimes slower than just fetching the data from disk.
Now that I know about CachedDataset, I'm sure there are research and libraries out there for this situation, and I'll look into them.
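
A hedged sketch of steps (b-1)-(b-3) with the torchio API (the transform choices and the path are illustrative):

import torchio as tio

pretransform = tio.Resample(1.0)   # deterministic, applied once
augmentation = tio.RandomNoise()   # random, applied at every iteration

# (b-1) + (b-2): load from disk, pretransform, and keep the result in memory.
original = tio.Subject(image=tio.ScalarImage('subject_001.nii.gz'))  # placeholder path
preprocessed = pretransform(original)
cached = tio.Subject(image=tio.ScalarImage(
    tensor=preprocessed['image'].data,
    affine=preprocessed['image'].affine,
))

# (b-3): during training, only the cheap random augmentation runs.
augmented = augmentation(cached)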

(c) Using CUDA for transforms
I used kornia (https://github.com/kornia/kornia) to perform augmentation on the GPU. However, some features (elastic deformation, pretransforms) are not supported, and the settings differ from feature to feature. Using GPUs for augmentation does not yet seem to be a settled practice. For example, in my case, when training a large model, I keep using the torchio transforms as they are, due to lack of GPU memory.
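
A hedged sketch of GPU-side batch augmentation with kornia (the operator choice and batch shape are illustrative; availability of 3D augmentations depends on the kornia version):

import torch
import kornia.augmentation as K

aug = K.RandomAffine3D(degrees=10.0, p=1.0)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch = torch.rand(2, 1, 32, 32, 32, device=device)  # (B, C, D, H, W)
augmented = aug.to(device)(batch)                     # runs on the GPU when available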

Thanks again for your answers. I think there is still a lot of room for improvement in medical image pretransforms. If I can think of more, I'll comment again.

@romainVala
Contributor

Hi there,

Just my feedback about the "pre-transform", because I do not think that it is the major bottleneck. Of course it may depend on the exact choice of transforms (depending on your application). In my case I use a RandomAffine and a RandomElasticDeformation... even worse, I also find it very useful to use RandomMotion! Each of those transforms takes quite a lot of time. I would love to see them implemented on GPU (should that be a development objective of torchio???).
Anyway, for now we need to deal with CPU transforms... For efficient training I needed more than 32 CPU cores (and num_workers above 12) so that the GPU does not wait too long. But increasing num_workers also increases (CPU) memory usage.

Unfortunately the GPU node I can access does not have enough CPU memory, and reducing num_workers (to 6) leads to training times that are just too long (a factor of 5 to 10 or more compared with almost no augmentation).

I ended up doing the augmentation on a CPU cluster and saving each iteration to disk. This is a rather insane solution, since I need a few terabytes of disk to save my few hundred thousand augmented examples, but it is effective and it greatly speeds up the training.
