groupby api .agg behavior changes depending upon the way the dataframe is created #2208

Closed
nitinmnsn opened this issue Oct 28, 2021 · 1 comment

Comments

@nitinmnsn

The groupby .agg API in pyspark behaves differently depending on whether the DataFrame is a pyspark.sql.dataframe.DataFrame or a pyspark.pandas.frame.DataFrame. Is this intended behaviour? Also, how do I run groupby .agg when the DataFrame is a pyspark.pandas.frame.DataFrame? It seems that registering a pandas_udf is necessary to run them.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark import pandas as ps
import pandas as pd

spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
                   'B': [1, 2, 3],
                   'C': [4, 6, 5]}, columns=['A', 'B', 'C'])

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    return (x**2).mean()

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    return x.mean()

df.groupby('A').agg(agg_a('B'), agg_b('C')).show()

Output: ValueError: aggs must be a dict mapping from column name to aggregate functions (string or list of strings).

However, if I create the DataFrame with spark.createDataFrame instead of pyspark.pandas, the same aggregation code runs without any errors:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark import pandas as ps
import pandas as pd

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pd.DataFrame(
    {
        'A': 'a a b'.split(),
        'B': [10, 20, 30],
        'C': [4, 6, 5]
    },
    columns=['A', 'B', 'C']
))

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    return (x**2).mean()

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    return x.mean()

df.groupby('A').agg(agg_a('B'), agg_b('C')).show()
Output:
+---+--------+--------+
|  A|agg_a(B)|agg_b(C)|
+---+--------+--------+
|  a|   250.0|     5.0|
|  b|   900.0|     5.0|
+---+--------+--------+
@itholic
Contributor

itholic commented Dec 9, 2021

Let me close this since it duplicates #2201.

Also, this is technically a bug in the pandas API on Spark (pyspark.pandas), not in Koalas, so it should be filed in the Apache Spark JIRA.

Koalas and the pandas API on Spark behave almost identically, but they should be treated as separate projects, since the pandas API on Spark is now much more actively updated.

@itholic itholic closed this as completed Dec 9, 2021