
fix(pubsub): fix memory leak issue in publish scheduler #4282

Merged
merged 6 commits on Jun 21, 2021

Conversation

@hongalex (Member) commented Jun 17, 2021

This PR builds off the work completed by @mzanibelli in #4253. A detailed description was written there, but essentially the count of outstanding messages was being initialized to 2 instead of 1, so bundles were never freed, which leads to memory leaks when using a large number of ordering keys. This change switches the underlying implementation to sync.Map to properly synchronize access to the bundlers and outstanding maps, allowing the user to safely call Flush and FlushAndStop without race conditions.
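For readers unfamiliar with the pattern, here is a minimal sketch of the access pattern this change moves to. It is illustrative only, not the scheduler's actual source: the field and method names are assumptions, and the real type lives in the pubsub module's internal scheduler package and differs in detail.

// Illustrative sketch only; field and method names are assumptions, not the
// real publish scheduler code.
package scheduler

import (
	"sync"

	"google.golang.org/api/support/bundler"
)

type publishScheduler struct {
	mu          sync.Mutex
	bundlers    sync.Map // ordering key -> *bundler.Bundler
	outstanding sync.Map // ordering key -> int (items not yet handled)
	handle      func(bundle interface{})
}

// add looks up or creates the bundler for key. With sync.Map, the lookup is
// safe even while flush is ranging over the same map.
func (s *publishScheduler) add(key string, item interface{}, size int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	b, ok := s.bundlers.Load(key)
	if !ok {
		s.outstanding.Store(key, 1) // initialized to 1, not 2, so the entry can be freed later
		nb := bundler.NewBundler(item, s.handle)
		s.bundlers.Store(key, nb)
		b = nb
	}
	return b.(*bundler.Bundler).Add(item, size)
}

// flush flushes every bundler; Range replaces iterating a built-in map that
// add could be mutating concurrently.
func (s *publishScheduler) flush() {
	s.bundlers.Range(func(_, b interface{}) bool {
		b.(*bundler.Bundler).Flush()
		return true
	})
}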

Running both the unordered and ordered publish benchmarks reveals no performance degradation. Below are results from the ordered case, publishing 100k 10 KB messages across different ordering keys.

Previous (using built-in maps):

BenchmarkPublishThroughput-12    	       1	68024964685 ns/op	  14.70 MB/s	5010368568 B/op	35265687 allocs/op
BenchmarkPublishThroughput-12    	       1	68054813422 ns/op	  14.69 MB/s	5002350416 B/op	35229700 allocs/op

Now (using sync.Map):

BenchmarkPublishThroughput-12    	       1	68124099897 ns/op	  14.68 MB/s	5023081976 B/op	36219292 allocs/op
BenchmarkPublishThroughput-12    	       1	68060299607 ns/op	  14.69 MB/s	5027381288 B/op	36268891 allocs/op
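For context, a throughput benchmark of roughly this shape could produce numbers like those above. This is a hedged sketch, not the benchmark in the repository: the project ID, topic name, and the 100-key spread are placeholders.

// Hypothetical shape of an ordered-publish throughput benchmark; the
// repository's actual BenchmarkPublishThroughput differs.
package pubsub_test

import (
	"context"
	"fmt"
	"testing"

	"cloud.google.com/go/pubsub"
)

func BenchmarkPublishThroughput(b *testing.B) {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project
	if err != nil {
		b.Fatal(err)
	}
	defer client.Close()

	topic := client.Topic("bench-topic") // placeholder topic
	topic.EnableMessageOrdering = true
	defer topic.Stop()

	payload := make([]byte, 10*1024) // 10 KB messages, as in the runs above
	b.SetBytes(int64(len(payload)))
	b.ResetTimer()

	var results []*pubsub.PublishResult
	for i := 0; i < b.N; i++ {
		results = append(results, topic.Publish(ctx, &pubsub.Message{
			Data:        payload,
			OrderingKey: fmt.Sprintf("key-%d", i%100), // spread load over many ordering keys
		}))
	}
	for _, r := range results {
		if _, err := r.Get(ctx); err != nil {
			b.Fatal(err)
		}
	}
}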

In addition, a test was added to topic_test to verify the correct behavior of calling PublishScheduler.Add while a Flush is in flight.
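A hedged sketch of what such a test can look like is below. It is not the test that was added to topic_test: the test name is invented, and the import path assumes code living inside the pubsub module, since the scheduler package is internal.

// Hedged sketch only; the real test added in this PR lives in topic_test
// and differs. The internal import only compiles inside the pubsub module.
package scheduler_test

import (
	"fmt"
	"sync"
	"testing"

	"cloud.google.com/go/pubsub/internal/scheduler"
)

func TestAddDuringFlush(t *testing.T) {
	s := scheduler.NewPublishScheduler(1, func(bundle interface{}) {
		// The handler just drains bundles; the test only cares that Add and
		// Flush can overlap without racing or deadlocking.
	})
	defer s.FlushAndStop()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 50; i++ {
			if err := s.Add("ordering-key", fmt.Sprintf("msg-%d", i), 10); err != nil {
				t.Error(err)
			}
		}
	}()
	go func() {
		defer wg.Done()
		s.Flush()
	}()
	wg.Wait()
}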

This also fixes a small issue with the pstest fake's AddPublishResponse method: by removing a lock acquisition, it is now possible to call Publish before calling AddPublishResponse.
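A hedged sketch of that pstest pattern follows; "proj", "t", and the message ID are placeholders, and the pb alias assumes the google.golang.org/genproto pubsub protos in use at the time.

// Illustrative only; names are placeholders.
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsub/pstest"
	"google.golang.org/api/option"
	pb "google.golang.org/genproto/googleapis/pubsub/v1"
	"google.golang.org/grpc"
)

func main() {
	ctx := context.Background()
	srv := pstest.NewServer()
	defer srv.Close()
	srv.SetAutoPublishResponse(false) // rely on responses queued with AddPublishResponse

	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client, err := pubsub.NewClient(ctx, "proj", option.WithGRPCConn(conn))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic, err := client.CreateTopic(ctx, "t")
	if err != nil {
		log.Fatal(err)
	}

	// Publish first; with the lock acquisition removed, queueing the fake's
	// response afterwards no longer risks blocking.
	res := topic.Publish(ctx, &pubsub.Message{Data: []byte("hello")})
	srv.AddPublishResponse(&pb.PublishResponse{MessageIds: []string{"m0"}}, nil)

	id, err := res.Get(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("published message %s", id)
}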

Fixes #4288

@hongalex hongalex requested review from a team as code owners June 17, 2021 23:37
 if !ok {
-	s.outstanding[key] = 1
+	s.outstanding.Store(key, 1)
 	b = bundler.NewBundler(item, func(bundle interface{}) {
 		s.workers <- struct{}{}
 		s.handle(bundle)
 		<-s.workers

 		nlen := reflect.ValueOf(bundle).Len()
 		s.mu.Lock()


So that's it! You kept the lock held between Add() and handle() so you don't load non-existent keys from the map inside the else clause. This gave me a hard time; I thought I was doing something wrong when I tried this.
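For readers following along, here is a hedged sketch of the invariant that comment points at. It is not the scheduler's actual code and the names are assumptions: the same mutex guards the increment done from Add and the decrement done at the end of the bundle handler, so the else clause in Add never loads a key that the handler has just deleted.

// Illustrative sketch only; not the scheduler's real code.
package scheduler

import "sync"

type counts struct {
	mu          sync.Mutex
	outstanding sync.Map // ordering key -> int
	bundlers    sync.Map // ordering key -> bundler for that key
}

// incr is called from Add with c.mu already held.
func (c *counts) incr(key string) {
	if n, ok := c.outstanding.Load(key); ok {
		c.outstanding.Store(key, n.(int)+1) // existing key: bump the count
		return
	}
	c.outstanding.Store(key, 1) // first item for this key
}

// decr is called at the end of the bundle handler; taking c.mu here is what
// keeps it from racing with incr above.
func (c *counts) decr(key string, n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	cur, ok := c.outstanding.Load(key)
	if !ok {
		return
	}
	remaining := cur.(int) - n
	if remaining <= 0 {
		c.outstanding.Delete(key)
		c.bundlers.Delete(key) // freeing the per-key bundler is what plugs the leak
		return
	}
	c.outstanding.Store(key, remaining)
}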

Successfully merging this pull request may close these issues.

pubsub: Memory growth observed while sending messages with ordering keys