
Distributed index in pandas on pyspark does not work as expected #2204

Open
nitinmnsn opened this issue Oct 24, 2021 · 3 comments

Comments

@nitinmnsn

This is with pandas on PySpark, but the same is true for Koalas as well (at least as of my last test).
There are 3 different kinds of default indexes in pandas on PySpark, and I am not able to reproduce their documented behavior.

Setting up to test:

import pyspark.pandas as ps
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
# So many rows because the dataset needs to exceed 128 MB;
# otherwise it is read into just 1 partition.
pd.DataFrame({
    'id': np.arange(20000000),
    'b': np.random.choice(['a', 'b', 'c', 'd'], size=(20000000,), p=[0.25, 0.25, 0.25, 0.25])
}).to_csv('df_s.csv', index=False)

  1. Sequence type
  • The data would be collected on a single node (so the dataframe should have just 1 partition?)
  • The default index is [0, 1, 2, 3, ...] (monotonically increasing by 1 and in order)

tests:

ps.set_option('compute.default_index_type','sequence')
dfsp = ps.read_csv('df_s.csv')
dfsp.head()
output:
	id	b
0	0	a
1	1	c
2	2	c
3	3	b
4	4	d
#Expected
dfsp.to_spark().rdd.getNumPartitions()
output:
8
#Unexpected

Question: Why is the number of partitions not 1, since when the default index is set to 'sequence' all the data must be collected on a single node?
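One possible explanation (an assumption on my part, worth verifying): `to_spark()` does not attach the index column unless `index_col` is passed, so the single-partition index computation may never actually run for that call, leaving the source file's 8 partitions intact. Conceptually, a 'sequence' index requires all rows funnelled through one place; a toy pure-Python sketch (not the library's actual code):

```python
# Pure-Python illustration: a global 0,1,2,... index requires
# collapsing every partition into a single ordered stream.
partitions = [["a", "b", "c"], ["d", "e"], ["f"]]  # toy 3-partition dataset

# Concatenate the partitions in order, then enumerate the whole stream.
rows = [row for part in partitions for row in part]
sequence_index = list(enumerate(rows))

print(sequence_index)
# [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f')]
```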

  2. Distributed-sequence
  • It computes and generates the index in a distributed manner, but it needs an extra Spark job to generate the global sequence internally. It also does not guarantee the natural order of the results. In general, it becomes a continuously increasing number.

tests:

ps.set_option('compute.default_index_type','distributed-sequence')
dfsp = ps.read_csv('df_s.csv')
dfsp.head()
output:
	id	b
0	0	a
1	1	c
2	2	c
3	3	b
4	4	d
#Expected
dfsp.to_spark().rdd.getNumPartitions()
output:
8
#Unexpected

Questions: The dataframe being distributed across all 8 cores is the expected behaviour, but the indexes should not be ordered, yet they are. This behaviour looks just like the sequence-type default index.
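For intuition, the two-pass distributed-sequence scheme described in the docs can be sketched in plain Python (an assumed sketch, not the library's actual implementation): count rows per partition first, turn the counts into per-partition offsets, then number rows within each partition. When partitions come back in their original file order, the result is indistinguishable from a plain 0..n-1 sequence, which may be why head() looks ordered here:

```python
# Toy sketch of a distributed-sequence index over 3 partitions.
partitions = [["a", "b"], ["c", "d", "e"], ["f"]]

# Pass 1: count rows per partition (the "extra Spark job").
counts = [len(p) for p in partitions]                      # [2, 3, 1]

# Cumulative offsets: partition i starts at sum(counts[:i]).
offsets = [sum(counts[:i]) for i in range(len(counts))]    # [0, 2, 5]

# Pass 2: number rows within each partition, shifted by its offset.
ids = [offsets[i] + j
       for i, part in enumerate(partitions)
       for j, _ in enumerate(part)]

print(ids)  # [0, 1, 2, 3, 4, 5]
```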

  3. Distributed
  • The “distributed” index has almost no performance penalty and always creates monotonically increasing numbers. If the index is just needed as unique numbers for each row, or for the order of rows, this index type would be the best choice. However, the numbers have an indeterministic gap.

tests:

ps.set_option('compute.default_index_type','distributed')
dfsp = ps.read_csv('df_s.csv')
print(dfsp.to_spark().rdd.getNumPartitions())
output:
8
dfsp.head()
output:
	id	b
0	0	c
1	1	c
2	2	b
3	3	c
4	4	c

Questions: This also behaves like the sequence type. The index generated is an ordered sequence (0, 1, 2, 3, ...). It should be monotonically increasing numbers with an indeterministic gap.
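Spark's monotonically_increasing_id() (which, per the Spark docs, is the kind of mechanism behind a 'distributed' index) packs the partition ID into the upper bits and the per-partition record number into the lower 33 bits, so gaps appear only at partition boundaries. Since head() reads only the first partition, whose IDs are plain 0, 1, 2, ..., an ordered gap-free prefix is exactly what you would see. A sketch:

```python
def monotonically_increasing_id(partition_id: int, row_in_partition: int) -> int:
    # Per the Spark docs: partition ID in the upper 31 bits,
    # per-partition record number in the lower 33 bits.
    return (partition_id << 33) + row_in_partition

# Two toy partitions of 3 rows each: the gap shows up only
# when crossing from partition 0 into partition 1.
ids = [monotonically_increasing_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```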

Can somebody please help me clarify what I am not understanding correctly and what is the exact expected behaviour for all three types of the default index?

@HyukjinKwon
Member

Yeah, you described it all correctly. The order isn't guaranteed in general. Can you try turning on the compute.ordered_head configuration?

@nitinmnsn
Author

nitinmnsn commented Oct 25, 2021

I did that now with ps.set_option('compute.ordered_head', True). The results are exactly the same.

Order is not the only source of my confusion though. I do not understand a lot in the example I have described above

  1. The file that I generated occupies about 200 MB on disk. Then why is it read into 8 partitions? I think it should use 2 partitions for the distributed and distributed-sequence default_index_type and 1 partition for the sequence default_index_type. I have checked my maxPartitionBytes and it is set to 128 MB: spark.conf.get('spark.sql.files.maxPartitionBytes') outputs '134217728b'.
  2. In the case of distributed-sequence, why is the order of indexes maintained?
  3. In the case of distributed, why are there no monotonically increasing indexes with indeterministic gaps? The indexes are the same 0, 1, 2, 3, ... as in the other two default_index_type configurations.
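On question 1, the likely culprit is that maxPartitionBytes is only an upper bound. Spark's file-splitting logic (paraphrasing FilePartition.maxSplitBytes in Spark 3.x; treat the exact default values below as assumptions for this scenario) also divides the input by the default parallelism, so on 8 cores a ~200 MB file is split into ~25 MB chunks rather than 128 MB ones:

```python
import math

# Assumed values for this scenario (Spark 3.x defaults, per its configuration docs).
total_bytes = 200 * 1024 * 1024          # ~200 MB CSV, single file
max_partition_bytes = 128 * 1024 * 1024  # spark.sql.files.maxPartitionBytes
open_cost = 4 * 1024 * 1024              # spark.sql.files.openCostInBytes
default_parallelism = 8                  # one slot per core

# Spark caps each split at the smaller of maxPartitionBytes and
# the per-core share of the input (never below the open cost).
bytes_per_core = (total_bytes + open_cost) / default_parallelism
max_split_bytes = min(max_partition_bytes, max(open_cost, bytes_per_core))

num_partitions = math.ceil(total_bytes / max_split_bytes)
print(num_partitions)  # 8
```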

Many thanks for the time and effort you are putting in to help me. :)

@nitinmnsn
Author

nitinmnsn commented Oct 26, 2021

I now understand the number-of-partitions issue. It was a lapse in my understanding.

But I still think the default indexes are not generated correctly. Are there any updates on this?
