1149782339/DbSink


DbSink

DbSink is a Kafka sink connector that provides a sink implementation for streaming changes emitted by Debezium.

Features

1. 100% compatible with Debezium and able to process data change events, schema change events, and transaction events without any additional transform such as 'ExtractNewRecordState'.

2. Supports both transaction-based and table-based parallel applying.

3. Supports applying to a variety of database dialects. Currently only Oracle to Postgres and MySQL to Postgres are supported; more dialects are under development and will be released in the near future.

4. Executes inserts, updates, and deletes based on the operation in the Debezium event record, without additional configuration such as 'insert mode'.

5. Automatically switches to upsert mode to apply insert events when duplicate key errors occur.

6. Supports most data types and processes each source value according to the definition of the target column.

For example, the MySQL time data type can be inserted into either the time data type or the interval data type in Postgres.

7. Supports configuring the target table and column names. By default, the connector uses the source table and column names as the target names.
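
The upsert fallback described in feature 5 can be pictured with a small sketch. The function below is hypothetical and only builds the kind of PostgreSQL `INSERT ... ON CONFLICT` statement such a fallback might issue. The name `build_upsert`, the example table and columns, and the `%s` placeholder style are assumptions for illustration; the SQL the connector actually generates is not shown in this README.

```python
def build_upsert(table, columns, key_columns):
    """Build a PostgreSQL INSERT ... ON CONFLICT statement (illustrative only)."""
    placeholders = ", ".join(["%s"] * len(columns))
    column_list = ", ".join(columns)
    conflict_target = ", ".join(key_columns)
    # Non-key columns are overwritten with the values of the rejected insert.
    updates = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in columns if col not in key_columns
    )
    # With no non-key columns there is nothing to update, so the row is skipped.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table} ({column_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_target}) {action}"
    )


# Hypothetical table with a single-column primary key.
sql = build_upsert("shop.user", ["id", "name", "create_time"], ["id"])
print(sql)
```

Because the statement updates the existing row instead of failing, replaying an insert event that was already applied leaves the target row in the same state.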

Connector Configuration Properties

connector.class

To use this connector, specify the name of the connector class in the connector.class configuration property.

"connector.class": "io.dbsink.connector.sink.DbSinkConnector"

JDBC Group

jdbc.username

JDBC connection user.

  • Type: String
  • Default: null
  • Importance: high

jdbc.password

JDBC connection password.

  • Type: String
  • Default: null
  • Importance: high

jdbc.url

JDBC connection URL. For example: jdbc:postgresql://localhost:5432/migration, jdbc:mysql://localhost/db_name

  • Type: String
  • Default: null
  • Importance: high

jdbc.driver.class

JDBC driver class. For example: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, com.mysql.jdbc.Driver

  • Type: String
  • Default: null
  • Importance: low

jdbc.retries.max

The maximum number of times to retry a JDBC call when an SQLException happens. The value must be a positive integer.

  • Type: int
  • Default: 5
  • Importance: low

jdbc.backoff.ms

The backoff time in milliseconds between JDBC retries.

  • Type: long
  • Default: 3000
  • Importance: low
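
To make the interplay of jdbc.retries.max and jdbc.backoff.ms concrete, here is a minimal sketch of a retry loop with a fixed backoff. It is not the connector's implementation: the function name `call_with_retries` is hypothetical, a generic Python exception stands in for `SQLException`, and it assumes that the retry count excludes the first attempt.

```python
import time


def call_with_retries(operation, retries_max=5, backoff_ms=3000, sleep=time.sleep):
    """Run operation, retrying up to retries_max times after a failure."""
    attempt = 0
    while True:
        try:
            return operation()
        except Exception:  # stands in for java.sql.SQLException
            if attempt >= retries_max:
                raise  # retries exhausted, surface the last error
            attempt += 1
            sleep(backoff_ms / 1000.0)  # fixed wait before the next attempt


# Usage: an operation that fails twice and then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


waits = []  # record the waits instead of really sleeping
result = call_with_retries(flaky, retries_max=5, backoff_ms=3000, sleep=waits.append)
```

With the defaults shown in this section, a call that keeps failing would wait 3 seconds between attempts and give up after the fifth retry under these assumptions.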

Applier Group

applier.parallel.max

The maximum number of threads used to apply events.

  • Type: int
  • Default: cpu cores * 1.4
  • Importance: high

applier.transaction.enabled

Whether to apply events transactionally. Requires the Debezium configuration property provide.transaction.metadata to be true and all incoming events to share the same topic name (use transforms to achieve this).

  • Type: boolean
  • Default: false
  • Importance: high
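
As a rough illustration of what transactional applying needs from the input, the sketch below groups change events by the transaction block that Debezium adds when provide.transaction.metadata is true (fields `id` and `total_order`). The function name `group_by_transaction` and the trimmed-down sample events are assumptions; how the connector itself buffers and applies transactions is not described here.

```python
def group_by_transaction(events):
    """Group change events by transaction id, keeping first-seen order."""
    transactions = {}
    for event in events:
        tx = event.get("transaction")
        # Events without transaction metadata are collected under the key None.
        tx_id = tx["id"] if tx else None
        transactions.setdefault(tx_id, []).append(event)
    for tx_events in transactions.values():
        # Within a transaction, total_order gives the position of each event.
        tx_events.sort(
            key=lambda e: (e.get("transaction") or {}).get("total_order", 0)
        )
    return transactions


# Hypothetical, heavily trimmed events from two interleaved transactions.
events = [
    {"op": "c", "transaction": {"id": "tx-1", "total_order": 1}},
    {"op": "u", "transaction": {"id": "tx-2", "total_order": 1}},
    {"op": "d", "transaction": {"id": "tx-1", "total_order": 2}},
]
grouped = group_by_transaction(events)
```

Grouping like this only works if all events of a transaction arrive through one ordered stream, which is why the property requires a single topic.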

applier.transaction.buffer.size

Specifies how many transactions to cache in the buffer.

  • Type: int
  • Default: 50
  • Importance: high

applier.worker.buffer.size

Specifies how many transactions an applier worker can cache in its buffer.

  • Type: int
  • Default: 100
  • Importance: high

table.naming.strategy

Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming events. DefaultTableNamingStrategy, LowCaseTableNamingStrategy, and UpperCaseTableNamingStrategy are available.

  • Type: class
  • Default: io.dbsink.connector.sink.naming.DefaultTableNamingStrategy
  • Importance: high

column.naming.strategy

Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming events. DefaultColumNamingStrategy, LowCaseColumNamingStrategy, and UpperCaseColumNamingStrategy are available.

  • Type: class
  • Default: io.dbsink.connector.sink.naming.DefaultColumNamingStrategy
  • Importance: high

Build guidance

jdk version: >=11

maven: >=3.0

mvn clean package -Dmaven.test.skip=true

Note: dependencies such as JDBC drivers are not packaged into the jar. You need to place them manually in the plugin path of Kafka Connect.

Data type mapping

MySQL to Postgres

| mysql | postgres | description |
| --- | --- | --- |
| float(n) | float(n) | float(n) is a standard type in Postgres but not in MySQL, so errors may occur |
| float(n,p) | float(n) or decimal(n,p) | No fully equivalent data type, so errors may occur |
| double(n,p) | double precision or decimal(n,p) | No fully equivalent data type, so errors may occur |
| double | double precision | double precision is a standard type in Postgres while double in MySQL is not, so errors may occur |
| decimal(n,p) | decimal(n,p) | |
| bigint | bigint | |
| mediumint | int | |
| int | int | |
| smallint | smallint | |
| tinyint | smallint | |
| timestamp(n) | timestamp(n) with time zone | No error occurs as long as the MySQL and Postgres time zones are the same |
| datetime(n) | timestamp(n) without time zone | |
| time(n) | time(n) or interval day to second | Values from 00:00:00 to 23:59:59.999999 map to time(n); all other values map to interval day to second |
| year | smallint or interval year | |
| bit(n) | bit(m) | m >= n |
| bit(1) | bit(1) or bool | |
| tinyint(1) | bool | |
| binary(n) | bytea | |
| blob | bytea | |
| tinyblob | bytea | |
| mediumblob | bytea | |
| longblob | bytea | |
| char(n) | char(n) | |
| varchar(n) | varchar(n) | |
| tinytext | text | |
| text | text | |
| mediumtext | text | |
| json | json | |
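
The time(n) row is the one mapping that depends on the stored values: a MySQL TIME column can hold durations from -838:59:59 to 838:59:59, while a Postgres time column only holds a time of day. The sketch below encodes the rule from the table as a helper for choosing the target column type. The function name `postgres_type_for_mysql_time` is hypothetical, and the value is modelled as a Python `timedelta`.

```python
from datetime import timedelta

ONE_DAY = timedelta(days=1)


def postgres_type_for_mysql_time(value):
    """Return the Postgres type from the table that can hold a MySQL TIME value."""
    # 00:00:00 up to 23:59:59.999999 is a valid time of day.
    if timedelta(0) <= value < ONE_DAY:
        return "time"
    # Negative values or values of 24 hours and more need an interval.
    return "interval day to second"


in_range = postgres_type_for_mysql_time(
    timedelta(hours=23, minutes=59, seconds=59, microseconds=999999)
)
too_large = postgres_type_for_mysql_time(timedelta(hours=100))
negative = postgres_type_for_mysql_time(timedelta(hours=-1))
```

If any value in the source column can fall outside a single day, defining the target column as interval day to second is the safer choice.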

Oracle to Postgres

| oracle | postgres | description |
| --- | --- | --- |
| number(n,p) | numeric(n,p) | |
| binary_float | float | |
| binary_double | double precision | |
| integer | int | |
| char(n) | char(n) | |
| nvarchar2(n) | varchar(n) | |
| varchar2(n) | varchar(n) | |
| clob | text | |
| nclob | text | |
| blob | bytea | |
| raw | bytea | |
| long raw | bytea | |
| long | bytea | |
| date | timestamp(0) without time zone | |
| timestamp(n) | timestamp(n) without time zone | Fractional seconds beyond 6 digits are truncated |
| timestamp(n) with time zone | timestamp(n) with time zone | Fractional seconds beyond 6 digits are truncated |
| timestamp(n) with local time zone | timestamp(n) with time zone | Fractional seconds beyond 6 digits are truncated |
| interval year to month | interval year to month | |
| interval day to second | interval day to second | |
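
The truncation noted for the timestamp rows follows from the precision limits: Oracle timestamps allow up to 9 fractional digits, Postgres up to 6. A minimal sketch of the effect on a textual value is shown below. The function name `truncate_fraction` is hypothetical, and it only handles values without a time zone suffix; the connector's own conversion code is not shown in this README.

```python
def truncate_fraction(timestamp_text, max_digits=6):
    """Truncate (not round) the fractional seconds of a timestamp string."""
    head, sep, fraction = timestamp_text.partition(".")
    if not sep:
        return timestamp_text  # no fractional part, nothing to cut
    return f"{head}.{fraction[:max_digits]}"


oracle_value = "2023-05-01 12:30:45.123456789"
postgres_value = truncate_fraction(oracle_value)
```

The digits beyond microsecond precision are dropped rather than rounded, so two source values that differ only in those digits become equal on the target.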

Best Practices

The following examples show how to use the connector and the scenarios it applies to.

OLTP business system online migration

A trading system is preparing to migrate from a MySQL database to a Postgres database. To verify that Postgres can meet business needs, real-time stream replication must be established between MySQL and Postgres, and the read-only workload of the system switched to Postgres to confirm that the business system runs normally. The following steps implement this.

step1

Convert the table schema from MySQL to Postgres, either manually or with third-party tools. For the data type mapping, refer to the MySQL to Postgres table in the Data type mapping section.

Example:

MySQL:

create database shop;
create table shop.user(id int auto_increment primary key, name varchar(50), create_time timestamp(6));

Postgres:

create schema shop;
create table shop.user(id serial primary key, name varchar(50), create_time timestamp(6));

Note: a database in MySQL is converted to a schema in Postgres.

step2

Download ZooKeeper, Kafka, and a JRE (version 11 or later) to create a running environment for the Debezium connector and the DbSink connector. Many guides for this are available online, so the details are omitted here.

step3

Write the configuration JSON for the Debezium connector. A template is provided below.

{
    "name": "mysql_debezium_source_connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.allowPublicKeyRetrieval": true,
        "database.user": "wangwei",
        "database.server.id": "1000",
        "tasks.max": 1,
        "database.include.list": "shop",
        "provide.transaction.metadata": true,
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
        "database.port": 3307,
        "tombstones.on.delete": false,
        "topic.prefix": "olap_migration",
        "schema.history.internal.kafka.topic": "olap_migration_schema_history",
        "database.hostname": "192.168.142.129",
        "database.password": "19961028",
        "snapshot.mode": "initial",
        "snapshot.max.threads": 10,
        "heartbeat.interval.ms": 10000,
        "transforms": "Reroute",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)",
        "transforms.Reroute.topic.replacement": "all_topics"
    }
}

The following configuration items deserve attention.

1. snapshot.mode

initial means that both snapshot data and incremental data are captured.

2. provide.transaction.metadata

In this case, transaction metadata is needed from the source side in order to apply data transactionally, so this configuration item must be set to true.

3. transforms

Here the 'ByLogicalTableRouter' transform is configured to route all source records to the same topic ('all_topics'). By default, the topic of a record produced by Debezium is derived from the table identifier, so events from different tables land in different topics. Because only records within the same topic can be ordered, the order in which the sink connector consumes records could otherwise differ from the order in which the source connector produced them.
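
The effect of the Reroute settings can be imitated in a few lines. The sketch below is a rough Python approximation of the regex and replacement from the template, not the transform's code: the function name `reroute` is hypothetical, and the sample topic names assume the usual prefix.database.table naming of the Debezium MySQL connector.

```python
import re


def reroute(topic, topic_regex="(.*)", replacement="all_topics"):
    """Imitate the effect of the Reroute transform on a topic name."""
    pattern = re.compile(topic_regex)
    # Names that do not match the whole regex are left unchanged.
    if pattern.fullmatch(topic) is None:
        return topic
    # count=1 avoids a second, empty match at the end of the string.
    return pattern.sub(replacement, topic, count=1)


# Hypothetical per-table topics for the shop database.
topics = ["olap_migration.shop.user", "olap_migration.shop.order"]
rerouted = {reroute(t) for t in topics}
```

Since the regex `(.*)` matches every topic name, records from all tables end up in the single topic that the sink connector subscribes to.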

After writing the JSON, send a POST request to Kafka Connect to create the connector.
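
Creating a connector means posting the JSON to the `/connectors` endpoint of the Kafka Connect REST API. The following sketch builds that request with the Python standard library. The base URL `http://localhost:8083` (the default REST port of Kafka Connect) and the shortened config are assumptions; substitute the address of your Connect worker and the full template.

```python
import json
import urllib.request


def build_create_request(connector_config, connect_url="http://localhost:8083"):
    """Build the POST /connectors request for the Kafka Connect REST API."""
    body = json.dumps(connector_config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder config; use the full template in practice.
request = build_create_request(
    {"name": "mysql_debezium_source_connector", "config": {"tasks.max": 1}}
)

# To submit it against a running Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

The same request can be sent with any HTTP client, as long as the body is the connector JSON and the content type is application/json.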

step4

Write the configuration JSON for the DbSink connector. A template is provided below.


{
    "name": "DbSinkConnector",
    "config": {
        "connector.class": "io.dbsink.connector.sink.DbSinkConnector",
        "jdbc.password": "wangwei123",
        "jdbc.username": "wangwei",
        "tasks.max": 1,
        "topics": "all_topics",
        "jdbc.url": "jdbc:postgresql://localhost:5432/migration",
        "database.dialect.name": "PostgreSqlDialect",
        "jdbc.driver.class": "org.postgresql.Driver",
        "jdbc.retries.max": 5,
        "jdbc.backoff.ms": 6000,
        "applier.parallel.max": 50,
        "applier.transaction.enabled": "true",
        "applier.transaction.buffer.size": 10000,
        "applier.worker.buffer.size": 100,
        "table.naming.strategy": "io.dbsink.connector.sink.naming.DefaultTableNamingStrategy",
        "column.naming.strategy": "io.dbsink.connector.sink.naming.DefaultColumnNamingStrategy"
    }
}
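
The source and sink templates have to agree on a few settings for transactional applying to work. The sketch below is a hypothetical pre-flight check, not part of DbSink: it verifies that the source provides transaction metadata and that the sink reads exactly the topic the Reroute transform writes to. The function name `check_topic_wiring` and the trimmed configs are assumptions.

```python
def check_topic_wiring(source, sink):
    """Return a list of problems found between a source and a sink config."""
    problems = []
    src, dst = source["config"], sink["config"]
    rerouted_topic = src.get("transforms.Reroute.topic.replacement")
    sink_topics = [
        t.strip() for t in str(dst.get("topics", "")).split(",") if t.strip()
    ]
    # JSON booleans and the strings "true"/"false" are both accepted here.
    transactional = str(dst.get("applier.transaction.enabled", "false")).lower() == "true"
    if transactional:
        if str(src.get("provide.transaction.metadata", "false")).lower() != "true":
            problems.append("provide.transaction.metadata must be true")
        if sink_topics != [rerouted_topic]:
            problems.append("sink must read only the rerouted topic")
    return problems


# Trimmed versions of the two templates, keeping only the relevant keys.
source = {"config": {"provide.transaction.metadata": True,
                     "transforms.Reroute.topic.replacement": "all_topics"}}
sink = {"config": {"topics": "all_topics", "applier.transaction.enabled": "true"}}
problems = check_topic_wiring(source, sink)
```

An empty list means the two configs are consistent with respect to these two requirements; other settings are not checked.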

The following configuration items deserve attention.

1. applier.transaction.enabled

It specifies whether to apply in a transactional manner. In this case it is true.

2. applier.transaction.buffer.size

It specifies the maximum number of cached transactions. The larger this value, the more heap memory is consumed.

3. applier.worker.buffer.size

It specifies the size of the buffer in an applier worker, that is, the maximum number of transactions an applier worker can hold.

4. applier.parallel.max

It specifies the number of threads used to apply in parallel.

5. table.naming.strategy

It specifies how to resolve the table name from source records. Three strategies are provided:

1. DefaultTableNamingStrategy

"Table1" --> "Table1"

2. LowCaseTableNamingStrategy

"Table1" --> "table1"

3. UpperCaseTableNamingStrategy

"table1" --> "TABLE1"

According to step1, DefaultTableNamingStrategy is used.

6. column.naming.strategy

It specifies how to resolve the column name from source records. Three strategies are provided:

1. DefaultColumnNamingStrategy

"Col1" --> "Col1"

2. LowCaseColumnNamingStrategy

"COL1" --> "col1"

3. UpperCaseColumnNamingStrategy

"col1" --> "COL1"

According to step1, DefaultColumnNamingStrategy is used.
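
The three table and column naming strategies amount to simple string mappings. The sketch below restates them in Python purely for illustration. The dictionary keys and the function name `resolve_name` are hypothetical and do not correspond to the connector's Java classes.

```python
# Each strategy maps a source name to the name used on the target side.
NAMING_STRATEGIES = {
    "Default": lambda name: name,  # keep the source name unchanged
    "LowCase": str.lower,          # convert to lower case
    "UpperCase": str.upper,        # convert to upper case
}


def resolve_name(name, strategy="Default"):
    """Resolve a target table or column name with the chosen strategy."""
    return NAMING_STRATEGIES[strategy](name)


default_name = resolve_name("Table1")
lower_name = resolve_name("Table1", "LowCase")
upper_name = resolve_name("col1", "UpperCase")
```

Whichever strategy is chosen, the resolved names must match the tables and columns created on the target in step1.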

After writing the JSON, send a POST request to Kafka Connect to create the connector.
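
Once a connector is created, its state can be read from the Kafka Connect REST endpoint `GET /connectors/{name}/status`. The sketch below interprets such a response. The function name `connector_is_healthy` is hypothetical, and the sample status is made up for illustration rather than captured from a real worker.

```python
def connector_is_healthy(status):
    """Return True when the connector and all of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without any task is not doing work yet.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Made-up example of a status response body.
sample_status = {
    "name": "DbSinkConnector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
healthy = connector_is_healthy(sample_status)
```

A task in the FAILED state carries a `trace` field in the real response, which is usually the quickest pointer to a missing JDBC driver or a wrong connection URL.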