
Poor scheduling with flox, leading to high memory usage and eventual failure #11026

Open

ivirshup opened this issue Mar 26, 2024 · 7 comments

Labels: needs triage (Needs a response from a contributor)

ivirshup commented Mar 26, 2024

Describe the issue:

When performing an aggregation with flox, I quickly run out of memory, regardless of the chunk size.

It appears that lower nodes in the graph (data loading) keep getting run while higher nodes (the reduction) sit uncomputed.

Opening here as requested in:

cc: @dcherian

Minimal Complete Verifiable Example:

import dask.distributed as dd
import dask.array as da
import numpy as np
import flox

cluster = dd.LocalCluster(n_workers=3)
client = dd.Client(cluster)

M, N = 1_000_000, 20_000

# Note that it doesn't seem to matter where X comes from.
# In my real-world use case, it is either generated from `map_blocks` or 
# read directly from a zarr store
X = da.random.normal(size=(M, N), chunks=(5_000, N))
by = np.random.choice(2_000, size=M)

res, codes = flox.groupby_reduce(
    X.T,
    by,
    func="sum",
    fill_value=0,
    method="map-reduce",
    reindex=True,
)

res_comp = res.compute()

Anything else we need to know?:

As an example, here's how my task graph completes if I run `X.sum(axis=0).compute()`: [dashboard screenshot]
Whereas `res.compute()` ends up with something more like: [dashboard screenshots]

Environment:

  • Dask version: '2024.3.1'
  • Python version: Python 3.11.7 | packaged by conda-forge | (main, Dec 23 2023, 14:43:09) [GCC 12.3.0]
  • Operating System: ubuntu (but can replicate on mac)
  • Install method (conda, pip, source): pip (in conda env)
github-actions bot added the needs triage label on Mar 26, 2024

fjetter (Member) commented Mar 28, 2024

I've only very briefly looked into this so far, but it looks like the very first groupby tasks generate very large partitions, pretty much regardless of what the initial input partitions look like.

[dashboard screenshot]

I get many, many of those ~600 MiB chunks, and this is clogging the cluster. From what I can tell, scheduling is working as intended. The reason this isn't moving along is that some of those chunks end up on different workers and have to be moved over the network, i.e. the final reducer tasks are delayed because the workers have to fetch that data. While the data is being fetched, the worker attempts to run other tasks.

I'm a little surprised to see such large intermediate results in a map-reduce workflow, but I don't have sufficient knowledge of flox to judge this properly.

dcherian (Contributor) commented Mar 28, 2024

The input chunk sizes are large here, so that's fixable in theory. I did notice that no root tasks were identified in the dashboard. @ivirshup's comment suggests that this is happening with Zarr inputs as well, not just randomly generated data. Do you see this too?

EDIT: Those chunk tasks are blockwise reduction tasks fused with the random data generation task.
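
As a rough sketch of what "fixable" could mean here (hypothetical numbers, untested against this workload), shrinking the input chunks along the non-reduced axis makes each fused "generate + blockwise reduce" task's output correspondingly smaller:

# Each input chunk would then be ~80 MB instead of ~800 MB, and the
# per-chunk blockwise groupby output shrinks in proportion.
X = da.random.normal(size=(M, N), chunks=(5_000, 2_000))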

ivirshup (Author) commented:

@dcherian, adding an intermediate (computed) zarr store like this:

from pathlib import Path  # needed for the SCRATCH path below

M, N = 500_000, 20_000
N_CATEGORIES = 2_000
SCRATCH = Path("/scratch/tmp")
X_pth = SCRATCH / "X.zarr"

X_orig = da.random.normal(size=(M, N), chunks=(5_000, N))

X_orig.to_zarr(X_pth, overwrite=True, compute=True)
X = da.from_zarr(X_pth)
by = np.random.choice(N_CATEGORIES, size=M)

Has the same behaviour for me.


@fjetter, what is that task representation?

I'm not familiar enough with dask to know what my expectations should be of the high-level graph representations, but some things there do seem a little off.

[high-level graph screenshot]

My expectation would be that each groupby-sum chunk would be (N, N_CATEGORIES), with a total shape of (N, N_CATEGORIES * len(X.chunks[1])). Each partial aggregation layer after that reports chunk sizes of (20000, 1), which also seems wrong. Not sure if this is related though.

dcherian (Contributor) commented Mar 28, 2024

Each partial aggregation layer after that reports chunk sizes to be (20000, 1) which also seems wrong.

Yes, this is (intentionally) wrong.

The number of outputs is more indicative of the reduction tree. You start with 200 chunks -> 50 -> 13 -> 4 -> 1 (the default "split-size" for the tree reduction is 4, so at each stage 4 outputs get combined into 1).
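
Spelled out (just the fan-in arithmetic, not actual flox code):

import math

n_chunks = 200   # chunks along the reduced axis in the original example
split_every = 4  # default tree-reduction fan-in
while n_chunks > 1:
    n_chunks = math.ceil(n_chunks / split_every)
    print(n_chunks)  # prints 50, 13, 4, 1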

I think Florian's screenshot is a result of large initial chunk sizes. It certainly matches the input chunk size.

[dashboard screenshot]

I'll also note that the number of categories is of the same order as the input chunk size along the reduction dimension, so the blockwise reduction isn't very effective at reducing memory. It is only achieving roughly a factor-of-2 reduction in your latest example (5000 -> 2000).

Note: you could try passing reindex=False; this would control memory at the expense of slower stages during the tree reduction. But make sure to try it with your actual data: with purely random groups it shouldn't show too much difference (on average), but it will definitely be slower.
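
A minimal sketch of that suggestion, reusing the variables from the original example (only reindex changes):

res, codes = flox.groupby_reduce(
    X.T,
    by,
    func="sum",
    fill_value=0,
    method="map-reduce",
    reindex=False,  # trades memory for slower tree-reduction stages
)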

ivirshup (Author) commented Mar 29, 2024

Number of outputs is more indicative of the reduction tree. You start with 200 chunks -> 50 -> 13 -> 4 -> 1 (The default "split-size" for the tree reduction is 4, so at each stage 4 outputs get combined to 1).

That makes sense, but I guess I would have expected to have arrays that look like:

  • shape=(N_FEATURES, N_CATEGORIES, N_REDUCTIONS), chunks=(FEATURE_CHUNKS, N_CATEGORIES, 1)

Instead of:

  • shape=(N_FEATURES, N_REDUCTIONS), chunks=(FEATURE_CHUNKS, 1)

Why does the groupby_sum-chunk task output an array the same size as the input?


FWIW, I tried this with the threading scheduler and it seemed to work great, with no memory issues. So that definitely speaks to the high cost of transfer.
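
For concreteness, the threaded-scheduler run was along these lines (sketch):

# Local threaded scheduler instead of the distributed client; no inter-worker
# transfer is involved.
res_comp = res.compute(scheduler="threads")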


I was under the impression that the scheduler would try to reduce the number of data transfers necessary. Is this explicitly done, or is this just expected to happen due to last-in-first-out task distribution?

dcherian (Contributor) commented:

Why does the groupby_sum-chunk task output an array the same size as the input?

This is interesting; I don't actually update the sizes.

@fjetter would the scheduling algorithm work better if the size/shapes were more accurate at the blockwise step?

mrocklin (Member) commented Mar 29, 2024 via email
