From 825ddd692363eb2dd8cd253cc5976867e432f547 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 24 May 2021 15:58:02 -0700 Subject: [PATCH] feat(pubsub): add flush method to topic (#2863) * feat(pubsub): add flush method to topic --- .../internal/scheduler/publish_scheduler.go | 13 ++++++ pubsub/topic.go | 8 ++++ pubsub/topic_test.go | 42 +++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/pubsub/internal/scheduler/publish_scheduler.go b/pubsub/internal/scheduler/publish_scheduler.go index 7d70b922e48..9dbf51c1cfd 100644 --- a/pubsub/internal/scheduler/publish_scheduler.go +++ b/pubsub/internal/scheduler/publish_scheduler.go @@ -157,6 +157,19 @@ func (s *PublishScheduler) FlushAndStop() { } } +// Flush waits until all bundlers are sent. +func (s *PublishScheduler) Flush() { + var wg sync.WaitGroup + for _, b := range s.bundlers { + wg.Add(1) + go func(b *bundler.Bundler) { + defer wg.Done() + b.Flush() + }(b) + } + wg.Wait() +} + // IsPaused checks if the bundler associated with an ordering keys is // paused. func (s *PublishScheduler) IsPaused(orderingKey string) bool { diff --git a/pubsub/topic.go b/pubsub/topic.go index 806a2350969..c1bd00cb29c 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -474,6 +474,14 @@ func (t *Topic) Stop() { t.scheduler.FlushAndStop() } +// Flush blocks until all remaining messages are sent. +func (t *Topic) Flush() { + if t.stopped || t.scheduler == nil { + return + } + t.scheduler.Flush() +} + type bundledMessage struct { msg *Message res *PublishResult diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 2736fb2e9c5..1707ba16ede 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -307,3 +307,45 @@ func TestDetachSubscription(t *testing.T) { t.Errorf("DetachSubscription failed: %v", err) } } + +func TestFlushStopTopic(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "flush-topic") + if err != nil { + t.Fatal(err) + } + + // Subsequent publishes after a flush should succeed. + topic.Flush() + r := topic.Publish(ctx, &Message{ + Data: []byte("hello"), + }) + _, err = r.Get(ctx) + if err != nil { + t.Errorf("got err: %v", err) + } + + // Publishing after a flush should succeed. + topic.Flush() + r = topic.Publish(ctx, &Message{ + Data: []byte("world"), + }) + _, err = r.Get(ctx) + if err != nil { + t.Errorf("got err: %v", err) + } + + // Publishing after Stop should fail. + topic.Stop() + r = topic.Publish(ctx, &Message{ + Data: []byte("this should fail"), + }) + _, err = r.Get(ctx) + if err != errTopicStopped { + t.Errorf("got %v, want errTopicStopped", err) + } +}