
Dask Nunique bug under dask 2024.2.1 #10982

Open
frbelotto opened this issue Mar 7, 2024 · 7 comments
Comments


frbelotto commented Mar 7, 2024

Hello guys,
Take this CSV as an example dataframe. Sorry, but I couldn't write code that builds a synthetic dataframe able to reproduce this bug, so please use the attached file:
teste.csv

Now, let's open the file and run the query under dask 2023.10.0:

import dask.dataframe as dd

ddf = dd.read_csv('teste.csv', dtype={
    'status': 'category', 'produto': 'category', 'parceiro': 'category',
    'mci': 'Int32', 'marca': 'category', 'sku': 'string',
    'cod_transacao': 'string', 'forma_pagamento': 'category',
    'gmv': 'Float32', 'receita': 'Float32', 'cashback': 'Float32'})
base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo.head()

[screenshot of the resulting dataframe]

Runs ok!

Now, let's run the same test under dask 2024.2.1:

import dask.dataframe as dd

ddf = dd.read_csv('teste.csv', dtype={
    'status': 'category', 'produto': 'category', 'parceiro': 'category',
    'mci': 'Int32', 'marca': 'category', 'sku': 'string',
    'cod_transacao': 'string', 'forma_pagamento': 'category',
    'gmv': 'Float32', 'receita': 'Float32', 'cashback': 'Float32'})
base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo.head()

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py:194, in raise_on_meta_error(funcname, udf)
    193 try:
--> 194     yield
    195 except Exception as e:

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7174, in _emulate(func, udf, *args, **kwargs)
   7173 with raise_on_meta_error(funcname(func), udf=udf), check_numeric_only_deprecation():
-> 7174     return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py:781, in _nunique_df_aggregate(df, levels, name, sort)
    780 def _nunique_df_aggregate(df, levels, name, sort=False):
--> 781     return df.groupby(level=levels, sort=sort, observed=True)[name].nunique()

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\groupby\generic.py:1951, in DataFrameGroupBy.__getitem__(self, key)
   1947     raise ValueError(
   1948         "Cannot subset columns with a tuple with more than one element. "
   1949         "Use a list instead."
   1950     )
-> 1951 return super().__getitem__(key)

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\base.py:244, in SelectionMixin.__getitem__(self, key)
    243 if key not in self.obj:
--> 244     raise KeyError(f"Column not found: {key}")
    245 ndim = self.obj[key].ndim

KeyError: 'Column not found: marca'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Cell In[3], line 1
----> 1 base_consumo = ddf.groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
      2 base_consumo.head()

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py:3078, in SeriesGroupBy.nunique(self, split_every, split_out)
   3075 else:
   3076     chunk = _nunique_series_chunk
-> 3078 return aca(
   3079     [self.obj, self.by]
   3080     if not isinstance(self.by, list)
   3081     else [self.obj] + self.by,
   3082     chunk=chunk,
   3083     aggregate=_nunique_df_aggregate,
   3084     combine=_nunique_df_combine,
   3085     token="series-groupby-nunique",
   3086     chunk_kwargs={"levels": levels, "name": name},
   3087     aggregate_kwargs={"levels": levels, "name": name},
   3088     combine_kwargs={"levels": levels},
   3089     split_every=split_every,
   3090     split_out=split_out,
   3091     split_out_setup=split_out_on_index,
   3092     sort=self.sort,
   3093 )

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7128, in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   7126 if meta is no_default:
   7127     meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
-> 7128     meta = _emulate(
   7129         aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
   7130     )
   7131 meta = make_meta(
   7132     meta,
   7133     index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None),
   7134     parent_meta=dfs[0]._meta,
   7135 )
   7137 graph = HighLevelGraph.from_collections(final_name, layer, dependencies=(chunked,))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py:7173, in _emulate(func, udf, *args, **kwargs)
   7168 def _emulate(func, *args, udf=False, **kwargs):
   7169     """
   7170     Apply a function using args / kwargs. If arguments contain dd.DataFrame /
   7171     dd.Series, using internal cache (``_meta``) for calculation
   7172     """
-> 7173     with raise_on_meta_error(funcname(func), udf=udf), check_numeric_only_deprecation():
   7174         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\contextlib.py:158, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    156     value = typ()
    157 try:
--> 158     self.gen.throw(value)
    159 except StopIteration as exc:
    160     # Suppress StopIteration *unless* it's the same exception that
    161     # was passed to throw().  This prevents a StopIteration
    162     # raised inside the "with" statement from being suppressed.
    163     return exc is not value

File c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py:215, in raise_on_meta_error(funcname, udf)
    206 msg += (
    207     "Original error is below:\n"
    208     "------------------------\n"
   (...)
    212     "{2}"
    213 )
    214 msg = msg.format(f" in `{funcname}`" if funcname else "", repr(e), tb)
--> 215 raise ValueError(msg) from e

ValueError: Metadata inference failed in `_nunique_df_aggregate`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError('Column not found: marca')

Traceback:
---------
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\utils.py", line 194, in raise_on_meta_error
    yield
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\core.py", line 7174, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\groupby.py", line 781, in _nunique_df_aggregate
    return df.groupby(level=levels, sort=sort, observed=True)[name].nunique()
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\groupby\generic.py", line 1951, in __getitem__
    return super().__getitem__(key)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\fabio\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\core\base.py", line 244, in __getitem__
    raise KeyError(f"Column not found: {key}")

Environment:

  • Dask version: 2024.2.1
  • Python version: 3.11.8 and 3.12.2
  • Operating System: Windows 11
  • Install method (conda, pip, source): pip
@github-actions github-actions bot added the needs triage Needs a response from a contributor label Mar 7, 2024
@frbelotto frbelotto changed the title Dask Nunique bug under Python 3.12 Dask Nunique bug under dask 2024.2.1 Mar 7, 2024

phofl (Collaborator) commented Mar 7, 2024

The query doesn't make much sense: you are computing nunique on one of the grouping columns, which will always return 1.

We should fix this anyway though
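phofl's point can be illustrated in plain pandas with toy data (made up here, not the attached CSV): when the aggregated column is also a grouping key, each group can only contain one distinct value of it, so nunique is 1 for every group.

```python
import pandas as pd

# Hypothetical miniature of the reported data: clients (mci) and brands (marca)
df = pd.DataFrame({
    "mci":   [1, 1, 2, 2, 2],
    "marca": ["A", "B", "A", "A", "C"],
})

# 'marca' is both a grouping key and the aggregated column,
# so every (mci, marca) group holds exactly one distinct 'marca' value.
out = df.groupby(["mci", "marca"], observed=True)["marca"].nunique()
print(out)  # every value is 1
```

In pandas this runs and returns a column of 1s; under dask 2024.2.1 the same pattern raises the KeyError reported above.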

frbelotto (Author) commented:

> The query doesn't make much sense, you are computing nunique on one of the group columns which will always return 1
>
> We should fix this anyway though

LOL.
This query is part of a bigger pipeline that extracts how many unique clients ("MCIs") have consumed from each brand. Maybe it could be written in a smarter way, but once it produced the expected result, I wasn't going to change it anymore LOL

base_consumo = gerado.loc[(gerado['produto'] == 'Afiliados') & (gerado['data'] >= datetime(2024,1,1))].groupby(['mci', 'marca'], dropna=False, observed=True)['marca'].nunique().to_frame()
base_consumo = base_consumo.groupby(['mci'], dropna=False, observed=True).aggregate({'marca' : 'sum'})
base_consumo = base_consumo.rename(columns={'marca' : 'qtde_marcas'}).reset_index()
base_consumo = base_consumo.groupby('qtde_marcas', dropna=False, observed=True)['mci'].nunique().to_frame()
base_consumo = base_consumo.compute()
base_consumo.to_excel(f'{pastaloja}\\Clientes_por_marcas_afiliados.xlsx', merge_cells=False)
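Since nunique on a grouping key is always 1, the first two steps of that pipeline amount to counting distinct brands per client, which can be done in one step. A sketch in plain pandas with made-up data (column names taken from the thread; the filtering step is omitted):

```python
import pandas as pd

# Toy stand-in for the 'gerado' frame from the thread
df = pd.DataFrame({
    "mci":   [1, 1, 1, 2, 2, 3],
    "marca": ["A", "A", "B", "A", "C", "A"],
})

# Original two-step approach: nunique on the grouping key (always 1),
# then summed per client -- effectively a distinct-brand count.
step1 = df.groupby(["mci", "marca"], observed=True)["marca"].nunique().to_frame()
per_client_original = step1.groupby("mci", observed=True).agg({"marca": "sum"})

# Equivalent single step: count distinct brands per client directly.
per_client_direct = df.groupby("mci", observed=True)["marca"].nunique()

# Both give: client 1 -> 2 brands, client 2 -> 2, client 3 -> 1
print(per_client_original["marca"].tolist(), per_client_direct.tolist())
```

The single-step form also sidesteps the bug entirely, since the aggregated column is no longer one of the grouping keys.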


phofl commented Mar 7, 2024

PRs to fix are welcome

frbelotto (Author) commented:

> PRs to fix are welcome

Is there a newbie guide for where to start? My knowledge of Python is average, but I have no experience building, sharing, or maintaining a library. I don't even know where to find the source code of the nunique method, so that I could try to better understand what is happening.

frbelotto (Author) commented:

And regarding the example of the bug, it's interesting that the "marca" column returns an error, but any other column seems to work. My first thought was that it is something related to the category dtype (a very buggy dtype), but I've tried changing it to string and the error persists.


phofl (Collaborator) commented Mar 9, 2024

The error happens because marca is part of your grouping keys; it's not dtype related.

frbelotto (Author) commented:

> The error happens because Marca is part of your grouping keys, it's not dtype related

But it does not happen if I use the mci column.
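Until this is fixed, one possible workaround is to aggregate a duplicate of the column rather than the grouping key itself, so the aggregation step never looks up a column that was consumed as a key. A sketch in plain pandas (`marca_copy` is a made-up name; whether this avoids the KeyError under dask 2024.2.1 is an assumption, untested here):

```python
import pandas as pd

df = pd.DataFrame({
    "mci":   [1, 1, 2],
    "marca": ["A", "B", "A"],
})

# Aggregate a copy of 'marca' instead of the grouping key itself.
out = (
    df.assign(marca_copy=df["marca"])
      .groupby(["mci", "marca"], dropna=False, observed=True)["marca_copy"]
      .nunique()
      .to_frame()
)
print(out)  # one row per (mci, marca) group, each count equal to 1
```

The result is the same as the original query (each group can only contain one distinct brand), but the selected column is distinct from the keys.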

@phofl phofl added dataframe and removed needs triage Needs a response from a contributor labels Apr 2, 2024