Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add flush method to topic #2863

Merged
merged 10 commits into from May 24, 2021
13 changes: 13 additions & 0 deletions pubsub/internal/scheduler/publish_scheduler.go
Expand Up @@ -157,6 +157,19 @@ func (s *PublishScheduler) FlushAndStop() {
}
}

// Flush waits until all bundlers are sent.
func (s *PublishScheduler) Flush() {
var wg sync.WaitGroup
for _, b := range s.bundlers {
wg.Add(1)
go func(b *bundler.Bundler) {
defer wg.Done()
b.Flush()
}(b)
}
wg.Wait()
}

// IsPaused checks if the bundler associated with an ordering keys is
// paused.
func (s *PublishScheduler) IsPaused(orderingKey string) bool {
Expand Down
8 changes: 8 additions & 0 deletions pubsub/topic.go
Expand Up @@ -474,6 +474,14 @@ func (t *Topic) Stop() {
t.scheduler.FlushAndStop()
}

// Flush blocks until all remaining messages are sent.
func (t *Topic) Flush() {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
if t.stopped || t.scheduler == nil {
return
}
t.scheduler.Flush()
}

type bundledMessage struct {
msg *Message
res *PublishResult
Expand Down
42 changes: 42 additions & 0 deletions pubsub/topic_test.go
Expand Up @@ -307,3 +307,45 @@ func TestDetachSubscription(t *testing.T) {
t.Errorf("DetachSubscription failed: %v", err)
}
}

func TestFlushStopTopic(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "flush-topic")
if err != nil {
t.Fatal(err)
}

// Subsequent publishes after a flush should succeed.
topic.Flush()
r := topic.Publish(ctx, &Message{
Data: []byte("hello"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after a flush should succeed.
topic.Flush()
r = topic.Publish(ctx, &Message{
Data: []byte("world"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after Stop should fail.
topic.Stop()
r = topic.Publish(ctx, &Message{
Data: []byte("this should fail"),
})
_, err = r.Get(ctx)
if err != errTopicStopped {
t.Errorf("got %v, want errTopicStopped", err)
}
}