pauldeschacht/SparkCustomWindowOperator

CustomSparkWindowOperator

This is an example of how to implement a custom SQL operator over a Spark SQL window, similar to the built-in rank and ntile.

This implementation detects changes in one or more columns relative to the previous row within a window group. If any of the columns has changed, the aggregate column hasChanged is set to true; otherwise it is set to false.

The custom SQL operator changed takes the names of the columns to watch for changes. The result is stored in the column hasChanged.

 import org.apache.spark.sql.expressions.Window

 val rowsWithChanged = df.withColumn("hasChanged", changed("status", "title").over(Window.partitionBy("id").orderBy("date")))
 val changedRows = rowsWithChanged.filter("hasChanged == true")
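
To make the intended semantics concrete, the following is a minimal sketch in plain Scala collections, without Spark. The Event type and the ChangedSketch object are hypothetical names introduced only for this illustration. It also assumes that the first row of each group is reported as unchanged; the behaviour of the actual operator for the first row is not stated here.

```scala
// Hypothetical row type mirroring the columns used in the example above.
final case class Event(id: Int, date: String, status: String, title: String)

object ChangedSketch {
  // Flags each row with whether status or title differs from the previous
  // row of the same id, with rows ordered by date inside each group.
  // Assumption: the first row of a group has no predecessor and gets false.
  def withHasChanged(rows: Seq[Event]): Seq[(Event, Boolean)] =
    rows
      .groupBy(_.id)                        // role of Window.partitionBy("id")
      .toSeq
      .sortBy(_._1)                         // deterministic group order for the output
      .flatMap { case (_, group) =>
        val ordered = group.sortBy(_.date)  // role of orderBy("date")
        // Compare each row with its predecessor; prepend false for the first row.
        val flags = false +: ordered.zip(ordered.drop(1)).map {
          case (prev, cur) => prev.status != cur.status || prev.title != cur.title
        }
        ordered.zip(flags)
      }
}
```

For a group with the status values open, open, closed and a constant title, this sketch yields false, false, true: only the third row differs from its predecessor.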

Details

The implementation is inspired by the rank operator. The important difference is that rank aggregates over the orderBy expressions, which are already resolved. During the initial calls (plan analysis and code generation phase), ChangedOverPreviousRow receives unresolved children whose data types are not yet known, so the implementation checks whether each child expression is resolved.

The case class ChangesOverPreviousRow extends DeclarativeAggregate, which in turn implements the AggregateFunction contract.
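
A declarative aggregate describes its work as an aggregation buffer with initial values, an update step applied per row, and a final evaluation. The sketch below is a toy model of that shape in plain Scala, not Spark's classes and not the code in this repository: ToyWindowAggregate, ChangeBuffer and ToyChanged are hypothetical names, the buffer layout is an assumption, and the merge step is left out.

```scala
// Toy stand-in for a buffer-based aggregate; NOT Spark's DeclarativeAggregate.
trait ToyWindowAggregate[In, Buf, Out] {
  def initial: Buf                       // plays the role of initialValues
  def update(buffer: Buf, row: In): Buf  // plays the role of updateExpressions
  def evaluate(buffer: Buf): Out         // plays the role of evaluateExpression

  // Feeds the rows of one ordered window partition and emits one value per row.
  def run(rows: Seq[In]): Seq[Out] =
    rows.scanLeft(initial)(update).drop(1).map(evaluate)
}

// Assumed buffer layout: the tracked column values of the previous row
// (None before the first row) plus the flag computed for the current row.
final case class ChangeBuffer(previous: Option[Seq[String]], changed: Boolean)

object ToyChanged extends ToyWindowAggregate[Seq[String], ChangeBuffer, Boolean] {
  val initial: ChangeBuffer = ChangeBuffer(None, changed = false)

  // Remember the current row and flag it if it differs from the stored one.
  // Assumption: a row without a predecessor is reported as unchanged.
  def update(buffer: ChangeBuffer, row: Seq[String]): ChangeBuffer =
    ChangeBuffer(Some(row), buffer.previous.exists(_ != row))

  def evaluate(buffer: ChangeBuffer): Boolean = buffer.changed
}
```

Calling ToyChanged.run on the rows (open, A), (open, A), (closed, A) gives false, false, true. In the real operator the same roles are expressed as Catalyst expressions rather than Scala functions, which is why the resolution state of the child expressions matters.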
