Implement Tombstone Message Handling for JDBC Sink Connector #302
base: master
Conversation
Force-pushed from 085a497 to d7e51af
@ivanyu, could you please take a look at this pull request when you get a chance? Your feedback would be really valuable. Thank you!
@@ -109,25 +118,32 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
    }

    private void prepareStatement() throws SQLException {
        final String sql;
        log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size());
nit
Why are we deleting this debug line? It seems like it would be useful to keep records.size().
- Maybe move records.size() in as a third parameter of the other debug entry.
- Also, after this change, if there are no records and no tombstones, we do not get any debug output.
        if (records.isEmpty()) {
            log.debug("Records is empty");
        if (records.isEmpty() && tombstoneRecords.isEmpty()) {
            log.debug("Records are empty.");
nit
I am not sure about this debug.
- The debug text should say that both the records and the tombstone records are empty.
- Do we want a debug entry, as before, when the records are empty but there are tombstones?
> Do we want a debug entry, as before, when the records are empty but there are tombstones?

It seems redundant to include if statements solely for debugging empty records: the prepare-statement debug (mentioned previously) already covers this by not logging the statement when there are no records.
        } else {
            records.add(record);
        }
        if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
            log.debug("Flushing buffered records after exceeding configured batch size {}.",
nit
Might be worth adding records.size() and tombstoneRecords.size() to the message so we know which one triggered the flush.
Thanks @davidradl for the review; I have addressed most of the comments.
- Added support for handling tombstone messages in the JDBC sink connector.
- Implemented the ability to delete rows based on tombstone messages.
- Introduced a new parameter, `delete.enabled`, to control delete behavior.
- Aligned functionality with the documented approach for processing tombstones, similar to Confluent JDBC driver behavior.

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Force-pushed from d7e51af to 5e2a3c8
Hey @Joel-hanson, thanks a million for your contribution! I have this on my list for review on Monday. Sorry for the delay, I am just catching up on a few projects at the moment!
            records.add(record);
        }
        if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
            log.debug("Flushing buffered records {} and tombstone records {} after exceeding the configured batch size of {}.",
Hey,
Just started going through the PR, but there are a couple of checkstyle errors on lines 102 and 124 for lines that exceed 120 characters.
Thanks @Joel-hanson for the PR! I've left a few thoughts but overall it looks pretty good.
I'm unable to run the integration tests on my Apple Silicon MacBook, though. I've tried several things (including looking through Testcontainers issues and using an alternative Docker runtime) with no success.
Would it be possible to use Postgres either instead of or in addition to Oracle for the new integration tests? As Apple Silicon becomes more and more prevalent for devs the cost of having tests that can't be run locally increases.
@@ -164,6 +164,7 @@ dependencies {

    runtimeOnly("org.xerial:sqlite-jdbc:3.45.2.0")
    runtimeOnly("org.postgresql:postgresql:42.7.3")
    runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0")
This is a pretty old driver version; the latest available in Maven Central is 23.4.0.24.05. Was there a specific reason you chose this one?
    default String buildDeleteStatement(TableId table,
                                        int records,
                                        Collection<ColumnId> keyColumns) {
        return buildDeleteStatement(table, records, keyColumns);
Won't this cause a stack overflow for custom DatabaseDialect implementations that don't override this method? We could throw an UnsupportedOperationException as an alternative.
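The hazard and the suggested fix can be sketched with a minimal stand-in interface (the names here are simplified illustrations, not the connector's real API): a default method whose body calls itself recurses until a StackOverflowError for any implementation that doesn't override it, whereas throwing UnsupportedOperationException fails loudly and immediately.

```java
// Minimal sketch (hypothetical names, not the connector's real DatabaseDialect
// API): throwing from the default method gives dialects that don't support
// deletes a safe, explicit fallback instead of infinite self-recursion.
interface DialectFallback {
    // A body of `return buildDeleteStatement(table);` would recurse forever
    // for implementations that don't override this method.
    default String buildDeleteStatement(String table) {
        throw new UnsupportedOperationException(
                "This dialect does not support delete statements for table " + table);
    }
}

public class DialectFallbackDemo {
    public static void main(String[] args) {
        DialectFallback dialect = new DialectFallback() { };  // no override
        try {
            dialect.buildDeleteStatement("orders");
        } catch (UnsupportedOperationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```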
     * @throws SQLException if there is a problem binding values into the statement
     */
    default void bindTombstoneRecord(SinkRecord record) throws SQLException {
        bindTombstoneRecord(record);
Same thought RE potential stack overflows
@@ -86,12 +90,17 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
    }

    final List<SinkRecord> flushed;
    if (currentSchemaPair.equals(schemaPair)) {
    // Skip the schemaPair check for all tombstone records or the current schema pair matches
    if (record.valueSchema() == null || currentSchemaPair.equals(schemaPair)) {
It's possible that a Converter instance may deserialize a record with a non-null schema but a null value, which still qualifies as a tombstone record. Can we check record.value() == null instead of record.valueSchema() == null?
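The distinction can be shown with a tiny stand-in for Kafka Connect's SinkRecord (only the two fields that matter here, so the sketch stays self-contained): a converter may emit a non-null value schema alongside a null value, which is still a tombstone, so testing the value is the more robust check.

```java
// RecordStub is a hypothetical stand-in for Kafka Connect's SinkRecord,
// reduced to the two accessors relevant to tombstone detection.
class RecordStub {
    final Object valueSchema;
    final Object value;
    RecordStub(Object valueSchema, Object value) {
        this.valueSchema = valueSchema;
        this.value = value;
    }
}

public class TombstoneCheck {
    // Schema-agnostic tombstone test, per the review suggestion.
    static boolean isTombstone(RecordStub record) {
        return record.value == null;
    }

    public static void main(String[] args) {
        // Schemaless tombstone: null schema, null value.
        System.out.println(isTombstone(new RecordStub(null, null)));            // true
        // Schemaful tombstone: non-null schema, null value -- this case is
        // missed by a valueSchema() == null check.
        System.out.println(isTombstone(new RecordStub("STRING_SCHEMA", null))); // true
        // Regular record with a value: not a tombstone.
        System.out.println(isTombstone(new RecordStub("STRING_SCHEMA", "v")));  // false
    }
}
```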
    } else {
        records.add(record);
    }
    if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) {
Should this be if (records.size() + tombstoneRecords.size() >= config.batchSize)? Otherwise, for a batch size B, we may end up buffering B * 2 - 1 records before flushing (B tombstone records and B - 1 regular records, or vice versa).
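The arithmetic behind this comment can be sketched with plain ints standing in for the two record buffers: the OR of the two sizes can leave up to 2*B - 1 records buffered, while summing them flushes as soon as the total reaches B.

```java
// Sketch of the two flush conditions discussed in the review; ints stand in
// for records.size() and tombstoneRecords.size().
public class FlushCheck {
    // Condition as written in the PR: flush when either buffer alone hits B.
    static boolean flushOr(int records, int tombstones, int batchSize) {
        return records >= batchSize || tombstones >= batchSize;
    }

    // Reviewer's suggestion: flush when the combined total hits B.
    static boolean flushSum(int records, int tombstones, int batchSize) {
        return records + tombstones >= batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 10;
        // 9 regular + 9 tombstone records: the OR check never fires, so 18
        // records (2*B - 2) sit buffered...
        System.out.println(flushOr(9, 9, batchSize));   // false
        // ...while the summed check flushes once the total reaches 10.
        System.out.println(flushSum(9, 9, batchSize));  // true
    }
}
```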
    }
    log.debug("Done executing batch.");
    if (totalUpdateCount != records.size() && !successNoInfo) {
        verifySuccessfulExecutions(totalSuccessfulExecutionCount, successNoInfo);
Should we invoke this once per executed batch (i.e., once for regular records and once for tombstone records)? Otherwise, the successNoInfo field gets reused across batches and may produce inaccurate results.
    if (getBoolean(DELETE_ENABLED)) {
        log.error("Delete mode will enabled only if pk mode set to record_key");
    }
IMO we should fail here instead of just logging an error. Bonus points if we implement this multi-property validation in the Connector::validate method, but I don't think it's necessary to go that far for this PR if you don't have the time.
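A fail-fast version of this check could look like the sketch below. The property names follow the PR (`delete.enabled`, `pk.mode`), but IllegalArgumentException is a stand-in for Kafka's ConfigException so the example stays self-contained.

```java
// Hypothetical sketch of fail-fast validation: reject the configuration when
// delete.enabled is set without pk.mode=record_key, instead of only logging.
public class DeleteConfigCheck {
    static void validate(boolean deleteEnabled, String pkMode) {
        if (deleteEnabled && !"record_key".equals(pkMode)) {
            // In the connector this would be a ConfigException raised during
            // config parsing or Connector::validate.
            throw new IllegalArgumentException(
                    "delete.enabled requires pk.mode=record_key, but pk.mode=" + pkMode);
        }
    }

    public static void main(String[] args) {
        validate(true, "record_key");    // valid combination: no exception
        try {
            validate(true, "record_value");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at startup surfaces the misconfiguration immediately, whereas a logged error can be missed while the connector silently ignores tombstones.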
Description:
This pull request addresses issue #165, focusing on enhancing the functionality of the JDBC sink connector to handle deletes on tombstone messages effectively. By implementing this feature, users can now delete rows corresponding to tombstone messages, which is particularly useful for scenarios involving Change Data Capture (CDC) from another database.
Changes:
- Added support for handling tombstone messages in the JDBC sink connector.
- Implemented the ability to delete rows based on tombstone messages.
- Introduced a new parameter, `delete.enabled`, to control delete behavior.
- Aligned functionality with the documented approach for processing tombstones, similar to Confluent JDBC driver behavior.

Related Issue(s): #165
Signed-off-by: Joel Hanson joelhanson025@gmail.com