Skip to content

Commit

Permalink
feat(pubsub): add flush method to topic (#2863)
Browse files Browse the repository at this point in the history
* feat(pubsub): add flush method to topic
  • Loading branch information
hongalex committed May 24, 2021
1 parent e4fcc83 commit 825ddd6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pubsub/internal/scheduler/publish_scheduler.go
Expand Up @@ -157,6 +157,19 @@ func (s *PublishScheduler) FlushAndStop() {
}
}

// Flush waits until all bundlers are sent.
func (s *PublishScheduler) Flush() {
var wg sync.WaitGroup
for _, b := range s.bundlers {
wg.Add(1)
go func(b *bundler.Bundler) {
defer wg.Done()
b.Flush()
}(b)
}
wg.Wait()
}

// IsPaused checks if the bundler associated with an ordering keys is
// paused.
func (s *PublishScheduler) IsPaused(orderingKey string) bool {
Expand Down
8 changes: 8 additions & 0 deletions pubsub/topic.go
Expand Up @@ -474,6 +474,14 @@ func (t *Topic) Stop() {
t.scheduler.FlushAndStop()
}

// Flush blocks until all remaining messages are sent.
func (t *Topic) Flush() {
if t.stopped || t.scheduler == nil {
return
}
t.scheduler.Flush()
}

type bundledMessage struct {
msg *Message
res *PublishResult
Expand Down
42 changes: 42 additions & 0 deletions pubsub/topic_test.go
Expand Up @@ -307,3 +307,45 @@ func TestDetachSubscription(t *testing.T) {
t.Errorf("DetachSubscription failed: %v", err)
}
}

func TestFlushStopTopic(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "flush-topic")
if err != nil {
t.Fatal(err)
}

// Subsequent publishes after a flush should succeed.
topic.Flush()
r := topic.Publish(ctx, &Message{
Data: []byte("hello"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after a flush should succeed.
topic.Flush()
r = topic.Publish(ctx, &Message{
Data: []byte("world"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after Stop should fail.
topic.Stop()
r = topic.Publish(ctx, &Message{
Data: []byte("this should fail"),
})
_, err = r.Get(ctx)
if err != errTopicStopped {
t.Errorf("got %v, want errTopicStopped", err)
}
}

0 comments on commit 825ddd6

Please sign in to comment.