
Enhanced Validation Reporting for PySpark DataFrames in Pandera #1540

zaheerabbas-prodigal opened this issue Mar 26, 2024 · 10 comments
Status: Open · Labels: enhancement (New feature or request)

@zaheerabbas-prodigal

Is your feature request related to a problem? Please describe.
Hello, I am new to PySpark and data engineering in general. I am looking to validate a PySpark DataFrame against a schema, and I came across pandera, which suits my needs best.

As I understand it, because of the distributed nature of Spark, all pandera validation errors are compiled into pandera's error report. However, these error reports do not seem to show which records are invalid.

I am looking for a way to validate a PySpark DataFrame through pandera and get the indices of the invalid rows, so that I can post-process them, dump them into a separate corrupt-records store, or at least drop them.

There is a Drop Invalid Rows feature, but as far as I can tell it only supports pandas for now. When I set drop_invalid_rows on a PySpark schema, it throws a BackendNotFound exception. I also came across a Stack Overflow answer about this, but that approach also appears to be supported only for pandas.
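
A minimal sketch of this setup, assuming pandera's pandera.pyspark DataFrameModel API; the column names, checks, and data here are illustrative:

```python
import pandera.pyspark as pa
import pyspark.sql.types as T
from pandera.pyspark import DataFrameModel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


class ProductSchema(DataFrameModel):
    """Toy schema; the columns and checks are illustrative."""
    product: T.StringType() = pa.Field()
    price: T.IntegerType() = pa.Field(gt=0)


data = [("laptop", 1000), ("keyboard", -50)]  # second row violates gt=0
spark_schema = T.StructType(
    [
        T.StructField("product", T.StringType(), False),
        T.StructField("price", T.IntegerType(), False),
    ]
)
df = spark.createDataFrame(data, spark_schema)

df_out = ProductSchema.validate(df)

# The error report groups failures by schema/data check, but it does not
# identify which rows failed -- only that the check failed.
print(dict(df_out.pandera.errors))
```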

Describe the solution you'd like
Get the indices of invalid rows after running pandera's validate() function.

Describe alternatives you've considered
Dropping the rows of a PySpark DataFrame that do not match the defined schema. Based on my experimentation, this is supported for pandas but not for PySpark DataFrames; a sketch of the pandas behavior I am referring to is below.
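
A minimal sketch of that pandas behavior, assuming pandera's documented drop_invalid_rows option on DataFrameSchema (which requires lazy validation); names and data are illustrative:

```python
import pandas as pd
import pandera as pa

# Schema that drops rows failing any check instead of raising, when validated
# lazily. This works on the pandas backend; attempting the equivalent on a
# PySpark DataFrame is what produces the BackendNotFound error described above.
schema = pa.DataFrameSchema(
    {"price": pa.Column(int, pa.Check.gt(0))},
    drop_invalid_rows=True,
)

df = pd.DataFrame({"price": [1000, -50, 25]})

# lazy=True is required for drop_invalid_rows to take effect.
valid_df = schema.validate(df, lazy=True)
print(valid_df)  # the row with price == -50 is dropped
```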

Additional context
NA

zaheerabbas-prodigal added the enhancement (New feature or request) label on Mar 26, 2024
@amulgund-kr

We are also interested in this enhancement

@pyarlagadda-kr

We are waiting on this enhancement for PySpark DataFrames, to filter out invalid records after schema validation.

@Niveth7922

I am also interested in this enhancement

@acestes-kr

I am interested as well!!!!

@cosmicBboy
Collaborator

cosmicBboy commented Apr 13, 2024

This seems like a useful feature! I'd support this effort. @NeerajMalhotra-QB @jaskaransinghsidana @filipeo2-mck any thoughts on this? It would incur significant compute cost, since right now the pyspark check queries are limited to the first invalid value: https://github.com/unionai-oss/pandera/blob/main/pandera/backends/pyspark/builtin_checks.py

A couple of thoughts:

  1. Does this justify an additional value in the PANDERA_VALIDATION_DEPTH configuration, like DATA_ALL, or a new configuration setting like PANDERA_FULL_TABLE_SCAN=True? (A rough sketch of this option follows the list.)
  2. We may want to add more comprehensive logging to let users know how validation is being performed.
  3. For very large Spark DataFrames, would a sample or the first n failure cases make sense, or do folks want to see all failure cases?
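
A rough sketch of how the proposed setting could sit alongside the existing PANDERA_VALIDATION_DEPTH environment variable. PANDERA_FULL_TABLE_SCAN (and a DATA_ALL depth value) do not exist today; they are only the options being floated here:

```python
import os

# Existing knob: how deep validation goes (SCHEMA_ONLY, DATA_ONLY, SCHEMA_AND_DATA).
validation_depth = os.environ.get("PANDERA_VALIDATION_DEPTH", "SCHEMA_AND_DATA")

# Proposed (hypothetical) knob: whether data checks scan the full table to
# collect every failing row instead of stopping at the first invalid value.
full_table_scan = os.environ.get("PANDERA_FULL_TABLE_SCAN", "False") == "True"

if full_table_scan:
    print("Data checks will collect all failing rows (more expensive).")
else:
    print("Data checks stop at the first invalid value (current pyspark behavior).")
```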

@jaskaransinghsidana
Contributor

Replying to @cosmicBboy's three questions above:
  1. Yeah, I have also seen some folks asking for this feature, so it makes sense to implement it. But given that it would be an expensive operation, we could add documentation that clearly states the performance implications.
  2. I am not sure exactly what you want to capture for this one, but comprehensive logging is always developer friendly.
  3. This is quite debatable. We have discussed this topic a fair amount internally as well, not just for pandera but for data quality in general, and quite honestly the answer varies by team, product maturity, and cost requirements, with no consensus. Some teams want zero bad records no matter the cost and time implications, while others have a percentage threshold they are willing to accept. Personally, I would rather have this configurable: accept the top N failure cases if I want a sample, but also allow a value like "-1" to mean the full table, and let users make the decision that is right for them.

@filipeo2-mck
Contributor

  1. Yep, I agree it would be useful. Currently we just count the DF to identify failing checks (one Spark action per check). If a new option to filter out invalid rows is set, more Spark actions would be triggered per check. It would be a more expensive operation, but that can be mitigated by caching the PySpark DF before applying the validations (through export PANDERA_CACHE_DATAFRAME=True).
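
A small sketch of the caching option mentioned above, based on the PANDERA_CACHE_DATAFRAME / PANDERA_KEEP_CACHED_DATAFRAME settings in pandera's pyspark docs; the exact timing of when the environment variables are read is an assumption here:

```python
import os

# Equivalent to `export PANDERA_CACHE_DATAFRAME=True` in the shell.
# Depending on when pandera reads its configuration, this may need to be set
# before pandera is imported.
os.environ["PANDERA_CACHE_DATAFRAME"] = "True"
# Optionally keep the DataFrame cached after validation finishes:
# os.environ["PANDERA_KEEP_CACHED_DATAFRAME"] = "True"

import pandera.pyspark as pa  # noqa: E402  (imported after setting the env var)

# ...define a DataFrameModel and call Schema.validate(df) as usual; with the
# option enabled, the DataFrame is cached before the per-check Spark actions
# run, so they reuse the cached data instead of recomputing it each time.
```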

@cosmicBboy
Collaborator

Okay, so high level steps for this issue:

  1. Introduce a PANDERA_FULL_TABLE_VALIDATION configuration option. By default it should be None and should be set depending on the validation backend: True for the pandas check backend but False for the pyspark backend.
  2. Modify all of the pyspark builtin checks to have two execution modes (see the sketch at the end of this comment):
    • PANDERA_FULL_TABLE_VALIDATION=False is the current behavior.
    • PANDERA_FULL_TABLE_VALIDATION=True should return a boolean column indicating which elements in the column passed the check.
  3. Make any additional changes to the pyspark backend to support a boolean column as the output of a check (we can take inspiration from the polars check backend on how to do this).
  4. Add support for the drop_invalid_rows option.
  5. Add info logging at validation time to let the user know whether full table validation is happening.
  6. Add documentation discussing the performance implications of turning on full table validation.

To avoid further complexity, we shouldn't support randomly sampling data values: we either do full table validation or single-value validation (the current pyspark behavior).
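
A rough sketch (not pandera's actual backend code) of what the two execution modes in step 2 could look like for a built-in greater-than check on the pyspark backend; the full_table_validation flag stands in for the proposed PANDERA_FULL_TABLE_VALIDATION setting:

```python
import pyspark.sql.functions as F
from pyspark.sql import Column, DataFrame


def greater_than_check(
    df: DataFrame, column: str, min_value, full_table_validation: bool
):
    """Illustrative check with the two proposed execution modes."""
    if not full_table_validation:
        # Current behavior: a single query that fetches at most one failing
        # value, so the error report can show an example failure.
        first_failure = df.filter(F.col(column) <= F.lit(min_value)).limit(1).collect()
        return len(first_failure) == 0

    # Proposed behavior: return the DataFrame with a boolean column marking
    # which rows pass, so the backend can collect all failure cases and
    # support drop_invalid_rows.
    passes: Column = F.col(column) > F.lit(min_value)
    return df.withColumn(f"__{column}_gt_check__", passes)


# e.g. dropping invalid rows from the full-table result:
# checked = greater_than_check(df, "price", 0, full_table_validation=True)
# valid_rows = checked.filter(F.col("__price_gt_check__")).drop("__price_gt_check__")
```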

@cosmicBboy
Collaborator

Happy to help review and take a PR over the finish line if someone wants to take this task on @zaheerabbas-prodigal

@nk4456542

Thanks for laying out the high-level details needed for this feature @cosmicBboy, I'll be happy to take this up 😄.

9 participants