Control order of execution with query planner #11067

Open
jtilly opened this issue Apr 25, 2024 · 2 comments
Labels
needs triage Needs a response from a contributor

Comments


jtilly commented Apr 25, 2024

Describe the issue:

We often use dask together with plateau to perform the following task in parallel:

  • load a chunk of data
  • do a transformation
  • write the chunk of data to disk

We typically do this with datasets that don't fit into our machine's RAM. This used to work fine, but with the new query planner we typically run out of memory, because dask now loads all the data before doing any transformation. Previously, dask operated chunk by chunk.

Minimal Complete Verifiable Example:

I tried to come up with a simple example that doesn't involve plateau (or a lot of data):

import dask
import dask.dataframe as dd
import pandas as pd

from dask.distributed import Client, LocalCluster

# load the data from disk
@dask.delayed
def _load(x):
    print(f"Loading chunk {x}")
    return pd.DataFrame({"x": x, "y": [1, 2, 3]})

# transform and store
def _transform_and_store(df):
    # this example doesn't transform or store anything, but you get the idea
    x = df["x"].unique().item()
    print(f"Storing chunk {x}")


if __name__ == "__main__":

    ddf = dd.from_delayed([_load(x) for x in range(10)], meta={"x": "int64", "y": "int64"})

    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
        with Client(cluster):
            ddf.map_partitions(_transform_and_store, meta={}).compute()

The behavior without the query planner is:

DASK_DATAFRAME__QUERY_PLANNING=False python example.py 
Loading chunk 9
Storing chunk 9
Loading chunk 8
Storing chunk 8
Loading chunk 7
Storing chunk 7
Loading chunk 6
Storing chunk 6
Loading chunk 5
Storing chunk 5
Loading chunk 4
Storing chunk 4
Loading chunk 3
Storing chunk 3
Loading chunk 2
Storing chunk 2
Loading chunk 1
Storing chunk 1
Loading chunk 0
Storing chunk 0

So we load, transform, and store one chunk at a time and don't run out of memory.

DASK_DATAFRAME__QUERY_PLANNING=True python example.py 
Loading chunk 7
Loading chunk 2
Loading chunk 3
Loading chunk 0
Loading chunk 9
Loading chunk 1
Loading chunk 8
Loading chunk 5
Loading chunk 4
Loading chunk 6
Storing chunk 9
Storing chunk 8
Storing chunk 7
Storing chunk 6
Storing chunk 5
Storing chunk 4
Storing chunk 3
Storing chunk 2
Storing chunk 1
Storing chunk 0

Here, all chunks are loaded before anything is stored, so we run out of memory.

  • Is there a way to tell the query planner to operate chunk-by-chunk when it encounters map_partitions?
  • Should I not be using map_partitions for something like this?
  • NB: DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1 doesn't help

Anything else that we should know?

Note that it's really tricky to turn off the query planner (other than through environment variables). When using dask.config from within Python, it's important that the config is set before the first import of dask.dataframe, which is difficult to control.
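For reference, a minimal sketch of the workaround we mean, assuming the dataframe.query-planning config key (the in-process equivalent of DASK_DATAFRAME__QUERY_PLANNING). The call only has an effect if it runs before the first import of dask.dataframe anywhere in the process:

import dask

# Must run before dask.dataframe is imported anywhere in this process;
# once dask.dataframe has been imported, changing this setting is ignored.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402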

Environment:

  • Dask version: 2024.4.2
  • Python version: 3.12.3
  • Operating System: linux
  • Install method (conda, pip, source): conda

Thank you!

github-actions bot added the needs triage (Needs a response from a contributor) label on Apr 25, 2024
phofl (Collaborator) commented Apr 25, 2024

Hi, thanks for your report. This is definitely a bug; it should behave the same as in the previous version. I'll put up a solution.

fjetter (Member) commented Apr 29, 2024

I already migrated read_dataset_as_ddf to from_map in data-engineering-collective/plateau#80. If there are any remaining from_delayed calls, I strongly recommend replacing them as well.
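For illustration, a minimal sketch of that suggestion applied to the example from this issue (the _load function and meta are copied from the issue body; this is not the actual plateau change, just an assumed shape of a from_map-based loader):

import dask.dataframe as dd
import pandas as pd

def _load(x):
    print(f"Loading chunk {x}")
    return pd.DataFrame({"x": x, "y": [1, 2, 3]})

# from_map builds one partition per element of the iterable and, per the
# recommendation above, is the preferred replacement for from_delayed
# under the query planner.
ddf = dd.from_map(_load, range(10), meta={"x": "int64", "y": "int64"})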
