Hash join transfer with error cannot pickle '_contextvars.ContextVar' object #11019

Open
guozhans opened this issue Mar 24, 2024 · 6 comments
Labels: dataframe, p2 (Affects more than a few users but doesn't prevent core functions)

Comments

guozhans commented Mar 24, 2024

Describe the issue:
Hi,
I encountered this error and don't know what is happening under the hood, so I am opening this issue for better tracking.

I have some spatial datasets in Parquet format (64 MB row groups) that contain nodes and coordinates. I never explicitly repartition or shuffle, but a shuffle can happen during a merge, a set_index, and similar operations. The hash join issue shows up once a dataset is larger than a single row group, i.e. large enough to produce several partitions in the DataFrames involved in the hash join. The error always comes from a hash-join-transfer-xxxxxxxxx task.

As a workaround, I switched the shuffle method back to "tasks", as sketched below.
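
For reference, the workaround looks roughly like this. It is a minimal sketch, assuming the standard dataframe.shuffle.method config key and the shuffle_method keyword argument, with nodes and node_coordinates standing in for the real DataFrames:

import dask

# Select the task-based shuffle globally for all DataFrame shuffles
dask.config.set({"dataframe.shuffle.method": "tasks"})

# Or select it per operation instead of globally:
# merged = nodes.merge(node_coordinates, on="id", shuffle_method="tasks")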

Error messages:

Traceback (most recent call last):
  File "/opt/project/src/main/python/utils/shuffle_test.py", line 52, in <module>
    main()
  File "/opt/project/src/main/python/utils/shuffle_test.py", line 48, in main
    print(nodes["longitude"].compute())
  File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/loky/process_executor.py", line 370, in _sendback_result
    result_queue.put(
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/queues.py", line 230, in put
    obj = dumps(obj, reducers=self._reducers)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_contextvars.ContextVar' object
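
The last frame is loky pickling the result it sends back from its worker process; whatever is in that payload holds a ContextVar, and ContextVar objects cannot be pickled. A minimal illustration of just that pickling failure, outside of Dask:

import contextvars
import cloudpickle

var = contextvars.ContextVar("example")
cloudpickle.dumps(var)  # raises TypeError: cannot pickle '_contextvars.ContextVar' object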

Minimal Complete Verifiable Example:

import faulthandler
import logging

import dask
import dask.dataframe as dd

from distributed import WorkerPlugin, Worker, LocalCluster, Client, progress
from loky import ProcessPoolExecutor


class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        faulthandler.enable()
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            ts = self.worker.tasks[key]
            exc_info = (type(ts.exception), ts.exception, ts.traceback)
            print(f"Task traceback: {ts.traceback}")
            print(f"Task exception: {exc_info}")
            self.logger.error(f"Error during computation of {key}, caused by {str(ts.exception)}.")


def main():
    cluster = LocalCluster(processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            nodes = dd.read_parquet("node parquet file with multiple row group size", columns=["id", "tags"])
            node_coordinates = dd.read_parquet("node parquet file with multiple row group size"
                                               , columns=["id", "latitude", "longitude"])
            
            # This repartition can give the two DataFrames different partitioning, so the merge
            # below has to do a hash join, which makes the error show up more often
            node_coordinates = node_coordinates.repartition(partition_size="some size")
            nodes = nodes.merge(
                node_coordinates, how="left", left_on=["id"], right_on=["id"], suffixes=(False, False), shuffle_method="p2p")
            # Client.persist returns new, persisted collections; reassign so progress() can track them
            nodes = client.persist(nodes)
            progress(nodes)
            print(nodes["latitude"].compute())


if __name__ == "__main__":
    main()
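
Since the paths in the example above are placeholders, here is one way to generate a small stand-in dataset with several row groups so that read_parquet yields multiple partitions. The file name, columns, row count, and row group size are illustrative assumptions, not the data from the original report:

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical stand-in for the real OSM node data: enough rows to span
# several small row groups, so read_parquet produces multiple partitions.
n = 1_000_000
df = pd.DataFrame({
    "id": np.arange(n, dtype="int64"),
    "tags": [""] * n,
    "latitude": np.random.uniform(-90, 90, n),
    "longitude": np.random.uniform(-180, 180, n),
})
pq.write_table(pa.Table.from_pandas(df), "nodes.parquet", row_group_size=100_000)

Reading "nodes.parquet" in place of the two placeholder paths should then give the merge several partitions to hash-join across.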

Anything else we need to know?:

Environment:

  • Dask version: 2024.2.1
  • Python version: 3.10
  • Operating System: Ubuntu 22.04 (inside Docker); host is also Ubuntu 22.04
  • Install method (conda, pip, source): pip
  • loky version: 3.4.1
github-actions bot added the needs triage (Needs a response from a contributor) label Mar 24, 2024
phofl (Collaborator) commented Mar 28, 2024

Can you post a reproducible example, e.g. some code that creates the Parquet files you are reading? I am having trouble reproducing the error you are seeing.

guozhans (Author) commented
Hi @phofl

I attached a small dataset and my pip installation at the bottom; you can try the dataset with another script of mine, shown below. The dataset is quite small, so it shouldn't cause OOM for you. At least it doesn't cause OOM here. :)

Also, you must enable the worker plugin with loky.

If you use a nanny, the script runs successfully.
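
For context, "using a nanny" here means starting the LocalCluster with worker processes (which are supervised by nannies by default) rather than the in-process workers used in the script below. A minimal sketch of that variant, assuming the rest of the script stays the same:

from distributed import LocalCluster, Client

# processes=True starts each worker in its own nanny-supervised process;
# with this setup the attached script reportedly completes without the pickle error.
cluster = LocalCluster(n_workers=4, processes=True)
client = Client(cluster)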

dji-osm.tar.gz

import logging
import dask
import dask.dataframe as dd
import pandas as pd
import dask.config as dc

from dask.delayed import delayed
from distributed import WorkerPlugin, Worker, LocalCluster, Client
from loky import ProcessPoolExecutor


class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            ts = self.worker.tasks[key]
            exc_info = (type(ts.exception), ts.exception, ts.traceback)
            print(f"Task traceback: {ts.traceback}")
            print(f"Task exception: {exc_info}")
            self.logger.error(f"Error during computation of {key}, caused by {str(ts.exception)}.")


def main():
    cluster = LocalCluster(n_workers=4, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            dc.set({"dataframe.convert-string": False})
            ways = dd.read_parquet(
                "djibouti-latest.osm/way", columns=["id", "nodes"], blocksize="8MiB")
            node_coordinates = dd.read_parquet(
                "djibouti-latest.osm/node", columns=["latitude", "longitude"], index=["id"])
               
            way_dfs = ways.to_delayed()
            delays = []
            for way_df in way_dfs:
                delays.append(delayed(create_df)(way_df))
            dfs = dd.compute(*delays)
            df = dd.concat([*dfs])
            df = dd.merge(
                df, node_coordinates, left_on=["nodeId"], right_on=["id"], right_index=True
            ).set_index("id", shuffle_method="p2p")
            print(f"df = {df.compute()}")


def create_df(way_df):
    new_df = way_df.set_index("id").nodes.apply(
        lambda ns: pd.Series([n["nodeId"] for n in ns], dtype="int64")
    ).convert_dtypes(convert_integer=True).stack().reset_index(0, name="nodeId")
    return dd.from_pandas(new_df, npartitions=10)

if __name__ == "__main__":
    main()

My pip install
pip install dask-kubernetes==2024.3.1 dask[complete]=="2024.2.1" dask-geopandas pandas==2.1.4 pandas[performance]==2.1.4 numpy==1.22.4 jupyter-server-proxy pyarrow==15.0.2 shapely==2.0.3 pyproj==3.6.1 geopandas==0.14.3 geoparquet==0.0.3 wheel loky==3.4.1 graphviz

phofl added the dataframe and p2 (Affects more than a few users but doesn't prevent core functions) labels and removed the needs triage (Needs a response from a contributor) label Apr 4, 2024
phofl (Collaborator) commented Apr 4, 2024

Thanks! Is it possible to reproduce this without the tar file?

guozhans (Author) commented Apr 4, 2024

I don't understand; can't you use this data? Or are you checking something? Or do you mean Dask only works on specific datasets? Perhaps you could give more context.

I didn't try to reproduce it with other data, but I have seen the same issue with similar datasets.

phofl (Collaborator) commented Apr 4, 2024

It's always better to have something that developers can just copy and paste. Here is some context on why downloading those files can be a little concerning:

#10995 (comment)

guozhans (Author) commented Apr 12, 2024

Hi @phofl
I see, and thanks for providing some context.

If I copy and paste the data, it somehow changes the data format.
The attached file only has a few hundred lines; I hope it works for you.

lie-de-clipperton.zip

Or you can download the complete Île de Clipperton PBF data from https://download.geofabrik.de/australia-oceania.html, transform it into Parquet files using osm-parquetizer, and then separate the way nodes.

Sam
