fix: flush all records before closing the producer #3853
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: Cali0707 The full list of commands accepted by this bot can be found here. The pull request process is described here
/cc @pierDipi This probably won't fix all the problems we are seeing, but it is a possible data race, so I figured it's best to fix it.
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #3853 +/- ##
=============================================
- Coverage 73.70% 49.72% -23.98%
=============================================
Files 100 246 +146
Lines 3419 14819 +11400
Branches 292 0 -292
=============================================
+ Hits 2520 7369 +4849
- Misses 723 6695 +5972
- Partials 176 755 +579
/retest
@@ -130,6 +130,7 @@ public Future<Void> close() {
    }
    sendFromQueueThread.interrupt();
    sendFromQueueThread.join();
    producer.flush();
We call flush before closing here
Lines 253 to 258 in 49ad678
public Future<Void> close() {
    return producer.flush().compose(s -> closeNow(), c -> {
        logger.error("Failed to flush producer", c);
        return closeNow();
    });
}
The idea of this class is to mimic the core Kafka producer interface and expose "flush" and "close" as separate methods
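To make the "separate flush and close" idea concrete, here is a minimal sketch, not the project's actual code. All names (`SketchProducer`, `RecordingProducer`, `flushThenClose`) are hypothetical, and `CompletableFuture` stands in for the `Future` type used in the snippet above. It shows a caller composing flush and close itself, and closing even when the flush fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical names throughout; CompletableFuture is a stand-in for the Future type in the PR.
public class SeparateFlushCloseSketch {

    /** Assumed shape of a producer wrapper that keeps flush and close as separate operations. */
    interface SketchProducer {
        CompletableFuture<Void> flush();
        CompletableFuture<Void> close();
    }

    /** In-memory implementation that only records the order in which operations ran. */
    static class RecordingProducer implements SketchProducer {
        final List<String> calls = new ArrayList<>();
        private final boolean failFlush;

        RecordingProducer(boolean failFlush) {
            this.failFlush = failFlush;
        }

        @Override
        public CompletableFuture<Void> flush() {
            calls.add("flush");
            if (failFlush) {
                return CompletableFuture.failedFuture(new IllegalStateException("flush failed"));
            }
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Void> close() {
            calls.add("close");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Caller-side shutdown: flush first, then close whether or not the flush succeeded. */
    static CompletableFuture<Void> flushThenClose(SketchProducer producer) {
        return producer.flush()
            // handle() runs on success and on failure, so a failed flush does not skip close()
            .handle((ok, err) -> err)
            .thenCompose(err -> producer.close());
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer(true);
        flushThenClose(producer).join();
        System.out.println(producer.calls); // prints [flush, close]
    }
}
```

Keeping the two operations separate leaves the ordering decision with the caller, which is the property the comment above describes.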
Ah, that makes sense. I think our issue is that we need to flush after we stop receiving requests with events, but before we close the producer. Currently, we:
- Flush
- Stop receiving events
- Close the producer
This leads to a data race, which causes a whole bunch of metadata requests (which aren't present now that we have this change).
I think we could fix this if we updated the producer to either:
- Have another method on the interface to "mark the producer as closed", and then call markedClosed -> flush -> close, or
- Have the flush method set the closed variable so that the producer stops receiving events
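The ordering problem can be illustrated with a small single-threaded model. This is only a sketch under simplifying assumptions: `BufferingProducer` and its methods are hypothetical, it does not model the real Kafka producer or the metadata requests mentioned above, and it simply counts records that are still buffered when close is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the ordering issue; not the PR's actual class.
public class ShutdownOrderSketch {

    static class BufferingProducer {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();
        private boolean closed = false;

        /** Accepts a record unless the producer has been marked closed. */
        boolean send(String record) {
            if (closed) {
                return false;
            }
            buffered.add(record);
            return true;
        }

        /** Stops accepting new records without closing anything yet. */
        void markClosed() {
            closed = true;
        }

        /** Moves everything buffered so far to the delivered list. */
        void flush() {
            delivered.addAll(buffered);
            buffered.clear();
        }

        /** Closes and returns how many records were still buffered (never flushed). */
        int close() {
            closed = true;
            int unflushed = buffered.size();
            buffered.clear();
            return unflushed;
        }
    }

    /** Order described as current: flush, stop receiving events, close. */
    static int flushFirst() {
        BufferingProducer p = new BufferingProducer();
        p.send("a");
        p.flush();
        p.send("late"); // accepted: arrives after the flush but before events stop
        p.markClosed();
        return p.close();
    }

    /** Proposed order: mark closed, flush, close. */
    static int markClosedFirst() {
        BufferingProducer p = new BufferingProducer();
        p.send("a");
        p.markClosed();
        p.send("late"); // rejected: the producer no longer accepts events
        p.flush();
        return p.close();
    }

    public static void main(String[] args) {
        System.out.println("flush first, unflushed at close: " + flushFirst());
        System.out.println("mark closed first, unflushed at close: " + markClosedFirst());
    }
}
```

In this model the first ordering leaves one record unflushed at close, while the second leaves none, because the late record is rejected before the flush runs.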
> I think we could fix this if we updated the producer to either:
> - Have another method on the interface to "mark the producer as closed", and then call markedClosed -> flush -> close, or
> - Have the flush method set the closed variable so that the producer stops receiving events

Isn't this exactly what the returned future (in close) communicates to the caller?
Meaning that with the changes in this PR we're doing 1 and 2 as we:
- set closed to true (line 125) -> stop accepting events (lines 69-70)
- flush
- close
Yeah, that is what is happening currently. I guess what I meant was to set closed to true in the flush method we have in the interface, or call flush again here.
/retest-required
/lgtm
/retest-required
/hold The warnings are still present in the newest set of logs. It looks like this didn't fix it properly, and in the earlier tests we were just lucky :(
/lgtm cancel
Signed-off-by: Calum Murray <cmurray@redhat.com>
7c2171a to 88f66e2
/retest-required
@Cali0707: The following tests failed, say
Fixes a possible data race: we don't always flush all records before we stop receiving them.
Proposed Changes