Implement Tombstone Message Handling for JDBC Sink Connector #302

Open · wants to merge 1 commit into base: master

Conversation

Joel-hanson

Description:

This pull request addresses issue #165 by enhancing the JDBC sink connector to handle deletes on tombstone messages. With this feature enabled, a tombstone message deletes the corresponding row, which is particularly useful for Change Data Capture (CDC) pipelines fed from another database.

Changes:

  • Added support for handling tombstone messages in the JDBC sink connector.
  • Implemented the ability to delete rows based on tombstone messages.
  • Introduced a new parameter, delete.enabled, to control delete behavior (see the configuration sketch after this list).
  • Aligned functionality with the documented approach for processing tombstones, similar to Confluent JDBC driver behavior.
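
As a rough illustration, here is a minimal sink configuration sketch exercising the new parameter. The connector class, connection URL, and topic are placeholders for your environment; pk.mode must be record_key for deletes, as discussed in the review below.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative configuration only; adjust names and URLs to your setup.
    public final class TombstoneDeleteConfigExample {
        public static Map<String, String> sinkConfig() {
            final Map<String, String> props = new HashMap<>();
            props.put("connector.class", "io.aiven.connect.jdbc.JdbcSinkConnector");
            props.put("connection.url", "jdbc:postgresql://localhost:5432/mydb");
            props.put("topics", "cdc-source-topic");
            // Deletes rely on the record key to identify the target row.
            props.put("pk.mode", "record_key");
            // New in this PR: a tombstone (null-value) record becomes a DELETE.
            props.put("delete.enabled", "true");
            return props;
        }
    }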

Related Issue(s): #165

Signed-off-by: Joel Hanson joelhanson025@gmail.com

@Joel-hanson (Author)

@ivanyu, could you please take a look at this pull request when you get a chance? Your feedback would be really valuable. Thank you!

@matuagarwal

@jeqo @ivanyu, could you please review the PR?

@@ -109,25 +118,32 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
}

private void prepareStatement() throws SQLException {
final String sql;
log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size());

@davidradl · May 16, 2024

nit
Why are we deleting this debug line? It seems like it would be useful to keep the records.size().

  • Maybe move records.size() to be a third parameter of the other debug entry.
  • Also, after the change, if there are no records or no tombstones we do not get any debug output.

if (records.isEmpty()) {
log.debug("Records is empty");
if (records.isEmpty() && tombstoneRecords.isEmpty()) {
log.debug("Records are empty.");


nit
I am not sure about this debug.

  • The debug text should read "Records and tombstone records are empty."
  • Do we want to have a debug when the records are empty, as before, and there are tombstones?

@Joel-hanson (Author)


Do we want to have a debug when the records are empty, as before, and there are tombstones?

It seems redundant to include if statements solely for debugging empty records, as the prepare-statement debug (mentioned previously) already covers this case by not logging the statement when there are no records.

} else {
records.add(record);
}
if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
log.debug("Flushing buffered records after exceeding configured batch size {}.",


nit
Might be worth adding records.size() and tombstoneRecords.size() to the message so we know which one caused the flush.

@Joel-hanson (Author)


Thanks @davidradl for the review; I have addressed most of the comments.

- Added support for handling tombstone messages in the JDBC sink connector.
- Implemented the ability to delete rows based on tombstone messages.
- Introduced a new parameter, `delete.enabled`, to control delete behavior.
- Aligned functionality with the documented approach for processing tombstones,
  similar to Confluent JDBC driver behavior.

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
@aindriu-aiven

Hey @Joel-hanson, thanks a million for your contribution! I have this on my list for review on Monday. Sorry for the delay, I am just catching up on a few projects at the moment!

records.add(record);
}
if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
log.debug("Flushing buffered records {} and tombstone records {} after exceeding the configured batch size of {}.",


Hey,
Just started going through the PR, but there are a couple of checkstyle errors on lines 102 and 124, where the lines exceed 120 characters.

@C0urante (Contributor) left a comment


Thanks @Joel-hanson for the PR! I've left a few thoughts but overall it looks pretty good.

I'm unable to run the integration tests on my Apple Silicon MacBook, though. I've tried several things (including looking through Testcontainers issues and using an alternative Docker runtime) with no success.

Would it be possible to use Postgres either instead of or in addition to Oracle for the new integration tests? As Apple Silicon becomes more and more prevalent for devs the cost of having tests that can't be run locally increases.
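
For what it's worth, a Postgres-based setup could be as small as the following sketch (class name and image tag are illustrative); the postgres images publish multi-arch manifests, so the container starts natively on Apple Silicon:

    import org.testcontainers.containers.PostgreSQLContainer;

    // Hypothetical shared base for the new integration tests.
    public abstract class AbstractPostgresIT {
        // Multi-arch image: runs natively on both amd64 and arm64 hosts.
        protected static final PostgreSQLContainer<?> POSTGRES =
                new PostgreSQLContainer<>("postgres:15-alpine");

        static {
            POSTGRES.start();
        }

        protected static String jdbcUrl() {
            return POSTGRES.getJdbcUrl();
        }
    }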

@@ -164,6 +164,7 @@ dependencies {

runtimeOnly("org.xerial:sqlite-jdbc:3.45.2.0")
runtimeOnly("org.postgresql:postgresql:42.7.3")
runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0")
Contributor


This is a pretty old driver version; the latest available in Maven Central is 23.4.0.24.05. Was there a specific reason you chose this one?

default String buildDeleteStatement(TableId table,
int records,
Collection<ColumnId> keyColumns) {
return buildDeleteStatement(table, records, keyColumns);
Contributor


Won't this cause a stack overflow for custom DatabaseDialect implementations that don't override this method? We can throw an UnsupportedOperationException as an alternative.
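
As a sketch, the default implementation could look like this (TableId and ColumnId are the connector's existing types):

    // Fail fast instead of self-calling: dialects that don't override this
    // method get a clear error rather than a StackOverflowError.
    default String buildDeleteStatement(final TableId table,
                                        final int records,
                                        final Collection<ColumnId> keyColumns) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support delete statements");
    }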

* @throws SQLException if there is a problem binding values into the statement
*/
default void bindTombstoneRecord(SinkRecord record) throws SQLException {
bindTombstoneRecord(record);
Contributor


Same thought RE potential stack overflows
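
A matching sketch for this default method:

    // Same fail-fast pattern as buildDeleteStatement above.
    default void bindTombstoneRecord(final SinkRecord record) throws SQLException {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support binding tombstone records");
    }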

@@ -86,12 +90,17 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
}

final List<SinkRecord> flushed;
if (currentSchemaPair.equals(schemaPair)) {
// Skip the schemaPair check for all tombstone records or the current schema pair matches
if (record.valueSchema() == null || currentSchemaPair.equals(schemaPair)) {
Contributor


It's possible that a Converter instance may deserialize a record with a non-null schema but a null value, which still qualifies as a tombstone record. Can we check if record.value() == null instead of record.valueSchema() == null?
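
That is, a sketch of the suggested condition:

    // A converter may return a non-null value schema with a null value,
    // which is still a tombstone, so test the value itself.
    if (record.value() == null || currentSchemaPair.equals(schemaPair)) {
        // same handling as in the current patch
    }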

} else {
records.add(record);
}
if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
Contributor


Should this be if (records.size() + tombstoneRecords.size() >= config.batchSize)? Otherwise, for a batch size B, we may end up buffering B * 2 - 1 records before flushing (B tombstone records and B - 1 regular records, or vice-versa).
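
e.g. a sketch of the combined threshold (flush() stands in for whatever the buffer's flush path is named):

    // Flush when the total buffered work reaches the batch size, so at most
    // batchSize records are buffered across the two lists, instead of up to
    // 2 * batchSize - 1 with two independent thresholds.
    if (records.size() + tombstoneRecords.size() >= config.batchSize) {
        log.debug("Flushing {} records and {} tombstone records (batch size {})",
                records.size(), tombstoneRecords.size(), config.batchSize);
        flushed = flush();
    }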

}
log.debug("Done executing batch.");
if (totalUpdateCount != records.size() && !successNoInfo) {
verifySuccessfulExecutions(totalSuccessfulExecutionCount, successNoInfo);
Contributor


Should we invoke this once per executed batch (i.e., once for regular records and once for tombstone records)? Otherwise, the successNoInfo field gets reused across batches and may produce inaccurate results.
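
Roughly this shape, as a sketch (executeBatchFor is a hypothetical helper; verifySuccessfulExecutions and successNoInfo come from the patch):

    // Verify each batch immediately after it executes so successNoInfo set
    // by the regular-record batch cannot bleed into the tombstone batch.
    final int recordUpdateCount = executeBatchFor(records);
    verifySuccessfulExecutions(recordUpdateCount, successNoInfo);
    successNoInfo = false; // reset between batches

    final int tombstoneUpdateCount = executeBatchFor(tombstoneRecords);
    verifySuccessfulExecutions(tombstoneUpdateCount, successNoInfo);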

Comment on lines +417 to +419
if (getBoolean(DELETE_ENABLED)) {
log.error("Delete mode will enabled only if pk mode set to record_key");
}
Contributor


IMO we should fail here instead of just logging an error. Bonus points if we implement this multi-property validation in the Connector::validate method, but I don't think it's necessary to go that far for this PR if you don't have the time.
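
e.g. a sketch of the fail-fast version (the pk.mode literals are illustrative; ConfigException is org.apache.kafka.common.config.ConfigException):

    // Reject the invalid combination instead of silently ignoring
    // delete.enabled after logging.
    if (getBoolean(DELETE_ENABLED) && !"record_key".equals(getString("pk.mode"))) {
        throw new ConfigException(
                "delete.enabled",
                true,
                "Deletes are only supported when pk.mode is set to record_key");
    }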
