Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] #23

Open
lookhua opened this issue Nov 20, 2020 · 4 comments
Open

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] #23

lookhua opened this issue Nov 20, 2020 · 4 comments

Comments

@lookhua
Copy link

lookhua commented Nov 20, 2020

public class TidbAlarmClearUp {

public static void main(String[] args) throws Exception {

    //运行job
    String jobName = TidbAlarmClearUp.class.getSimpleName();

    // set up the streaming execution environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    bsTableEnv.executeSql("CREATE TABLE flinkTest(\n"
            + " id     bigint,\n"
            + " ps_id  int,\n"
            + " ps_key string,\n"
            + " happen_time timestamp\n"
            + ") WITH (\n"
            + "  'connector' = 'tidb',\n"
            + "  'tidb.database.url' = 'jdbc:tidb://tc-fat-tidb.tidb-fat:4000/health',\n"
            + "  'tidb.driver' = 'com.mysql.jdbc.Driver',\n"
            + "  'tidb.database.name' = 'xxxx',\n"
            + "  'tidb.table.name' = 'flink_test',\n"
            + "  'tidb.maximum.pool.size' = '10',\n"
            + "  'tidb.minimum.idle.size' = '10',"
            + "  'tidb.username' = 'xxxx',\n"
            + "  'tidb.password' = 'xxxxx'\n"
            + ")"
    );

    // execute Table
    TableResult tableResult1 = bsTableEnv.executeSql(
            "INSERT INTO flinkTest(id,ps_id,ps_key,happen_time) " +
                     "SELECT id,ps_id,ps_key,happen_time FROM HealthOnlineAlarm ");
}

}

2020-11-20 09:02:47,044 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, HealthOnlineAlarm]], fields=[id, ps_id, ps_key, happen_time]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (e28717441b6b8eb1ba8013dc2e6a5527) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:204) ~[?:1.8.0_265]
at com.zhihu.tibigdata.flink.tidb.TypeUtils.getObjectWithDataType(TypeUtils.java:146) ~[flink-tidb-connector-0.0.1.jar:?]
at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:167) ~[flink-tidb-connector-0.0.1.jar:?]
at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:53) ~[flink-tidb-connector-0.0.1.jar:?]
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.12-1.11.2.jar:1.11.2]

@humengyu2012
Copy link
Collaborator

humengyu2012 commented Nov 20, 2020

It seems that the datatype of column happen_time is varchar in tidb, but you want convert it to timestamp. We will add a config which can be used to specify timestamp format, or you could try (happen_time string ).

@humengyu2012
Copy link
Collaborator

Thanks for your issue, now you can specify timestamp format by config timestamp-format.${columnName}, like timestamp-format.happen_time='yyyy-MM-dd HH:mm:ss.SSS'.

@lookhua
Copy link
Author

lookhua commented Nov 20, 2020

I build this project with flink verion 1.11.2 not 1.11.1 and the datatype of column happen_time is timestamp in tidb

@humengyu2012
Copy link
Collaborator

humengyu2012 commented Nov 21, 2020

can you provide your tidb create table sql and some sample data? I want to test your case in flink 1.11.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants