
[Bug][Flink-table-sink] Why add unique key to the pk set when generating sink table schema #181

Open
itinycheng opened this issue Apr 19, 2022 · 2 comments
Labels
good first issue Good for newcomers type/enhancement New feature or request

Comments

itinycheng (Contributor)

The flink-tidb-connector doesn't work as expected when I use upsert mode to sink data into TiDB (Flink version: 1.13.x); the reason is as in the title. Is there any reason for making the unique keys part of the primary key set?

Code block:

private String[] getKeyFields(Context context, ReadableConfig config, String databaseName,
      String tableName) {
    // check write mode
    TiDBWriteMode writeMode = TiDBWriteMode.fromString(config.get(WRITE_MODE));
    String[] keyFields = null;
    if (writeMode == TiDBWriteMode.UPSERT) {
      try (ClientSession clientSession = ClientSession.create(
          new ClientConfig(context.getCatalogTable().toProperties()))) {
        Set<String> set = ImmutableSet.<String>builder()
            .addAll(clientSession.getUniqueKeyColumns(databaseName, tableName)) // Why add all unique keys to pk set?
            .addAll(clientSession.getPrimaryKeyColumns(databaseName, tableName))
            .build();
        keyFields = set.size() == 0 ? null : set.toArray(new String[0]);
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
    return keyFields;
  }

The issue:
flink-tidb-connector uses the official flink-jdbc-connector to write data to TiDB.
In upsert mode, the records in the flink-jdbc-connector buffer are deduplicated by the configured key fields, and the batch flushed via executeBatch is unordered because the buffer is a HashMap; refer to TableBufferReducedStatementExecutor.java.
Together these can leave multiple records with the same actual primary key in one batch, which are then written to TiDB out of order.
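To make the failure mode concrete, here is a minimal sketch (not the connector's actual code) of a reduce buffer keyed by keyFields, like the one in TableBufferReducedStatementExecutor. When keyFields is the primary key plus the unique keys, two rows that share the real primary key but differ in a unique-key column are treated as distinct keys, so both survive deduplication and land in the same batch:

```java
import java.util.*;

public class DedupSketch {
    // Deduplicate rows by the columns at keyIdx, as a HashMap-backed reduce
    // buffer does: a later row with the same key replaces the earlier one.
    static Map<List<Object>, List<Object>> reduce(List<List<Object>> rows, int[] keyIdx) {
        Map<List<Object>, List<Object>> buffer = new HashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = new ArrayList<>();
            for (int i : keyIdx) {
                key.add(row.get(i));
            }
            buffer.put(key, row);
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Columns: (id PRIMARY KEY, email UNIQUE, name).
        // Two updates to the same row: the second changes the unique column.
        List<List<Object>> rows = Arrays.asList(
            Arrays.asList(1, "a@x.com", "Alice"),
            Arrays.asList(1, "b@x.com", "Alice"));
        // keyFields = {id}: the second row replaces the first -> one upsert.
        System.out.println(reduce(rows, new int[]{0}).size());
        // keyFields = {id, email}: both rows survive -> same batch, flushed
        // in unspecified HashMap order, so TiDB may see them out of order.
        System.out.println(reduce(rows, new int[]{0, 1}).size());
    }
}
```

Printing the buffer sizes shows 1 entry for primary-key-only deduplication and 2 entries once the unique key is added to keyFields.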

xuanyu66 added the type/enhancement (New feature or request) and good first issue (Good for newcomers) labels on May 16, 2022
xuanyu66 (Collaborator) commented May 16, 2022

@itinycheng Sorry to have missed this issue.
You are right; it's better to use only the primary key as keyFields.
And we can use a SQL hint if someone needs to customize keyFields.

itinycheng (Author)

OK, got it.

xuanyu66 reopened this May 31, 2022