

[Enhancement] flink-tidb-connector: batch flush should support a timeout #245

Open
Xuxiaotuan opened this issue Sep 19, 2022 · 0 comments
Labels
type/enhancement New feature or request


Enhancement

When I started using flink-tidb-connector-1.14 to sink data into TiDB, I followed README_unified_batch_streaming.md.

However, I only inserted a small amount of data (three rows). Because tikv.sink.buffer-size defaults to 1000, the buffer never fills up, so the rows are never flushed.

Relevant code in TiDBWriteOperator:

@Override
public void processElement(StreamRecord<Row> element) throws Exception {
  Row row = element.getValue();
  if (buffer.isFull()) {  // only the row count is checked; elapsed time is never considered
    flushRows();
  }
  boolean added = buffer.add(row);
  if (!added && !sinkOptions.isDeduplicate()) {
    throw new IllegalStateException(
        "Duplicate index in one batch, please enable deduplicate, row = " + row);
  }
}
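To make the problem concrete, here is a minimal, self-contained model of the size-only rule above. `SizeOnlyBuffer` and its methods are hypothetical illustration names, not connector API; `capacity` plays the role of tikv.sink.buffer-size. As in `processElement`, the buffer is flushed only when it is already full at the moment a new row arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the size-only flush rule in TiDBWriteOperator.processElement.
class SizeOnlyBuffer<T> {
    private final int capacity;                               // stands in for tikv.sink.buffer-size
    private final List<T> rows = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();  // batches "written" so far

    SizeOnlyBuffer(int capacity) {
        this.capacity = capacity;
    }

    void processElement(T row) {
        if (rows.size() >= capacity) {  // only the row count is checked
            flush();
        }
        rows.add(row);
    }

    void flush() {
        if (!rows.isEmpty()) {
            flushed.add(new ArrayList<>(rows));
            rows.clear();
        }
    }

    int bufferedCount() { return rows.size(); }

    int flushedBatches() { return flushed.size(); }
}
```

With a capacity of 1000 and only three input rows, all three rows stay in the buffer and no batch is ever written, which matches the behavior described in this issue.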

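Proposal:
Add a maximum wait time before committing, for example tikv.sink.max.wait.ms=5000, so that buffered rows are flushed once it expires even if the buffer is not full. This could be done in either of two ways: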

  1. Each time the row count is checked, also check whether the wait time has expired. If the row threshold has not been reached but the wait time has, flush the buffered rows.
  2. Check the timeout independently: run a dedicated timer that periodically checks whether the wait time has elapsed and flushes the buffer when it has.
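Both approaches can be sketched in one buffer. Note that approach 1 alone may not fix the three-row case: if no rows arrive after the third, `processElement` is never called again, so the time check never runs; approach 2 covers that. This is a minimal sketch under stated assumptions, not connector code: `TimedBuffer`, `add`, `flushIfExpired` and `startTimer` are hypothetical names, `maxWaitMs` stands in for the proposed tikv.sink.max.wait.ms, the wait is assumed to be measured from the oldest buffered row, and the `sink` callback stands in for `flushRows()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical buffer that flushes when it is full OR when the oldest row has waited too long.
class TimedBuffer<T> implements AutoCloseable {
    private final int maxRows;             // like tikv.sink.buffer-size
    private final long maxWaitMs;          // like the proposed tikv.sink.max.wait.ms
    private final LongSupplier clock;      // injectable clock (milliseconds), handy for tests
    private final Consumer<List<T>> sink;  // stands in for flushRows()
    private final List<T> rows = new ArrayList<>();
    private long firstRowAt = -1;          // arrival time of the oldest buffered row
    private ScheduledExecutorService timer;

    TimedBuffer(int maxRows, long maxWaitMs, LongSupplier clock, Consumer<List<T>> sink) {
        this.maxRows = maxRows;
        this.maxWaitMs = maxWaitMs;
        this.clock = clock;
        this.sink = sink;
    }

    // Approach 1: check the elapsed time together with the row count on every new row.
    synchronized void add(T row) {
        if (rows.size() >= maxRows || expired()) {
            flush();
        }
        if (rows.isEmpty()) {
            firstRowAt = clock.getAsLong();
        }
        rows.add(row);
    }

    // Approach 2: called periodically, so rows are flushed even when no new rows arrive.
    synchronized void flushIfExpired() {
        if (expired()) {
            flush();
        }
    }

    void startTimer(long periodMs) {
        timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::flushIfExpired, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    private boolean expired() {
        return !rows.isEmpty() && clock.getAsLong() - firstRowAt >= maxWaitMs;
    }

    synchronized void flush() {
        if (rows.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(rows));
        rows.clear();
        firstRowAt = -1;
    }

    @Override
    public void close() {
        if (timer != null) {
            timer.shutdown();  // stop scheduling further checks
        }
        flush();               // write whatever is still buffered
    }
}
```

In a standalone program this could be used as `new TimedBuffer<>(1000, 5000, System::currentTimeMillis, batch -> ...)` followed by `startTimer(1000)`. Inside a real Flink operator, the operator's processing-time service (`getProcessingTimeService()`) is probably a better fit than a raw executor, since its callbacks follow the operator's own threading model; the plain `java.util.concurrent` timer is used here only to keep the sketch self-contained.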
Xuxiaotuan added the type/enhancement (New feature or request) label on Sep 19, 2022.