Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Valid concurrent merge returns Invalid comparison operation #2462

Open
echai58 opened this issue Apr 29, 2024 · 5 comments · May be fixed by #2463
Open

Valid concurrent merge returns Invalid comparison operation #2462

echai58 opened this issue Apr 29, 2024 · 5 comments · May be fixed by #2463
Labels
bug Something isn't working

Comments

@echai58
Copy link

echai58 commented Apr 29, 2024

Environment

Delta-rs version: 0.17.2 (also tried against latest main, including #2396)

Binding: python


Bug

What happened:
I was testing performing concurrent merges to different partitions. When partitioned by a string, concurrent merges to different partitions worked as expected, with both writes going through and no error raised. However, when partitioned by a date32, it raised the following exception:

CommitFailedError: Failed to commit transaction: Error evaluating predicate: Failed to convert into Arrow schema: Invalid argument error: Invalid comparison operation: Date32 <= Int64

When I change it to a sequential write, it does not raise this exception, so it is probably a bug somewhere in the logic that determines if concurrent writes conflict.

What you expected to happen:
Concurrent writes to different partitions should work even if partitioned on a non-string column.

How to reproduce it:

import datetime
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa

path = "test-concurrent-merge"

df = pd.DataFrame.from_dict(
    {
        "p": [datetime.date(2020, 1, 1)],
        "k": [1],
        "v": [1],
    }
)

schema = pa.schema(
    [
        ("p", pa.date32()),
        ("k", pa.int32()),
        ("v", pa.int32()),
    ]
)
table = pa.Table.from_pandas(df, schema)

write_deltalake(
    path,
    table,
    partition_by="p",
)


def write_table(table: DeltaTable, data):
    table.merge(
        data,
        predicate="s.p = t.p and s.k = t.k",
        source_alias="s",
        target_alias="t",
    ).when_matched_update_all().when_not_matched_insert_all().execute()

# by getting both delta tables first, it simulates concurrent actions
table_1 = DeltaTable(path)
table_2 = DeltaTable(path)

# these data touch different partitions
data_1 = pd.DataFrame.from_dict(
    {
        "p": [datetime.date(2020, 1, 1)],
        "k": [3],
        "v": [-3],
    }
)

data_2 = pd.DataFrame.from_dict(
    {
        "p": [datetime.date(2021, 1, 1)],
        "k": [4],
        "v": [-4],
    }
)

write_table(table_1, pa.table(data_1, schema=schema))
write_table(table_2, pa.table(data_2, schema=schema))

More details:
I also tested with partitioning on ints, and I got the same exception with int32, but it worked with int64.

@echai58 echai58 added the bug Something isn't working label Apr 29, 2024
@ion-elgreco ion-elgreco linked a pull request Apr 29, 2024 that will close this issue
@ion-elgreco
Copy link
Collaborator

@echai58 can you try this PR please: #2463

@echai58
Copy link
Author

echai58 commented Apr 29, 2024

@ion-elgreco Running against that branch works for the date partition!

Under More details I also mentioned that it didn't work for int32 partitions either, is there another simple fix for that?

@ion-elgreco
Copy link
Collaborator

@ion-elgreco Running against that branch works for the date partition!

Under More details I also mentioned that it didn't work for int32 partitions either, is there another simple fix for that?

What's the error for int32?

@echai58
Copy link
Author

echai58 commented Apr 29, 2024

@ion-elgreco Running against that branch works for the date partition!
Under More details I also mentioned that it didn't work for int32 partitions either, is there another simple fix for that?

What's the error for int32?

same thing:

CommitFailedError: Failed to commit transaction: Error evaluating predicate: Failed to convert into Arrow schema: Invalid argument error: Invalid comparison operation: Int32 <= Int64

@ion-elgreco
Copy link
Collaborator

@echai58 I'll look into it later this week. We likely rather want to compare logical types and not physical

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants