
When the DDL statement differs from the actual schema in the database, an ArrayIndexOutOfBoundsException is reported #118

Open

pyscala (Contributor) opened this issue Nov 23, 2021 · 0 comments

DDL

CREATE TABLE if not exists table_a (
       `user_id` BIGINT NULL COMMENT '',
       `id` BIGINT NULL COMMENT '',
       `position_id` BIGINT NULL COMMENT '',
       `status` STRING NULL COMMENT '',
       `transaction_id` BIGINT NULL COMMENT '',
    PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
    ) WITH (
          'connector'='kafka',
          'topic'='xxxx',
          'properties.bootstrap.servers'='xxx',
          'properties.group.id'='xxx',
          'properties.auto.offset.reset'='earliest',
          'scan.startup.mode'='earliest-offset',
          'format'='debezium-avro-confluent',
          'debezium-avro-confluent.schema-registry.url'='xxxx'
          );
CREATE TABLE if not exists table_b (
     `user_id` BIGINT NULL COMMENT '',
     `id` BIGINT NULL COMMENT '',
     `position_id` BIGINT NULL COMMENT '',
     `status` STRING NULL COMMENT '',
     `transaction_id` BIGINT NULL COMMENT ''
    ) WITH (
          'connector' = 'tidb',
          'tidb.database.url' = 'jdbc:mysql://xxxx',
          'tidb.username' = 'xxxx',
          'tidb.password' = 'xxxxx',
          'tidb.database.name' = 'xxxxx',
          'tidb.maximum.pool.size' = '1',
          'tidb.minimum.idle.size' = '1',
          'tidb.table.name' = 'withdraws',
          'tidb.write_mode' = 'upsert',
          'sink.buffer-flush.max-rows' = '0'
          );
insert into table_b select * from table_a;

The actual table in TiDB has one more column than the table_b DDL declares (an auto-increment column), and the following error is reported when the job starts.

java.lang.ArrayIndexOutOfBoundsException: -1
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_291]
	at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_291]
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_291]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_291]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-tidb-connector-1.13-0.0.4.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
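
The trace points at JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor (JdbcDynamicOutputFormatBuilder.java:145 in flink-connector-jdbc 1.13.2), which maps each upsert key column to its position in the field list declared in the DDL. A key column that exists in the TiDB table but not in the DDL resolves to index -1. Below is a minimal, self-contained sketch of that failing pattern; it is not the connector's actual code, and the extra column name `seq_id` is hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Sketch of the indexing pattern that fails, assuming the key columns come
// from the real TiDB table while the field list comes from the Flink DDL.
public class PkIndexMismatch {
    public static void main(String[] args) {
        // Columns declared in the Flink DDL for table_b.
        List<String> ddlFields = Arrays.asList(
                "user_id", "id", "position_id", "status", "transaction_id");

        // Key columns of the actual TiDB table; "seq_id" stands in for the
        // extra auto-increment column (hypothetical name).
        String[] dbKeyFields = {"user_id", "seq_id"};

        // indexOf returns -1 for "seq_id" because the DDL never declares it.
        int[] pkFields = Arrays.stream(dbKeyFields)
                .mapToInt(ddlFields::indexOf)
                .toArray(); // -> [0, -1]

        String[] ddlTypes = {"BIGINT", "BIGINT", "BIGINT", "STRING", "BIGINT"};

        // Dereferencing the type array with -1 reproduces the reported
        // java.lang.ArrayIndexOutOfBoundsException: -1
        String[] pkTypes = IntStream.of(pkFields)
                .mapToObj(i -> ddlTypes[i])
                .toArray(String[]::new);
        System.out.println(Arrays.toString(pkTypes));
    }
}
```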
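
A possible improvement, sketched here under the assumption that the connector can see both the DDL fields and the database key columns when the sink opens, is to validate the mapping up front and fail with a descriptive message instead of an ArrayIndexOutOfBoundsException. The class and method names below are illustrative, not the connector's API:

```java
import java.util.Arrays;
import java.util.List;

// Fail-fast guard: resolve key column positions and reject any key column
// that the Flink DDL does not declare.
public final class SchemaGuard {
    private SchemaGuard() {}

    public static int[] resolveKeyIndexes(String[] dbKeyFields, String[] ddlFields) {
        List<String> declared = Arrays.asList(ddlFields);
        int[] indexes = new int[dbKeyFields.length];
        for (int i = 0; i < dbKeyFields.length; i++) {
            int idx = declared.indexOf(dbKeyFields[i]);
            if (idx < 0) {
                throw new IllegalStateException(String.format(
                        "Key column '%s' exists in the TiDB table but is not "
                                + "declared in the Flink DDL (declared columns: %s). "
                                + "Align the DDL with the actual schema.",
                        dbKeyFields[i], declared));
            }
            indexes[i] = idx;
        }
        return indexes;
    }
}
```

Until a check like this exists, the practical workaround is to keep the table_b DDL aligned with the actual TiDB schema, including the auto-increment column.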