fix: Fixed data race in consumer group example #2725
Closed
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
In the consumer loop, if any error occurs (other than
sarama.ErrClosedConsumerGroup
), then it'll retry callingclient.Consume
since it's infinitefor
loop. It will also assign a new ready channel at the end of the iteration. That is the race condition. Since it's in goroutine and the parent function will already be waiting on the different channel now (<-consumer.ready
), which will keep waiting forever even if theSetup()
closes the channel (because both the channels are now different).Let me know if this makes sense. Please refer to the comments that I have added in the PR code changes below for more clarity.
I can see the same issue in other examples as well. But fixed this one for now. If this fix makes sense, I can also fix the other occurrences as well.
Here is the explanation of fix:
In this example, since we are only waiting at a single place on the
ready
channel, we'll close theready
channel only once (sync.Once
is required sinceSetup
can be called multiple times in events of rebalance).Here is a similar fix that was required after following this buggy example: https://github.com/open-telemetry/opentelemetry-collector/pull/1696/files, which was required in my project as well because of the issue that I explained above reagarding infinite waiting in case of error.
Thanks.