Shuffle-based drop duplicates produces incorrect result with shuffle="p2p"
#10708
Comments
cc @phofl

Related to dask/dask-expr#526
I am fine with switching to `keep=None`. I am also fine with warning if the user gives anything other than the default for `keep` and we would default to the p2p shuffle, but I am -1 on choosing task-based shuffling without the user specifically requesting it in these scenarios. Users should actively request tasks.
Yeah, I suppose I can understand that. I do think it is best to use the "stable" shuffle algorithm unless the user explicitly asks for "tasks". In that case, my only suggestion would be to warn the user that the results are likely to be "wrong" (as you already mentioned).
@hendrikmakait with similar indexing foo as for arrays we should be able to make P2P stable, shouldn't we?

Sure, guaranteed ordering is definitely possible. I'm not sure if indexing foo is sufficient or if we also need to make changes to guarantee ordering within shards, but we can look into this.
It seems like the groupby-based sharding should preserve ordering on its own, no? If there is motivation to avoid any modifications to the general "p2p" logic, what about the naive approach of (optionally) adding a
How do you mean? |
Sorry, I'm probably "thinking out loud" a bit too much. I was mis-remembering that the initial "sharding" in |
For the drop_duplicates case, I believe there is a simple way to fix this, see #10722 (I may need to be a little more careful about when that sort is appropriate, but I think it's not harmful).

I opened dask/distributed#8421 to discuss the possibility of making P2P stable, but I don't consider this high priority right now.
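As an illustration only (this is not the actual code from #10722; the `_order` column name and the data are invented), the sort-based idea can be sketched in plain pandas: tag each row with its pre-shuffle global position, then sort each partition by that tag before the block-wise dedup so that `keep="first"` again refers to the original global order:

```python
import pandas as pd

df = pd.DataFrame({"key": [1, 1, 2, 2], "value": [0, 1, 2, 3]})
df = df.assign(_order=range(len(df)))  # record global pre-shuffle row order

shuffled = df.iloc[[1, 0, 3, 2]]  # stand-in for an order-scrambling p2p shuffle

# Sorting by the recorded order before the block-wise drop_duplicates
# makes "first" mean "first in the original global order" again
fixed = (
    shuffled.sort_values("_order")
    .drop_duplicates(subset="key", keep="first")
    .drop(columns="_order")
)
print(sorted(fixed["value"]))  # [0, 2]
```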
Describe the issue:
The `dask.dataframe.DataFrame.drop_duplicates` API includes a `keep` argument, where the user can specify something like `"first"` or `"last"` to determine which duplicated row to keep in the output. `DataFrame.drop_duplicates` will return correct results when an ACA-based algorithm is used to reduce a deduplicated DataFrame. You will also get correct results from the shuffle-based algorithm when `shuffle="tasks"` is used, because the task graph will always guarantee that the initial ordering is preserved.

The problem comes when the shuffle-based algorithm is used with `shuffle="p2p"` (the default when `split_out > 1` and a distributed client is active). In this case, the initial ordering is not preserved after the shuffle. Therefore, a block-wise `pd.DataFrame.drop_duplicates(..., keep="first")` operation will not necessarily keep the `"first"` duplicate (according to the global ordering).

Minimal Complete Verifiable Example:
Anything else we need to know?:
My sense is that the following is true:

- Shuffle-based deduplication does not produce correct results with `shuffle="p2p"` for `keep="first"` or `keep="last"`, because p2p does not preserve the initial ordering
- `drop_duplicates(..., shuffle="p2p")` has proven valuable in dask-expr (using TPC-H as a metric)

Therefore, this may be another situation where the Dask DataFrame API needs to differ from the pandas API a bit. That is, we could use a `keep=None` default instead of `keep="first"`, because we should not allow `shuffle="p2p"` when `keep="first"` or `keep="last"`.
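A hypothetical guard sketching the proposed behavior (the function name and error message are invented here, not Dask's actual API): reject the order-sensitive `keep` values when the p2p shuffle would be used, while allowing `keep=None` (keep any one duplicate) with any shuffle method:

```python
def check_drop_duplicates_args(keep, shuffle_method):
    """Hypothetical validation: p2p does not preserve row order,
    so order-sensitive `keep` values cannot be honored with it."""
    if shuffle_method == "p2p" and keep in ("first", "last"):
        raise ValueError(
            f"keep={keep!r} requires a stable shuffle; "
            "use shuffle='tasks' or keep=None (keep any one duplicate)"
        )
```

Under this sketch, `check_drop_duplicates_args(None, "p2p")` and `check_drop_duplicates_args("first", "tasks")` pass silently, while `check_drop_duplicates_args("first", "p2p")` raises.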