
Shuffle-based drop duplicates produces incorrect result with shuffle="p2p" #10708

Closed
rjzamora opened this issue Dec 14, 2023 · 10 comments · Fixed by dask/distributed#8458 · May be fixed by #10722
Labels: bug, dataframe


@rjzamora (Member)

Describe the issue:

The dask.dataframe.DataFrame.drop_duplicates API includes a keep argument, with which the user can specify "first" or "last" to determine which duplicated row to keep in the output. DataFrame.drop_duplicates returns correct results when an ACA-based (apply-concat-apply) algorithm is used to produce the deduplicated DataFrame. You will also get correct results from the shuffle-based algorithm when shuffle="tasks" is used, because that task graph always guarantees that the initial ordering is preserved.

The problem comes when the shuffle-based algorithm is used with shuffle="p2p" (the default when split_out > 1 and a distributed client is active). In this case, the initial ordering is not preserved after the shuffle. Therefore, a block-wise pd.DataFrame.drop_duplicates(..., keep="first") operation will not necessarily keep the "first" duplicate (according to global ordering).
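
To see why keep="first" is order-sensitive, here is a small pandas-only illustration (not part of the original report): the same rows in a different order yield a different "first" duplicate.

import pandas as pd

# Two frames containing the same rows, but in opposite order
df_a = pd.DataFrame({"name": ["x", "x"], "value": [1, 2]})
df_b = df_a.iloc[::-1].reset_index(drop=True)

# keep="first" keeps whichever duplicate happens to come first, so any
# reordering (such as an order-destroying shuffle) changes the result.
print(df_a.drop_duplicates(subset=["name"], keep="first"))  # keeps value == 1
print(df_b.drop_duplicates(subset=["name"], keep="first"))  # keeps value == 2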

Minimal Complete Verifiable Example:

import dask
from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
from dask.datasets import timeseries

client = Client(LocalCluster())
shuffle = "p2p"

df = timeseries()
result_pd = df.compute().drop_duplicates(subset=["name"], keep="first")
result_dd = df.drop_duplicates(subset=["name"], keep="first", split_out=df.npartitions, shuffle=shuffle).compute()

# Fails for shuffle = "p2p"
# Passes for shuffle = "tasks"
dd.assert_eq(result_pd, result_dd)

Anything else we need to know?:

My sense is that the following is true:

  1. We should return correct results when keep = "first" and keep = "last"
  2. drop_duplicates(..., shuffle="p2p") has proven valuable in dask-expr (using TPC-H as a benchmark)
  3. It's probably impractical to modify "p2p" to preserve initial ordering

Therefore, this may be another situation where the Dask DataFrame API needs to differ from the pandas API a bit. That is, we could use a keep = None default instead of keep = "first", because we should not allow shuffle="p2p" when keep = "first" or keep = "last".
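
A minimal sketch of the kind of guard this would imply (the helper name and the choice to raise rather than warn are assumptions, not Dask's actual API):

def _check_keep(keep=None, shuffle_method=None):
    # Hypothetical validation: with the order-destroying p2p shuffle,
    # order-dependent values of `keep` cannot be honored.
    if shuffle_method == "p2p" and keep in ("first", "last"):
        raise NotImplementedError(
            "keep='first'/'last' depends on row order, which shuffle='p2p' "
            "does not preserve; use shuffle='tasks' instead."
        )
    return keep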

rjzamora added the dataframe and bug labels on Dec 14, 2023
@rjzamora (Member, Author)

cc @phofl

@rjzamora (Member, Author)

Related to dask/dask-expr#526

@phofl (Collaborator) commented Dec 14, 2023

I am fine with switching to keep=None. I am also fine with warning if the user passes anything other than the default for keep and we would default to the p2p shuffle, but I am -1 on choosing task-based shuffling without the user specifically requesting it in these scenarios. Users should actively request "tasks".

@rjzamora (Member, Author)

> but I am -1 on choosing task-based shuffling without the user specifically requesting it in these scenarios. Users should actively request "tasks".

Yeah, I suppose I can understand that. I do think it is best to use the "stable" shuffle algorithm unless the user explicitly asks for "tasks". In that case, my only suggestion would be to warn the user that the results are likely to be "wrong" (as you already mentioned).

@fjetter (Member) commented Dec 18, 2023

@hendrikmakait, with similar indexing foo as for arrays, we should be able to make P2P stable, shouldn't we?

@hendrikmakait (Member)

Sure, guaranteed ordering is definitely possible. I'm not sure if indexing foo is sufficient or if we also need to make changes to guarantee ordering within shards, but we can look into this.

@rjzamora (Member, Author)

> or if we also need to make changes to guarantee ordering within shards, but we can look into this.

It seems like the groupby-based sharding should preserve ordering on its own, no?

If there is motivation to avoid any modifications to the general "p2p" logic, what about the naive approach of (optionally) adding a "__source_partition" column before the shuffle? If the shards themselves are ordered, then a uint16 column is probably all you need to sort the output partitions in most cases.
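
A rough sketch of that naive workaround (the tagging helpers and the shuffle/map_partitions usage below are illustrative, not an existing Dask code path), assuming the shuffle keeps rows from the same input shard in their original relative order:

import numpy as np
from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster

client = Client(LocalCluster())  # p2p shuffling requires a distributed client

def tag_partition(part, partition_info=None):
    # Record each row's original partition number so the pre-shuffle
    # partition order can be restored after the shuffle.
    number = 0 if partition_info is None else partition_info["number"]
    part = part.copy()
    part["__source_partition"] = np.uint16(number)
    return part

def dedup(part):
    # A stable sort restores cross-partition order; within each shard the
    # shuffle must already preserve order for this to match pandas exactly.
    part = part.sort_values("__source_partition", kind="stable")
    return part.drop_duplicates(subset=["name"], keep="first").drop(
        columns="__source_partition"
    )

df = timeseries().map_partitions(tag_partition)
result = df.shuffle("name", shuffle="p2p").map_partitions(dedup).compute()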

@hendrikmakait (Member)

> It seems like the groupby-based sharding should preserve ordering on its own, no?

How do you mean?

@rjzamora (Member, Author)

> How do you mean?

Sorry, I'm probably "thinking out loud" a bit too much. I was misremembering that the initial "sharding" in shuffle_transfer/split_by_worker was groupby based.

@fjetter (Member) commented Dec 19, 2023

For drop_duplicates, I believe there is a simple way to fix this; see #10722. (I may need to be a little more careful about when that sort is appropriate, but I think it's not harmful.)

I opened dask/distributed#8421 to discuss the possibility of making P2P stable, but I don't consider this high priority right now.
