DataFusion filter on partition column doesn't work (when the physical schema ordering is different from the logical one) #2494

Open
Veiasai opened this issue May 9, 2024 · 3 comments
Labels: binding/rust (Issues for the Rust crate), bug (Something isn't working)

Comments

Veiasai commented May 9, 2024

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment:

  • Cloud provider:
  • OS: Linux
  • Other:

Bug

What happened:
The filter expression did not return the expected rows. My table is relatively large, so I constructed a minimal test to reproduce the problem; see the code below.
From what I see in the logs, my guess is:

  1. The Delta scan itself is fine: it successfully prunes irrelevant files.
  2. In https://github.com/delta-io/delta-rs/pull/1071/files#diff-f3a4847c9506848f6f5bf021b5f10fb24602373580e58739bd2a2a24f9878e77R438 we use Inexact filter pushdown, so DataFusion applies the same filter again after the scan. However, the physical plan gets the wrong column index.
  3. I am not an expert on DataFusion or delta-rs, so I stopped here. Thank you in advance for any help.
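
To make the guess in step 2 concrete, here is a minimal sketch of how a column-index mismatch could produce an empty result. It is a toy model, not DataFusion or delta-rs code: the `Batch` type and all function names are hypothetical. `SCAN_ORDER` is taken from the column order in the expected output of the test, while `PLANNED_ORDER` is an assumed ordering chosen only for illustration, since the issue does not show which order the planner actually used.

```rust
// Toy model only: hypothetical stand-ins, not DataFusion or Arrow APIs.

/// A batch of rows together with the column order it was produced in.
struct Batch {
    columns: Vec<&'static str>,
    rows: Vec<Vec<&'static str>>,
}

/// Column order of the scan output, as shown in the test's expected table.
const SCAN_ORDER: [&str; 3] = ["value", "modified", "id"];

/// ASSUMPTION: an order the planner might have resolved indices against,
/// with the two partition columns swapped. Invented for illustration.
const PLANNED_ORDER: [&str; 3] = ["value", "id", "modified"];

/// Position of `name` in `schema`, if present.
fn index_of(schema: &[&str], name: &str) -> Option<usize> {
    schema.iter().position(|c| *c == name)
}

/// Count rows whose value at `index` equals `literal`.
fn count_matches(batch: &Batch, index: usize, literal: &str) -> usize {
    batch
        .rows
        .iter()
        .filter(|row| row.get(index).copied() == Some(literal))
        .count()
}

/// The single row written by the test, laid out in scan order.
fn scanned_batch() -> Batch {
    Batch {
        columns: SCAN_ORDER.to_vec(),
        rows: vec![vec!["1", "2021-02-01", "A"]],
    }
}

/// Evaluate `column = literal` using an index resolved against PLANNED_ORDER
/// (the suspected mistake).
fn filter_with_planned_index(batch: &Batch, column: &str, literal: &str) -> usize {
    match index_of(&PLANNED_ORDER, column) {
        Some(i) => count_matches(batch, i, literal),
        None => 0,
    }
}

/// Evaluate `column = literal` using an index resolved against the batch itself.
fn filter_with_batch_index(batch: &Batch, column: &str, literal: &str) -> usize {
    match index_of(&batch.columns, column) {
        Some(i) => count_matches(batch, i, literal),
        None => 0,
    }
}
```

Under these assumptions, a filter on value still matches because its index is the same in both orders, while a filter on id is evaluated against the modified column and matches nothing. Resolving the index by name against the batch that is actually being filtered avoids the mismatch.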

What you expected to happen:

How to reproduce it:

I wrote a unit test to check it, but it seems I don't have permission to push it.

    #[tokio::test]
    async fn delta_scan_mixed_partition_order_and_filter() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", DataType::Utf8, true),
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));

        let table = crate::DeltaOps::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().clone())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), 0);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![1])),
            ],
        )
        .unwrap();
        // write some data
        let table = crate::DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = Arc::new(table);
        let ctx = SessionContext::new();
        let df = ctx.read_table(provider).unwrap();

        let actual = df.clone().collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified   | id |",
            "+-------+------------+----+",
            "| 1     | 2021-02-01 | A  |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        // Filter on a non-partition column.
        let actual = df
            .clone()
            .filter(col("value").eq(lit(1)))
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_batches_sorted_eq!(&expected, &actual);

        // Filter on a partition column; this is the case reported to return no rows.
        let actual = df
            .clone()
            .filter(col("id").eq(lit("A")))
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
    }

More details:

expected:

[
    "+-------+------------+----+",
    "| value | modified   | id |",
    "+-------+------------+----+",
    "| 1     | 2021-02-01 | A  |",
    "+-------+------------+----+",
]
actual:

[
    "++",
    "++",
]


  left: ["+-------+------------+----+", "| value | modified   | id |", "+-------+------------+----+", "| 1     | 2021-02-01 | A  |", "+-------+------------+----+"]
 right: ["++", "++"]

Veiasai added the bug label on May 9, 2024
Author

Veiasai commented May 9, 2024

One more suggestion:
Could we return the filter pushdown flag dynamically, depending on the expression?

pub enum TableProviderFilterPushDown {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to reduce the data retrieved,
    /// but the provider cannot guarantee it will omit all tuples that
    /// may be filtered. In this case, DataFusion will apply an additional
    /// `Filter` operation after the scan to ensure all rows are filtered correctly.
    Inexact,
    /// The provider **guarantees** that it will omit **all** tuples that are
    /// filtered by the filter expression. This is the fastest option, if available
    /// as DataFusion will not apply additional filtering.
    Exact,
}

When the expression only references partition columns, we should return Exact.
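
A minimal sketch of what that per-expression decision could look like, assuming a toy expression tree. The `PushDown` enum is a local mirror of the enum quoted above so the snippet compiles on its own, and `Expr`, `classify`, and `col_eq` are hypothetical names; DataFusion's real expression type is much richer, and this is not the delta-rs implementation.

```rust
use std::collections::HashSet;

/// Local mirror of the pushdown enum quoted above (not the DataFusion type).
#[allow(dead_code)]
#[derive(Debug, PartialEq, Clone, Copy)]
enum PushDown {
    Unsupported,
    Inexact,
    Exact,
}

/// Toy expression tree (hypothetical stand-in for a real query expression).
enum Expr {
    Column(String),
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Collect every column name referenced by `expr`.
fn collect_columns<'a>(expr: &'a Expr, out: &mut HashSet<&'a str>) {
    match expr {
        Expr::Column(name) => {
            out.insert(name.as_str());
        }
        Expr::Literal(_) => {}
        Expr::Eq(l, r) | Expr::And(l, r) => {
            collect_columns(l, out);
            collect_columns(r, out);
        }
    }
}

/// Exact when the filter references only partition columns, otherwise Inexact.
fn classify(expr: &Expr, partition_columns: &[&str]) -> PushDown {
    let mut used = HashSet::new();
    collect_columns(expr, &mut used);
    if !used.is_empty() && used.iter().all(|c| partition_columns.contains(c)) {
        PushDown::Exact
    } else {
        PushDown::Inexact
    }
}

/// Helper that builds `column = literal`.
fn col_eq(column: &str, literal: &str) -> Expr {
    Expr::Eq(
        Box::new(Expr::Column(column.to_string())),
        Box::new(Expr::Literal(literal.to_string())),
    )
}
```

With partition columns modified and id, a filter on id alone would be classified as Exact, while a filter on value, or one mixing id and value, stays Inexact. Returning Exact is only safe if file pruning really removes every non-matching partition, and it would sidestep the re-applied filter rather than fix the column index problem itself.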

rtyler added the binding/rust label on May 9, 2024
Member

rtyler commented May 9, 2024

Thanks for taking the time to write a test @Veiasai ! I'll take a look at this shortly

rtyler self-assigned this on May 9, 2024
Author

Veiasai commented May 15, 2024

hey, any updates?
