Aggregating column has a wrong datatype. #141

Open
svaningelgem opened this issue Jan 27, 2021 · 1 comment

Comments

@svaningelgem
Contributor

While working on PR #138, I encountered an issue:

```python
from datetime import date
from pysparkling.sql.session import SparkSession
from pysparkling.sql.functions import collect_set

spark = SparkSession.Builder().getOrCreate()

dataset_usage = [
    ('steven', 'UUID1', date(2019, 7, 22)),
    ('steven', 'UUID2', date(2019, 7, 22)),
    ('steven', 'UUID3', date(2019, 7, 23)),
    ('johan', 'UUID4', date(2019, 7, 22)),
    ('johan', 'UUID5', date(2019, 7, 23))
]
dataset_usage_list = [
    ('steven', date(2019, 7, 22), ['UUID2', 'UUID1']),
    ('steven', date(2019, 7, 23), ['UUID3']),
    ('johan', date(2019, 7, 22), ['UUID4']),
    ('johan', date(2019, 7, 23), ['UUID5'])
]
dataset_usage_schema = 'id: string, datauid: string, access_date: date'
dataset_usage_list_schema = 'id: string, access_date: date, list_of_datauids: array<string>'

df = spark.sparkContext.parallelize(dataset_usage).toDF(dataset_usage_schema)
actual = df.groupBy('id', 'access_date').agg(collect_set('datauid').alias('list_of_datauids'))
expected = spark.createDataFrame(dataset_usage_list, dataset_usage_list_schema)

if actual == expected:
    print("Test OK")
else:
    print("Values differ -->")
    actual.show()
    expected.show()

    print(actual.schema)
    print(expected.schema)
```
```
Values differ -->
+------+-----------+------------------+
|    id|access_date|  list_of_datauids|
+------+-----------+------------------+
|steven| 2019-07-22|['UUID2', 'UUID1']|
|steven| 2019-07-23|         ['UUID3']|
| johan| 2019-07-22|         ['UUID4']|
| johan| 2019-07-23|         ['UUID5']|
+------+-----------+------------------+
+------+-----------+------------------+
|    id|access_date|  list_of_datauids|
+------+-----------+------------------+
|steven| 2019-07-22|['UUID2', 'UUID1']|
|steven| 2019-07-23|         ['UUID3']|
| johan| 2019-07-22|         ['UUID4']|
| johan| 2019-07-23|         ['UUID5']|
+------+-----------+------------------+

StructType(List(StructField(id,StringType,true),StructField(access_date,DateType,true),StructField(list_of_datauids,DataType,true)))
StructType(List(StructField(id,StringType,true),StructField(access_date,DateType,true),StructField(list_of_datauids,ArrayType(StringType,true),true)))
```

As you can see, the official Spark version creates an ArrayType of the underlying column's type. In this case datauid is a StringType, so the aggregated column should be ArrayType(StringType), yet pysparkling reports the generic DataType.

This is very likely because DataType is the default type assigned when you pass in a string (parse() will generate a Column from it). That type is never linked to the aggregating method, so it just stays a plain DataType.
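To illustrate the expected behavior: an aggregate like collect_set should derive its result type from the input column's type rather than keeping the generic parent. A minimal sketch of that type propagation (the class and function names here are illustrative only; they mirror Spark's type names but are not pysparkling's actual implementation):

```python
# Illustrative sketch, not pysparkling code: an aggregate's result type
# should be computed from the input column's concrete type.

class DataType:
    """Generic parent type (what pysparkling currently falls back to)."""
    def __repr__(self):
        return type(self).__name__

class StringType(DataType):
    pass

class ArrayType(DataType):
    def __init__(self, element_type, contains_null=True):
        self.element_type = element_type
        self.contains_null = contains_null

    def __repr__(self):
        return f"ArrayType({self.element_type!r},{str(self.contains_null).lower()})"

def collect_set_result_type(input_type):
    # collect_set wraps the input column's type in an ArrayType,
    # instead of leaving the result as the generic DataType.
    return ArrayType(input_type, contains_null=False)

print(collect_set_result_type(StringType()))  # ArrayType(StringType,false)
```

With this kind of derivation, a StringType input column would yield ArrayType(StringType) in the aggregated schema, matching the official Spark output above.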

@tools4origins
Collaborator

Yes, indeed.

Types are not currently handled properly by the pysparkling implementation of Spark SQL, as it relies much less on them to transform data.

All columns (aggregated or not) use DataType, the parent type:
in Column.data_type: https://github.com/svenkreiss/pysparkling/blob/master/pysparkling/sql/column.py#L684-L687
in aggregation: https://github.com/svenkreiss/pysparkling/blob/master/pysparkling/sql/internals.py#L1045

It is on the todo list, of course, but I focused my latest development on SQL statement parsing, as it will make testing easier; hence the current state of the code.
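For reference, the kind of fix implied here would be to resolve a named column's concrete type against the DataFrame schema instead of falling back to the parent DataType everywhere. A rough sketch under that assumption (hypothetical helper and field names, not the actual pysparkling code):

```python
# Hypothetical sketch: look up a column's declared type in a schema
# instead of returning the generic parent DataType for every column.
# Names mimic Spark's StructField, but this is not pysparkling code.

from dataclasses import dataclass

@dataclass
class StructField:
    name: str
    data_type: str  # e.g. "string", "date", "array<string>"

def resolve_data_type(schema, column_name, default="DataType"):
    """Return the declared type of `column_name`, or the generic fallback."""
    for field in schema:
        if field.name == column_name:
            return field.data_type
    # Current pysparkling behavior: everything collapses to DataType.
    return default

schema = [StructField("id", "string"),
          StructField("list_of_datauids", "array<string>")]
print(resolve_data_type(schema, "list_of_datauids"))  # array<string>
print(resolve_data_type(schema, "unknown"))           # DataType
```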
