
feat: add support for multiple partition columns and filters in to_pyarrow_dataset() and OR filters in write_deltalake() #1722

Open · wants to merge 13 commits into base: main

Conversation

@ldacey (Contributor) commented Oct 13, 2023

Description

This provides support for passing lists of DNF tuples to the to_pyarrow_dataset() method. I didn't want to touch the dataset_partitions() Rust code, so this can probably be further simplified and improved. My use case was to be able to create a pyarrow dataset from the DeltaTable and then use polars.scan_pyarrow_dataset().

Related Issue(s)

Documentation

Here are some examples (which are part of the pytests as well):

Lists of lists of tuples in DNF format will create OR expressions:

    multiple_partitions_multiple_columns = [
        [("year", "=", "2020"), ("month", "=", "2"), ("day", "=", "5")],
        [("year", "=", "2021"), ("month", "=", "4"), ("day", "=", "5")],
        [("year", "=", "2021"), ("month", "=", "3"), ("day", "=", "1")],
    ]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=multiple_partitions_multiple_columns
    )

A single list of tuples will create an AND expression:

    single_partition_multiple_columns = [("month", "=", "2"), ("day", "=", "5")]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=single_partition_multiple_columns
    )
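
For context, the end-to-end use case from the description looks roughly like this (a sketch; assumes polars is installed and dt is the DeltaTable from the snippets above):

    import polars as pl

    # Scan the already-filtered dataset lazily; only the selected
    # partitions are read when the query is collected.
    lf = pl.scan_pyarrow_dataset(dataset_filtered)
    print(lf.collect())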

… to_pyarrow_dataset()

- Partitions with multiple columns can be passed as lists of tuples in DNF format
- Multiple partition filters can be passed
- Add tests for various filter/partition scenarios which can be passed to to_pyarrow_dataset()
The github-actions bot added the binding/python (Issues for the Python package) label on Oct 13, 2023
@wjones127 (Collaborator)

> My use case was to be able to create a pyarrow dataset from the DeltaTable and then use polars.scan_pyarrow_dataset().

If you are using polars lazy api, have you considered just writing the filters in Polars instead? They should be pushed down into the PyArrow dataset automatically.
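
For reference, the pushdown alternative described here would look roughly like this (a sketch; the filter in the lazy query is pushed down into the PyArrow dataset scan):

    import polars as pl

    # No partitions argument: polars pushes the filter predicate down
    # into the PyArrow dataset scan.
    dataset = dt.to_pyarrow_dataset()
    df = (
        pl.scan_pyarrow_dataset(dataset)
        .filter((pl.col("year") == "2021") & (pl.col("month") == "4"))
        .collect()
    )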

@ldacey (Contributor, Author) commented Oct 13, 2023

Yeah, I am doing that currently. I pass the DNF filters between Airflow tasks and convert them into polars filter expressions.

When testing earlier I noticed the partitions argument in to_pyarrow_dataset was not working with multiple partitions or multiple columns, so it seemed like a reasonable addition.

My Airflow tasks do not always read data into polars (only for transformation and removing duplicates), so being able to stick with a single solution is nice: pass the DNF filters between tasks and return a filtered dataset, then scan that with polars or duckdb, or turn it into a table, etc.

@ldacey (Contributor, Author) commented Oct 14, 2023

Can you check my latest changes? This is my attempt to tackle #1479 and add support for overwriting multiple partitions (without touching the Rust code). This allows me to update a dataset with AND and OR filter logic by using lists of lists of DNF tuples, or a list of DNF tuples. The test I added shows a few scenarios.

@MrPowers - this is what I was trying to accomplish during our chat on Slack the other day. Basically, I need to be able to overwrite partitions with deduplicated data.

Many of my sources are partitioned by some category and some temporal field, so for example [("account", "=", "x"), ("date", "=", "2023-10-10")] would overwrite partitions where the account is X and the date is 2023-10-10.

Or some tasks download data based on updated_at timestamps but the data is partitioned by created_date. In this case I might need to overwrite all of the unique created_date partitions in the in-memory data, which results in OR filters like [[("created_date", "=", "2023-01-01")], [("created_date", "=", "2023-08-23")], [("created_date", "=", "2023-10-15")]].
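
A sketch of the overwrite call this enables (assuming the nested partition_filters form added in this PR; data stands in for the deduplicated in-memory table):

    from deltalake import write_deltalake

    # Overwrite only the listed created_date partitions: OR across the
    # outer lists, AND within each inner list.
    write_deltalake(
        "path/to/table",
        data,
        mode="overwrite",
        partition_by=["created_date"],
        partition_filters=[
            [("created_date", "=", "2023-01-01")],
            [("created_date", "=", "2023-08-23")],
            [("created_date", "=", "2023-10-15")],
        ],
    )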

@wjones127 (Collaborator) left a comment

This is making the logic in reading and writing really complicated. Could you pull out the DNF filter parsing into some separate functions? If we can normalize them to the same general type, then we can pass that down into Rust and have it handle that.
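
A hypothetical sketch of that normalization (names illustrative, not the final API): coerce both accepted shapes into one list-of-lists form before handing it to Rust.

    from typing import List, Tuple, Union

    FilterConjunction = List[Tuple[str, str, str]]
    FilterDNF = List[FilterConjunction]

    def normalize_filters(filters: Union[FilterConjunction, FilterDNF]) -> FilterDNF:
        """Wrap a flat AND group so downstream code only sees OR-of-AND groups."""
        if not filters:
            raise ValueError("filters cannot be empty")
        if all(isinstance(f, tuple) for f in filters):
            # A plain list of tuples is a single AND group.
            return [filters]
        return filters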

Comment on lines 454 to 460:

    table._table.create_write_transaction(
        filtered_add_actions,
        mode,
        partition_by or [],
        schema,
        partition_filter,
    )
@wjones127 (Collaborator)

Why are we creating a write transaction per filter?

@ldacey (Contributor, Author)

It seemed like the create_write_transaction() was not able to handle a list of lists of tuples (OR conditions), so I had to treat each condition separately and loop through them.

[id_values0-account_values0-created_date_values0-updated_at_values0-value_values0-partition_by0-partition_filters0] TypeError: argument 'partitions_filters': 'list' object cannot be converted to 'PyTuple' [909, 1]
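
The workaround is roughly the following loop (using the names from the snippet above; build_add_actions is a hypothetical placeholder for the add-action filtering):

    # One write transaction per OR group; table.update_incremental() is
    # called after each transaction, as the logs below show.
    for partition_filter in partition_filters:
        filtered_add_actions = build_add_actions(partition_filter)  # hypothetical helper
        table._table.create_write_transaction(
            filtered_add_actions,
            mode,
            partition_by or [],
            schema,
            partition_filter,
        )
        table.update_incremental()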

Looping through each filter results in this:

    partition_filters=[
        [("created_date", "=", "2023-08-25")],
        [("created_date", "=", "2023-09-07")],
        [("created_date", "=", "2023-09-21")],
    ],

ic| partition_filter: [('created_date', '=', '2023-08-25')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-08-25'},
                                     modification_time=1697321774036,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 1, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
                                           '"value": 44.5}, "maxValues": {"id": 1, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
                                           '"value": 44.5}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'

ic| partition_filter: [('created_date', '=', '2023-09-07')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-07/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-09-07'},
                                     modification_time=1697321774034,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 3, "account": '
                                           '"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 68.0}, "maxValues": {"id": 3, "account": '
                                           '"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 68.0}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'

ic| partition_filter: [('created_date', '=', '2023-09-21')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-21/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-09-21'},
                                     modification_time=1697321774034,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 4, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 11.5}, "maxValues": {"id": 4, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 11.5}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'

If we use a single list of tuples (AND condition) like the example below, there is still only one call to table.update_incremental().

ic| partition_filter: [('created_date', '>', '2023-08-01'), ('created_date', '<', '2023-12-31')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_a',
                                                       'created_date': '2023-08-25'},
                                     modification_time=1697322195775,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 1, "updated_at": '
                                           '"2023-10-14T22:23:15.400092", "value": 0.1}, "maxValues": '
                                           '{"id": 1, "updated_at": "2023-10-14T22:23:15.400092", '
                                           '"value": 0.1}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-05/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_b',
                                                       'created_date': '2023-09-05'},
                                     modification_time=1697322195778,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 2, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.2}, "maxValues": '
                                           '{"id": 2, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.2}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-10-02/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_b',
                                                       'created_date': '2023-10-02'},
                                     modification_time=1697322195780,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 5, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.5}, "maxValues": '
                                           '{"id": 5, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.5}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-07/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_a',
                                                       'created_date': '2023-09-07'},
                                     modification_time=1697322195780,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 3, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.3}, "maxValues": '
                                           '{"id": 3, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.3}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-21/account=account_c/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_c',
                                                       'created_date': '2023-09-21'},
                                     modification_time=1697322195781,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 4, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.4}, "maxValues": '
                                           '{"id": 4, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.4}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}')]
ic| 'update_incremental'

- validate_filters ensures partitions and filters are in DNF format (a list of tuples, or a list of lists of tuples) and checks for empty lists
- stringify_partition_values ensures values such as dates and ints are converted to strings for partition columns
- Use pyarrow.parquet filters_to_expression instead of the custom implementation
- Move __stringify_partition_values to _util to be able to test more easily
- Move partition validation to the validate_filters function
- Move fragment building to a separate method
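
A rough sketch of the stringify_partition_values behaviour described above (illustrative only, not the exact implementation):

    from datetime import date

    # Cast non-string filter values on partition columns to strings,
    # since Hive-style partition values are stored as strings.
    def stringify_partition_values(filters, partition_columns):
        out = []
        for column, op, value in filters:
            if column in partition_columns and not isinstance(value, str):
                if isinstance(value, (list, tuple)):  # e.g. "in" / "not in" filters
                    value = [str(v) for v in value]
                else:
                    value = str(value)
            out.append((column, op, value))
        return out

    stringify_partition_values([("day", "=", 5), ("dt", "=", date(2023, 1, 1))], {"day", "dt"})
    # -> [('day', '=', '5'), ('dt', '=', '2023-01-01')]
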
@ldacey (Contributor, Author) commented Oct 17, 2023

> This is making the logic in reading and writing really complicated. Could you pull out the DNF filter parsing into some separate functions? If we can normalize them to the same general type, then we can pass that down into Rust and have it handle that.

Agree. I was keeping the logic within the existing functions to not cause too many changes.

  • validate_filters can be used to ensure that the DNF format is correct (a list of tuples, or a list of lists of tuples)
  • Moved the fragment creation to a separate method
  • I believe we can just use pq.filters_to_expression(), which has been a public function for a few pyarrow versions now. This removes some of the more complex logic.
  • I added tests for filters_to_expression(), validate_filters(), and stringify_partition_values()

I still need to add validate_filters() to the writer.py file as well; I did not touch that yet.
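
For reference, pq.filters_to_expression() handles the same nested DNF form used in the examples above (a sketch; OR across the outer lists, AND within each inner list):

    import pyarrow.parquet as pq

    expr = pq.filters_to_expression(
        [
            [("year", "=", "2020"), ("month", "=", "2")],
            [("year", "=", "2021"), ("month", "=", "4")],
        ]
    )
    print(expr)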

@ldacey (Contributor, Author) commented Oct 17, 2023

The validate_filters() function I added now ensures that partition_filters is a list of lists of tuples, so I made updates to accommodate that. I was not able to avoid looping through each OR condition.

I think that is all I can do on the Python side for now, and I am not familiar with Rust. I will take a look at the get_active_partitions() and create_write_transaction() Rust functions to see if I can adapt them to accept a list of lists of tuples. Right now it looks like they only accept a list of tuples:

    partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>

If that change is made, then further simplification can be done with the Python code.

@ldacey changed the title from "feat: add support for multiple partition columns and filters in to_pyarrow_dataset()" to "feat: add support for multiple partition columns and filters in to_pyarrow_dataset() and OR filters in write_deltalake()" on Oct 17, 2023