[Question][chunjun-oracle] Oracle CLOB type fails with an error when using transformer #1893

Open
4 tasks done
J1aHe opened this issue Apr 12, 2024 · 0 comments
Labels
question Further information is requested

J1aHe commented Apr 12, 2024

Search before asking

  • I searched the issues and found no similar question.

  • I googled my question but didn't get any help.

  • I read the documentation (ChunJun doc), but it didn't help me.

Description

This is the error:

 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:640)
	at com.dtstack.chunjun.Main.syncStreamToTable(Main.java:288)
	at com.dtstack.chunjun.Main.exeSyncJob(Main.java:246)
	at com.dtstack.chunjun.Main.main(Main.java:137)
	at com.dtstack.chunjun.local.test.LocalTest.main(LocalTest.java:136)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.newRelDataType$1(FlinkTypeFactory.scala:76)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.createFieldTypeFromLogicalType(FlinkTypeFactory.scala:167)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.$anonfun$buildStructType$1(FlinkTypeFactory.scala:254)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildStructType(FlinkTypeFactory.scala:252)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildRelNodeRowType(FlinkTypeFactory.scala:224)
	at org.apache.flink.table.planner.sources.TableSourceUtil$.getSourceRowType(TableSourceUtil.scala:191)
	at org.apache.flink.table.planner.sources.TableSourceUtil.getSourceRowType(TableSourceUtil.scala)
	at org.apache.flink.table.planner.catalog.CatalogSchemaTable.getRowType(CatalogSchemaTable.java:171)
	at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
	at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
	at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
	at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
	at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3205)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3187)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
	... 8 more

This is the JSON job configuration:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "allowCreateSlot": false,
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "CLOB"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "CLOB"
              }
            ],
            "connection": [
              {
                "jdbcUrl": [
                  ""
                ],
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "username": "",
            "where": ""
          },
          "table": {
            "tableName": "sourceTable"
          }
        },
        "transformer": {
          "transformSql": "select SHIPPING_ADDRESS,BILLING_ADDRESS from sourceTable "
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "longtext"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "longtext"
              }
            ],
            "connection": [
              {
                "jdbcUrl": "",
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "preSql": [
              "truncate table ORDERS1"
            ],
            "username": ""
          },
          "table": {
            "tableName": "sinkTable"
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 0
      },
      "speed": {
        "bytes": 10485760,
        "writerChannel": 2
      }
    }
  }
}

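After debugging locally, the error appears to come from Flink, but I am still not sure of the exact cause. How can CLOB columns be synchronized when a transformer is used?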
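
One possible reading of the trace, sketched with stand-in classes: the cast fails inside `FlinkTypeFactory.newRelDataType`, which would be consistent with the planner dispatching on a type's root and then downcasting to `VarCharType`. The sketch below assumes that ChunJun's `ClobType` reports a VARCHAR root without extending `VarCharType`; this is an unverified assumption, and `TypeLike`, `VarCharLike`, `ClobLike`, and `toRelType` are hypothetical names, not Flink or ChunJun APIs.

```java
// Stand-in model of the failure; none of these are the real Flink or ChunJun classes.
public class ClobCastSketch {

    enum Root { VARCHAR, INTEGER }

    // Plays the role of a logical type: every type reports a root.
    static abstract class TypeLike {
        final Root root;
        TypeLike(Root root) { this.root = root; }
    }

    // Plays the role of VarCharType: root VARCHAR plus a length.
    static class VarCharLike extends TypeLike {
        final int length;
        VarCharLike(int length) { super(Root.VARCHAR); this.length = length; }
    }

    // ASSUMPTION: a custom type that reports root VARCHAR but does not extend VarCharLike.
    static class ClobLike extends TypeLike {
        ClobLike() { super(Root.VARCHAR); }
    }

    // Plays the role of the planner's type factory: dispatch on root, then downcast.
    static String toRelType(TypeLike t) {
        switch (t.root) {
            case VARCHAR:
                VarCharLike v = (VarCharLike) t; // throws ClassCastException for ClobLike
                return "VARCHAR(" + v.length + ")";
            default:
                return t.root.name();
        }
    }

    // Wraps the conversion so both outcomes can be printed side by side.
    static String tryConvert(TypeLike t) {
        try {
            return toRelType(t);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryConvert(new VarCharLike(255))); // prints "VARCHAR(255)"
        System.out.println(tryConvert(new ClobLike()));       // prints "ClassCastException"
    }
}
```

If this reading holds, the cast is only reached when the source table's row type is built during SQL validation, which in the trace happens through `Main.syncStreamToTable` and `TableEnvironmentImpl.sqlQuery`, that is, on the `transformer` path.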


J1aHe added the "question" (Further information is requested) label on Apr 12, 2024