nicoDs96/debezium-engine-demo

DEBEZIUM DEMO

This project experiments with running the Debezium CDC engine embedded in the consumer application, without relying on Kafka.

Motivation

While the standard deployment on Kafka Connect is the most robust CDC option, we only capture changes for two small tables. The first receives 20K inserts every 5 minutes, at most 24 times a day, on two days each month. The second sees very rare CRUD operations after initialization (an estimated 200 per month). Since the major cloud providers charge on a pay-as-you-go basis, and a Kafka deployment would still need a separate consumer, we decided to embed the CDC engine within the consumer to reduce deployment costs. This is possible because we do not think parallelism is needed for the current load: a single consumer is enough.
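The load figures above can be turned into a quick estimate of what the single consumer has to absorb. This is a minimal sketch, assuming each 20K-insert burst counts once toward the 24-per-day limit and has to be processed before the next one arrives 5 minutes later; the class and constant names are hypothetical.

```java
// Back-of-the-envelope load estimate for the two tracked tables.
// Assumption: each burst of 20K inserts counts once toward the 24-per-day limit.
public class LoadEstimate {
    static final long INSERTS_PER_BURST = 20_000;
    static final long MAX_BURSTS_PER_DAY = 24;
    static final long ACTIVE_DAYS_PER_MONTH = 2;
    static final long OTHER_TABLE_OPS_PER_MONTH = 200; // estimated
    static final double BURST_INTERVAL_SECONDS = 5 * 60;

    // Upper bound on change events per month across both tables.
    static long monthlyEvents() {
        return INSERTS_PER_BURST * MAX_BURSTS_PER_DAY * ACTIVE_DAYS_PER_MONTH
                + OTHER_TABLE_OPS_PER_MONTH;
    }

    // Average rate needed to drain one burst before the next one arrives.
    static double burstEventsPerSecond() {
        return INSERTS_PER_BURST / BURST_INTERVAL_SECONDS;
    }

    public static void main(String[] args) {
        System.out.println("events/month: " + monthlyEvents());
        System.out.printf("events/s during a burst: %.1f%n", burstEventsPerSecond());
    }
}
```

Under these assumptions the upper bound is 960,200 events per month (20,000 × 24 × 2 + 200) and about 66.7 events per second while a burst is being processed, numbers consistent with running a single consumer.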

Important

Debezium acts as a consumer of the replication records provided by Postgres. To do so, it creates an entry in the pg_replication_slots view. If, for any reason, the Debezium process is no longer running but its entry is still in pg_replication_slots, the database disk usage will grow substantially, because Postgres retains the WAL that the slot has not consumed yet. To address this, use

select pg_drop_replication_slot(slot_name)
from pg_replication_slots;

to drop all of them, or

select pg_drop_replication_slot('${slot_name}');

to drop only the unused one. Postgres then takes care of cleaning up the unused disk space.
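The SQL above cleans up after the fact. With the embedded engine, the slot can also be handled through the connector configuration. This sketch assumes the Debezium PostgreSQL connector options `slot.name` and `slot.drop.on.stop` are passed straight to the engine (the project's `debezium.`-prefixed properties may map them differently); the slot name `cdc_demo_slot` is hypothetical.

```java
import java.util.Properties;

public class SlotConfig {
    // Builds the replication-slot part of a Debezium PostgreSQL engine configuration.
    // The keys are Debezium PostgreSQL connector options; the values are illustrative.
    static Properties slotProperties(String slotName, boolean dropOnStop) {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // Name of the entry created in pg_replication_slots
        props.setProperty("slot.name", slotName);
        // true: drop the slot when the engine stops gracefully, letting Postgres discard
        // the retained WAL; changes made while the engine is down are then not captured
        props.setProperty("slot.drop.on.stop", Boolean.toString(dropOnStop));
        return props;
    }

    public static void main(String[] args) {
        // "cdc_demo_slot" is a hypothetical slot name
        Properties props = slotProperties("cdc_demo_slot", true);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Debezium recommends `slot.drop.on.stop=true` only for development and test setups, since restarting then resumes from a fresh slot. A crash is not a graceful stop, so it still leaves the slot behind and the SQL cleanup remains necessary.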

How-to

Track a new table:

  1. append schema.table to the comma-separated list of tables to track, under debezium.table.include.list in the .properties files
  2. create a package under it.nicods.debeziumdemo.data named schema.table_to_track
  3. under the new package:
    1. create a class TableName.java with the following annotations:
      @Entity
      @Table(name = "table_name_tracking", schema = "tracking")
      @Getter
      @Setter
      @ToString
      @NoArgsConstructor
      @AllArgsConstructor
      @Builder
      and, as properties, all the fields of the table plus the following:
      @Id
      @GeneratedValue
      private Integer id;
      //table fields...
      private String operation;
      private ZonedDateTime ts;
    2. create the corresponding repository:
       @Repository
       public interface TableNameRepository extends JpaRepository<TableName, Integer> {}
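    3. create a change event class:
      @AllArgsConstructor
      @NoArgsConstructor
      @Getter
      @Setter
      @ToString
      public class TableNameChangeEvent {
          private TableName before; // row before the change (null for inserts and snapshot reads)
          private TableName after;  // row after the change (null for deletes)
          private String op;        // Debezium operation code: c, u, d or r
          private Long ts_ms;       // event timestamp in epoch milliseconds
      }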
    4. create a service class that handles the new change event:
       @Slf4j
       public class TableNameService extends TrackingService<TableNameChangeEvent> {
           private final TableNameRepository repository;

           public TableNameService(TableNameRepository repository) {
               this.repository = repository;
           }

           @Override
           @Transactional
           public void handleEvent(TableNameChangeEvent record) {
               // DO STUFF (e.g. build a TableName tracking row and save it through the repository)
           }
       }
  4. integrate the new table change event into DebeziumSourceEventListener:
    // record topic: <topic prefix>.<schema>.<table>
    case "cdc-demo-connector.schema.table_name":
        TableNameChangeEvent tEvent = tableNameService.deserialize(sourceRecordValue, TableNameChangeEvent.class);
        log.debug("JSON DESER {}", tEvent);
        tableNameService.handleEvent(tEvent);
        return true;
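Step 3.4 leaves handleEvent as "DO STUFF". One possible body is sketched below in plain Java, so it runs without Spring, JPA or Lombok: it turns a change event into a tracking row, taking `before` for deletes and `after` otherwise, and converting `ts_ms` into the `ZonedDateTime ts` field. The `name` column and the operation labels are hypothetical; the op codes c, u, d and r are the ones Debezium uses in its change events.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class HandleEventSketch {
    // Plain stand-in for the TableName entity (JPA and Lombok annotations omitted).
    // "name" is a hypothetical table column.
    static class TableName {
        Integer id; // left null so that @GeneratedValue assigns it on save
        String name;
        String operation;
        ZonedDateTime ts;

        TableName(String name) {
            this.name = name;
        }
    }

    // Plain stand-in for TableNameChangeEvent.
    static class TableNameChangeEvent {
        TableName before;
        TableName after;
        String op;
        Long ts_ms;

        TableNameChangeEvent(TableName before, TableName after, String op, Long tsMs) {
            this.before = before;
            this.after = after;
            this.op = op;
            this.ts_ms = tsMs;
        }
    }

    // Maps Debezium op codes to the labels stored in the "operation" column
    // (the labels themselves are a hypothetical choice).
    static String operationLabel(String op) {
        switch (op) {
            case "c": return "INSERT";
            case "u": return "UPDATE";
            case "d": return "DELETE";
            case "r": return "READ"; // row emitted by the initial snapshot
            default: throw new IllegalArgumentException("unsupported op: " + op);
        }
    }

    // Builds the tracking row that handleEvent could save through the repository.
    static TableName toTrackingRow(TableNameChangeEvent event) {
        String label = operationLabel(event.op);
        // Deletes only carry the "before" image; the other ops carry "after".
        TableName source = "d".equals(event.op) ? event.before : event.after;
        if (source == null) {
            throw new IllegalArgumentException("missing row image for op: " + event.op);
        }
        TableName row = new TableName(source.name);
        row.operation = label;
        // ts_ms is in epoch milliseconds; store it as a UTC timestamp
        row.ts = Instant.ofEpochMilli(event.ts_ms).atZone(ZoneOffset.UTC);
        return row;
    }

    public static void main(String[] args) {
        TableNameChangeEvent insert =
                new TableNameChangeEvent(null, new TableName("alice"), "c", 1_700_000_000_000L);
        TableName row = toTrackingRow(insert);
        System.out.println(row.operation + " " + row.name + " " + row.ts);
    }
}
```

In the real service the resulting row would go to `repository.save(...)`. Two caveats: the envelope `ts_ms` is the time Debezium processed the event (the database commit time is in `source.ts_ms`), and, depending on the table's REPLICA IDENTITY setting, Postgres may send only the primary-key values in `before`, so delete rows can have most fields null.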
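The switch in DebeziumSourceEventListener grows by one case per tracked table. An alternative, sketched here, is a map from topic name to handler, with the topic built as `<prefix>.<schema>.<table>` to match the case label `cdc-demo-connector.schema.table_name`. `TopicDispatcher` and its methods are hypothetical, and handlers receive the record value as a JSON String only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical replacement for the per-table switch: topic name -> handler.
public class TopicDispatcher {
    private final String topicPrefix;
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    public TopicDispatcher(String topicPrefix) {
        this.topicPrefix = topicPrefix;
    }

    // Registers a handler for "schema.table", i.e. the same entry added to
    // debezium.table.include.list in step 1.
    public void register(String schemaDotTable, Consumer<String> handler) {
        handlers.put(topicPrefix + "." + schemaDotTable, handler);
    }

    // Returns true when a handler consumed the record, like "return true" in the switch case.
    public boolean dispatch(String topic, String recordValueJson) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(recordValueJson);
        return true;
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher("cdc-demo-connector");
        // In the project this handler would deserialize with tableNameService and call handleEvent
        dispatcher.register("schema.table_name", json -> System.out.println("handled " + json));
        boolean handled = dispatcher.dispatch("cdc-demo-connector.schema.table_name", "{\"op\":\"c\"}");
        System.out.println(handled);
    }
}
```

Adding a table then becomes a single `register` call next to the include-list entry. The trade-off is that topic names are computed rather than spelled out, so a typo in the prefix silently leaves a table unhandled; logging topics for which `dispatch` returns false makes that visible.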