fix: flush all records before closing the producer #3853

Open
wants to merge 1 commit into
base: main
from

Cali0707
Member

Fixes a possible data race: we don't always flush all records before we stop receiving them.

Proposed Changes

  • After we stop allowing records to be sent, we should call flush one more time on the producer

@knative-prow knative-prow bot added the size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. label Apr 23, 2024

knative-prow bot commented Apr 23, 2024

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: Cali0707

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Apr 23, 2024
@Cali0707
Member Author

/cc @pierDipi

This isn't expected to fix all the problems we are seeing, but it is a possible data race, so I figured it's best to fix it.

@knative-prow knative-prow bot requested a review from pierDipi April 23, 2024 19:32

codecov bot commented Apr 23, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 49.72%. Comparing base (aacc057) to head (88f66e2).
Report is 9 commits behind head on main.

Additional details and impacted files
@@              Coverage Diff              @@
##               main    #3853       +/-   ##
=============================================
- Coverage     73.70%   49.72%   -23.98%     
=============================================
  Files           100      246      +146     
  Lines          3419    14819    +11400     
  Branches        292        0      -292     
=============================================
+ Hits           2520     7369     +4849     
- Misses          723     6695     +5972     
- Partials        176      755      +579     
Flag Coverage Δ
java-unittests ?

Flags with carried forward coverage won't be shown. Click here to find out more.


@pierDipi
Member

/retest

@@ -130,6 +130,7 @@ public Future<Void> close() {
         }
         sendFromQueueThread.interrupt();
         sendFromQueueThread.join();
+        producer.flush();
Member

We call flush before closing here

public Future<Void> close() {
    return producer.flush().compose(s -> closeNow(), c -> {
        logger.error("Failed to flush producer", c);
        return closeNow();
    });
}

The idea of this class is to mimic the core Kafka producer interface and expose "flush" and "close" as separate methods

Member Author

Ah that makes sense - I think our issue is that we need to flush after we stop receiving requests with events, but before we close the producer. Currently, we:

  1. Flush
  2. Stop receiving events
  3. Close the producer

This leads to a data race, which causes a whole bunch of metadata requests (which aren't present now that we have this change).

I think we could fix this if we updated the producer to either:

  1. Have another method on the interface to "mark the producer as closed" and then call markedClosed -> flush -> close, or
  2. Have the flush method set the closed variable so that the producer stops receiving events
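
For illustration, option 1 could look roughly like the sketch below. This is not the actual producer class from this repository: `SketchProducer`, `markClosed`, and the in-memory lists are hypothetical stand-ins, and the only point is the ordering (stop accepting records, then flush, then close).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the producer discussed above; not the real class.
final class SketchProducer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<String> buffered = new ArrayList<>();   // accepted, not yet flushed
    private final List<String> delivered = new ArrayList<>();  // flushed to the pretend broker

    // Accepts a record unless the producer has already been marked closed.
    synchronized boolean send(String record) {
        if (closed.get()) {
            return false;
        }
        buffered.add(record);
        return true;
    }

    // Assumed extra interface method: stop accepting new records.
    void markClosed() {
        closed.set(true);
    }

    // Moves everything accepted so far to the delivered list.
    synchronized void flush() {
        delivered.addAll(buffered);
        buffered.clear();
    }

    // Ordering from option 1: mark closed, then flush, then close.
    // Because send() checks the flag under the same lock, nothing can be
    // buffered after this final flush.
    synchronized List<String> close() {
        markClosed();
        flush();
        return List.copyOf(delivered);
    }
}
```

With this ordering, a record is either rejected by `send` or included in the final flush; it cannot be accepted and then dropped.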

Member

@pierDipi pierDipi Apr 26, 2024

I think we could fix this if we updated the producer to either:

  1. Have another method on the interface to "mark the producer as closed" and then call markedClosed -> flush -> close, or
  2. Have the flush method set the closed variable so that the producer stops receiving events

Isn't this exactly what the returned future (in close) communicates to the caller ?

Member

@pierDipi pierDipi Apr 26, 2024

Meaning that with the changes in this PR we're doing 1 and 2 as we:

  • set closed to true (line 125) -> stop accepting events (lines 69-70)
  • flush
  • close

Member Author

Yeah, that is what is happening currently. I guess what I meant was to set closed to true in the flush method we have in the interface, or call flush again here

@Cali0707
Member Author

/retest-required

@creydr
Contributor

creydr commented Apr 25, 2024

/retest-required

Contributor

@creydr creydr left a comment

/lgtm

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Apr 25, 2024
@Cali0707
Member Author

/retest-required

@Cali0707
Member Author

/hold

The warnings are still present in the newest set of logs. It looks like this didn't fix it properly, and in the earlier tests we were just lucky :(

@knative-prow knative-prow bot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Apr 25, 2024
@creydr
Contributor

creydr commented Apr 26, 2024

/lgtm cancel

@knative-prow knative-prow bot removed the lgtm Indicates that a PR is ready to be merged. label Apr 26, 2024
@knative-prow-robot knative-prow-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label May 5, 2024
Signed-off-by: Calum Murray <cmurray@redhat.com>
@Cali0707 Cali0707 force-pushed the flush-records-before-closing-producer branch from 7c2171a to 88f66e2 Compare May 6, 2024 12:22
@knative-prow-robot knative-prow-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label May 6, 2024
@Cali0707
Member Author

Cali0707 commented May 6, 2024

/cc @creydr @pierDipi

I think we still want to have this change, otherwise there is technically a case where we close the producer while there are still pending sends (since we wait for the eventQueue to be empty, but that only means we processed the event, not that the event's promise completed)
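
To make the distinction concrete, here is a minimal sketch of the gap between "the queue is empty" and "every send has completed". It is not the code in this PR: `InFlightTracker` and its methods are hypothetical, and `CompletableFuture` stands in for whatever promise type the producer actually returns.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper: tracks sends that left the queue but have not completed yet.
final class InFlightTracker {
    private final List<CompletableFuture<Void>> inFlight = new CopyOnWriteArrayList<>();

    // Call when a record is taken off the queue and handed to the client.
    CompletableFuture<Void> track(CompletableFuture<Void> sendResult) {
        inFlight.add(sendResult);
        // Drop the entry once the send finishes, whether it succeeded or failed.
        sendResult.whenComplete((v, err) -> inFlight.remove(sendResult));
        return sendResult;
    }

    // Number of sends still awaiting completion.
    int pending() {
        return inFlight.size();
    }

    // Completes once every send tracked at call time has finished.
    // Failures are swallowed here so one failed send does not block shutdown.
    CompletableFuture<Void> allSettled() {
        CompletableFuture<?>[] snapshot = inFlight.stream()
            .map(f -> f.handle((v, err) -> null))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(snapshot);
    }
}
```

An empty queue corresponds to `track` having been called for every record; closing safely additionally requires `allSettled()` to have completed, which is the guarantee the extra flush is meant to provide.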

@knative-prow knative-prow bot requested review from creydr and pierDipi May 6, 2024 12:23
@Cali0707
Member Author

Cali0707 commented May 6, 2024

/retest-required


knative-prow bot commented May 6, 2024

@Cali0707: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name | Commit | Details | Required | Rerun command
upgrade-tests_eventing-kafka-broker_main | 88f66e2 | link | true | /test upgrade-tests
reconciler-tests-keda_eventing-kafka-broker_main | 88f66e2 | link | true | /test reconciler-tests-keda

Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Labels
approved: Indicates a PR has been approved by an approver from all required OWNERS files.
area/data-plane
do-not-merge/hold: Indicates that a PR should not merge because someone has issued a /hold command.
size/XS: Denotes a PR that changes 0-9 lines, ignoring generated files.
4 participants