
db.to_dataframe, throws: TypeError: 'coroutine' object is not iterable #11051

Open
SGT911 opened this issue Apr 14, 2024 · 2 comments
Labels
needs triage Needs a response from a contributor

Comments


SGT911 commented Apr 14, 2024

Describe the issue:
Using a dask bag to normalize data into a dask dataframe throws: TypeError: 'coroutine' object is not iterable

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask.bag as db
import asyncio

async def create_dataframe():
    # Data from sources stuff...
    data = [{'_id': 1, 'name': 'Bob'}]
    data_bag = db.from_sequence(data)
    data_bag = data_bag.map(lambda x: {'id': x['_id'], 'name': x['name'].lower()})

    return data_bag.to_dataframe()

async def main():
    cli = await Client('localhost:8786', asynchronous=True)
    df = await create_dataframe()
    df = await cli.compute(df)
    print(df)
    await cli.close()


if __name__ == '__main__':
    asyncio.run(main())

Anything else we need to know?:
The error occurs when using a dask.distributed async client.
I also tried the .flatten() method and dd.from_delayed(data_bag.to_delayed())

Environment:

  • Dask version: 2024.2.0
  • Python version: Python 3.11.6
  • Operating System: Docker - Debian GNU/Linux 12 (bookworm)
  • Install method (conda, pip, source): pip
@github-actions github-actions bot added the needs triage Needs a response from a contributor label Apr 14, 2024
@fjetter
Member

fjetter commented Apr 15, 2024

Thanks for the report. This is unfortunately a known issue with the async Client, and one we will likely not be able to fix. However, there are ways to avoid the problem.

First of all, it's important to understand where this is coming from. Your data_bag.to_dataframe() call does not provide any meta, i.e. we have to infer what the schema of the dataframe is going to look like. To do this, we actually run the very first task of the bag ourselves to check the schema. This eager computation is typically something users want to avoid, and providing meta explicitly avoids it, i.e.

async def create_dataframe():
    ...
    return data_bag.to_dataframe(meta={"id": int, "name": str})

fixes the issue. The reason is that the internal call that computes the first partition does not handle async code. This is a shortcoming of the async Client when working with dask collections.
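For reference, the TypeError itself is generic asyncio behavior rather than anything dask-specific: calling an async function returns a coroutine object, and any synchronous code that tries to iterate that object without awaiting it fails exactly this way. A minimal stdlib sketch (no dask involved):

```python
import asyncio

async def fetch_rows():
    return [{"_id": 1, "name": "Bob"}]

# Calling an async function does NOT run it; it returns a coroutine object.
coro = fetch_rows()

try:
    # Iterating the coroutine from synchronous code raises the same error
    # as in the traceback above.
    list(coro)
except TypeError as err:
    print(err)  # 'coroutine' object is not iterable
finally:
    # Close the never-awaited coroutine to avoid a RuntimeWarning.
    coro.close()

# Awaiting it (here via asyncio.run) yields the actual result.
print(asyncio.run(fetch_rows()))  # [{'_id': 1, 'name': 'Bob'}]
```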

Ideally, though, you wouldn't load the data locally at all, but would do this on the cluster as well. Most users do not need the async dask Client unless they work with the lower-level Future interface.

For example, using dask.delayed and dask.dataframe, this would look as follows:

import dask
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def get_data_from_one_source(ix):
    data = {"_id": ix, "name": "Bob"}
    return pd.DataFrame([data])

def create_dataframe():
    # Data from sources stuff...
    data = [
        get_data_from_one_source(0)
    ]
    ddf = dd.from_delayed(data)
    ddf = ddf.rename(columns={"_id": "id"})
    ddf["name"] = ddf.name.str.lower()
    return ddf

with Client() as client:
    df = create_dataframe()
    df = df.compute()
df
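As a side note, the rename/lowercase steps in the example above are plain pandas operations that dask.dataframe applies to each partition; on a single pandas frame the same transformation looks like this (a sketch, assuming pandas is installed):

```python
import pandas as pd

# The per-partition transformation from the dask example, on plain pandas:
pdf = pd.DataFrame([{"_id": 1, "name": "Bob"}])
pdf = pdf.rename(columns={"_id": "id"})
pdf["name"] = pdf["name"].str.lower()
print(pdf.to_dict("records"))  # [{'id': 1, 'name': 'bob'}]
```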

@SGaviriaBrickell

Thanks, I changed the implementation to use a dask dataframe.
