Skip to content

Latest commit

 

History

History
211 lines (181 loc) · 9.23 KB

Kudu.md

File metadata and controls

211 lines (181 loc) · 9.23 KB

Kudu

Kudu sink connector

Support Kudu Version

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Data Type Mapping

SeaTunnel Data Type Kudu Data Type
BOOLEAN BOOL
INT INT8
INT16
INT32
BIGINT INT64
DECIMAL DECIMAL
FLOAT FLOAT
DOUBLE DOUBLE
STRING STRING
TIMESTAMP UNIXTIME_MICROS
BYTES BINARY

Sink Options

Name Type Required Default Description
kudu_masters String Yes - Kudu master address. Separated by ',',such as '192.168.88.110:7051'.
table_name String Yes - The name of kudu table.
client_worker_count Int No 2 * Runtime.getRuntime().availableProcessors() Kudu worker count. Default value is twice the current number of cpu cores.
client_default_operation_timeout_ms Long No 30000 Kudu normal operation time out.
client_default_admin_operation_timeout_ms Long No 30000 Kudu admin operation time out.
enable_kerberos Bool No false Kerberos principal enable.
kerberos_principal String No - Kerberos principal. Note that all zeta nodes require have this file.
kerberos_keytab String No - Kerberos keytab. Note that all zeta nodes require have this file.
kerberos_krb5conf String No - Kerberos krb5 conf. Note that all zeta nodes require have this file.
save_mode String No - Storage mode, support overwrite and append.
session_flush_mode String No AUTO_FLUSH_SYNC Kudu flush mode. Default AUTO_FLUSH_SYNC.
batch_size Int No 1024 The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100
buffer_flush_interval Int No 10000 The flush interval mills, over this time, asynchronous threads will flush data.
ignore_not_found Bool No false If true, ignore all not found rows.
ignore_not_duplicate Bool No false If true, ignore all dulicate rows.
common-options No - Source plugin common parameters, please refer to Source Common Options for details.

Task Example

Simple:

The following example refers to a FakeSource named "kudu" cdc write kudu table "kudu_sink_table"

env {
  parallelism = 1
  job.mode = "BATCH"
}
    source {
      FakeSource {
       result_table_name = "kudu"
        schema = {
          fields {
                    id = int
                    val_bool = boolean
                    val_int8 = tinyint
                    val_int16 = smallint
                    val_int32 = int
                    val_int64 = bigint
                    val_float = float
                    val_double = double
                    val_decimal = "decimal(16, 1)"
                    val_string = string
                    val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_BEFORE
            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_AFTER
           fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = DELETE
            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    }

sink {
   kudu{
    source_table_name = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
 }
}

Multiple Table

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
       {
        schema = {
          table = "kudu_sink_1"
         fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
      }
        }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
              }
              ]
       },
       {
       schema = {
         table = "kudu_sink_2"
              fields {
                        id = int
                        val_bool = boolean
                        val_int8 = tinyint
                        val_int16 = smallint
                        val_int32 = int
                        val_int64 = bigint
                        val_float = float
                        val_double = double
                        val_decimal = "decimal(16, 1)"
                        val_string = string
                        val_unixtime_micros = timestamp
              }
       }
           rows = [
             {
               kind = INSERT
               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
             }
             ]
      }
    ]
  }
}


sink {
   kudu{
    kudu_masters = "kudu-master-multiple:7051"
 }
}

Changelog

2.2.0-beta 2022-09-26

  • Add Kudu Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Kudu Sink Connector Support to upsert row (2881)

Next Version

  • Change plugin name from KuduSink to Kudu 3432