
[Bug] [Connector-V2-Paimon] Data changes are lost when sinking into Paimon using batch mode. #6831

MirrerZu opened this issue May 10, 2024 · 18 comments

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When I use a MySQL JDBC source and a Paimon sink in batch mode, some rows with the same primary key are not updated correctly, even though I'm sure the target is a PK table using the deduplicate merge engine.
If I add checkpoint.interval = 1000 to the job's env config, all the data is updated correctly, but the job throws an exception.
I didn't find any errors or warnings in the HDFS logs.
To verify, I used Flink's batch mode with JDBC external tables to write the same data; all the data was updated correctly without any errors.

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    url = "jdbc:mysql://10.1.xxx.xxx:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 180
    user = "root"
    password = "*****"
    table_path = "test.test2"
  }
}

sink {
  Paimon {
    warehouse = "hdfs://10.1.xxx.xxx:9000/paimon"
    database = "mytest"
    table = "test2"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/v2.mysql.config -e local

Error Exception

2024-05-10 17:32:01,333 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@6c0e24be
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) [seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
        ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_402]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_402]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
        ... 16 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
        ... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
        at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
        ... 5 more
2024-05-10 17:32:01,339 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 70000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Interrupted task 60000 - org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@49d2a9d8
2024-05-10 17:32:01,340 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 60000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - Release classloader for job 841246167579754497 with jars [file:/bigdata/seatunnel/connectors/connector-paimon-2.3.5.jar, file:/bigdata/seatunnel/connectors/connector-jdbc-2.3.5.jar]
2024-05-10 17:32:01,349 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-1
2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-2
2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Task TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} complete with state FAILED
2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.CoordinatorService  ] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Received task end from execution TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}, state FAILED
2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] turned from state RUNNING to FAILED.
2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] state process is stopped
2024-05-10 17:32:01,353 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
        ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)
        ... 17 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165)
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
        ... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
        at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
        at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201)
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157)
        ... 6 more

Zeta or Flink or Spark Version

Zeta 2.3.5

Java or Scala Version

jdk 1.8.0_402

Screenshots

[two screenshots attached in the original issue]

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

MirrerZu added the bug label May 10, 2024

dailai commented May 10, 2024

If you want to use Paimon's upsert feature, your source should be mysql-cdc. In addition, the checkpoint.interval parameter is not supported for batch writes in the Paimon sink, because a batch write in Paimon can only be committed once.
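
For context, this one-time-commit rule is visible directly in Paimon's batch write API, and it is what surfaces as the IllegalStateException in the stack trace above. A minimal, hedged sketch of that API (the Table handle is assumed to come from a Paimon catalog; the row values are illustrative):

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

class BatchWriteOnceDemo {
    // 'table' is assumed to be a primary-key Paimon table.
    static void writeOnce(Table table) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = builder.newWrite();
             BatchTableCommit commit = builder.newCommit()) {
            write.write(GenericRow.of(1, BinaryString.fromString("ccc")));
            // The first prepareCommit() seals this BatchTableWrite.
            List<CommitMessage> messages = write.prepareCommit();
            commit.commit(messages);
            // Calling write.prepareCommit() again on the same writer throws
            // IllegalStateException: "BatchTableWrite only support one-time committing."
        }
    }
}

So if a sink keeps a single BatchTableWrite across multiple checkpoints, every prepareCommit after the first fails exactly as in the trace above.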

MirrerZu (Author) commented

> If you want to use Paimon's upsert feature, your source should be mysql-cdc. […]

Thanks for your reply.
Actually, I want to use batch mode to synchronize SAP HANA data into Paimon, but I can't find a CDC tool for HANA.
I'm curious about the difference between Flink's batch mode and SeaTunnel's batch mode when paired with Paimon, because Flink's batch insert seems to write the data correctly.

dailai commented May 11, 2024

> Actually, I want to use batch mode to synchronize SAP HANA data into Paimon, but I can't find a CDC tool for HANA. […]

The JDBC source cannot capture CDC events. I suspect your test setup may not be quite right.

MirrerZu (Author) commented

I tried to explain why I use the JDBC source connector and batch mode, but perhaps I wasn't clear.
To restate: there's a bug that only occurs in SeaTunnel's batch mode (CDC works fine):
when I write data in batch mode using SeaTunnel's Paimon sink, Paimon does not keep the latest record, even though the table has a PK and uses the deduplicate merge engine.
However, both Flink's batch mode and Spark handle the same table and data correctly. So I think Paimon's deduplicate merge engine should work even when there are only inserts and no update events.
It might be helpful to run a simple test to confirm this.

dailai commented May 16, 2024

> I tried to explain why I use the JDBC source connector and batch mode […]

I don't think this has anything to do with the Paimon sink. In batch mode, the JDBC source only reads the data as it exists at the moment the JDBC query executes. After the data is read, it is sent downstream; no matter whether the source is later updated or inserted into, those changes will not be synchronized downstream.

MirrerZu (Author) commented

Let's focus only on the insert behavior of the sink and Paimon.
Take a PK table using the deduplicate merge engine, e.g. create table test(id int, tchar string, primary key(id) not enforced);, with some data already in it:

| id | tchar |
|----|-------|
| 1  | abc   |
| 2  | abc   |
| 3  | abc   |

Now insert some rows whose primary keys already exist:

| id | tchar |
|----|-------|
| 1  | ccc   |
| 2  | ccc   |

Querying the table (default: latest snapshot) should return the following, and this is the result when Flink/Spark does the insert:

| id | tchar |
|----|-------|
| 1  | ccc   |
| 2  | ccc   |
| 3  | abc   |

But when SeaTunnel's sink does the insert, Paimon does not merge all of the data correctly:

| id | tchar |
|----|-------|
| 1  | abc   |
| 2  | ccc   |
| 3  | abc   |

I also tried querying with time travel: none of Paimon's snapshots is correct; it looks as if some data was lost.
That's why I think either something is wrong with the Paimon sink, or the Paimon sink can't be used in batch mode to insert rows that share a PK.
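
For reference, deduplicate is Paimon's default merge engine for primary-key tables, so the expectation above matches documented behavior: within the latest snapshot, the last committed row per key wins. A hedged sketch of creating the example table through Paimon's Java catalog API (the warehouse path and names are illustrative, not from the original report):

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

class CreateDedupTableDemo {
    public static void main(String[] args) throws Exception {
        // Illustrative local warehouse path.
        CatalogContext context = CatalogContext.create(new Path("file:///tmp/paimon"));
        try (Catalog catalog = CatalogFactory.createCatalog(context)) {
            catalog.createDatabase("mytest", true);
            Schema schema = Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .column("tchar", DataTypes.STRING())
                    .primaryKey("id")
                    // "deduplicate" is already the default for PK tables;
                    // set explicitly here only for clarity.
                    .option("merge-engine", "deduplicate")
                    .build();
            catalog.createTable(Identifier.create("mytest", "test"), schema, true);
        }
    }
}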

dailai commented May 16, 2024

> Let's focus only on the insert behavior of the sink and Paimon. […]

Which connector does your source use?

MirrerZu (Author) commented

> Which connector does your source use?

JDBC

dailai commented May 16, 2024

> > Which connector does your source use?
>
> JDBC

Does your source table have a primary key?

MirrerZu (Author) commented

Yes, the same primary key id, and I use paimon.table.primary-keys to specify it in the sink.

dailai commented May 16, 2024

> Does your source table have a primary key?

If your source table has a primary key, isn't inserting data with the same primary key actually a modification operation?

MirrerZu (Author) commented

Yes.

dailai commented May 16, 2024

The JDBC source cannot capture update events. I think you can use this test case to verify; test only inserts with the same key:
https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf

dailai commented May 16, 2024

> The JDBC source cannot capture update events. I think you can use this test case to verify […]

You can remove this part:
[screenshot of the config section to remove]
Then test with inserts only.

MirrerZu (Author) commented

Same result as with the JDBC source.
First, insert five records into an empty table:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "AAA"]
      },
      {
        kind = INSERT
        fields = [2, "BBB"]
      },
      {
        kind = INSERT
        fields = [3, "CCC"]
      },
      {
        kind = INSERT
        fields = [4, "DDD"]
      },
      {
        kind = INSERT
        fields = [5, "EEE"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
  }
}

Next, change the field tchar to 'ZZZ' and insert again:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [2, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [3, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [4, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [5, "ZZZ"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
    paimon.table.primary-keys = "pk_id"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}

Query results for table test1 (all values of tchar should be 'ZZZ', but they are not):
[screenshot of query results]
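
If it helps with reproducing, the table's latest snapshot can also be read back directly with Paimon's Java API instead of an external query engine. A minimal sketch, reusing the warehouse and table names from the config above:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

import java.util.List;

class ReadLatestSnapshotDemo {
    public static void main(String[] args) throws Exception {
        try (Catalog catalog = CatalogFactory.createCatalog(
                CatalogContext.create(new Path("file:///tmp/paimon")))) {
            Table table = catalog.getTable(Identifier.create("seatunnel_test", "test1"));
            // Plan splits against the latest snapshot, then scan them.
            ReadBuilder readBuilder = table.newReadBuilder();
            List<Split> splits = readBuilder.newScan().plan().splits();
            try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(splits)) {
                // pk_id is bigint (position 0), tchar is string (position 1).
                reader.forEachRemaining(row ->
                        System.out.println(row.getLong(0) + " -> " + row.getString(1)));
            }
        }
    }
}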

dailai commented May 16, 2024

I see what you mean.

dailai commented May 17, 2024

@MirrerZu Thanks for your feedback; please use this PR to build a new jar of the Paimon connector.

MirrerZu (Author) commented

> @MirrerZu Thanks for your feedback; please use this PR to build a new jar of the Paimon connector.

It works well for me, thanks!
