Skip to content

Commit

Permalink
fix(pubsub): respect subscription message ordering field in scheduler (
Browse files Browse the repository at this point in the history
…#3886)

* fix(pubsub): respect subscription message ordering field in scheduler

* clarify comments and add testing to publishSync

* fix doc comments, remove unnecessary assignment

* revert pullstream test change

* fix error with RPC error test after adding config check in subscription
  • Loading branch information
hongalex committed Apr 5, 2021
1 parent 48b4e59 commit 1fcc78a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 3 deletions.
81 changes: 81 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -683,6 +684,16 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
}
}

// publishSync is a utility function for publishing a message and
// blocking until the message has been confirmed.
func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
res := topic.Publish(ctx, msg)
_, err := res.Get(ctx)
if err != nil {
t.Fatalf("publishSync err: %v", err)
}
}

func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down Expand Up @@ -1344,6 +1355,76 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
}
}

// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages
// with ordering keys are not processed as such if the subscription
// does not have message ordering enabled.
func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
topic.EnableMessageOrdering = true

// Explicitly disable message ordering on the subscription.
enableMessageOrdering := false
subCfg := SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: enableMessageOrdering,
}
sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg)
if err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)

publishSync(ctx, t, topic, &Message{
Data: []byte("message-1"),
OrderingKey: "ordering-key-1",
})

publishSync(ctx, t, topic, &Message{
Data: []byte("message-2"),
OrderingKey: "ordering-key-1",
})

sub.ReceiveSettings.Synchronous = true
ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
defer cancel()

var numAcked int32
sub.Receive(ctx2, func(_ context.Context, msg *Message) {
// Create artificial constraints on message processing time.
if string(msg.Data) == "message-1" {
time.Sleep(10 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
msg.Ack()
atomic.AddInt32(&numAcked, 1)
})
if sub.enableOrdering != enableMessageOrdering {
t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering)
}
// If the messages were received on a subscription with the EnableMessageOrdering=true,
// total processing would exceed the timeout and only one message would be processed.
if numAcked < 2 {
t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
}
}

func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
14 changes: 12 additions & 2 deletions pubsub/pullstream_test.go
Expand Up @@ -101,10 +101,20 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
t.Fatal(err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, "foo")
if err != nil {
t.Fatal(err)
}
sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}

errc := make(chan error)
go func() {
errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) {
errc <- sub.Receive(ctx, func(context.Context, *Message) {
t.Error("should not have received any data")
})
}()
Expand All @@ -118,7 +128,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
t.Fatal("expected to receive a grpc ResourceExhausted error")
}
} else {
t.Fatal("expected to receive a grpc ResourceExhausted error")
t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err)
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion pubsub/subscription.go
Expand Up @@ -47,6 +47,8 @@ type Subscription struct {

mu sync.Mutex
receiveActive bool

enableOrdering bool
}

// Subscription creates a reference to a subscription.
Expand Down Expand Up @@ -772,6 +774,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

// Check config to check EnableMessageOrdering field.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
cfg, err := s.Config(ctx)
if err != nil {
return fmt.Errorf("sub.Config err: %v", err)
}
s.enableOrdering = cfg.EnableMessageOrdering

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand Down Expand Up @@ -901,9 +911,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old(ackID, ack, receiveTime)
}
wg.Add(1)
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
key = msg.OrderingKey
}
// TODO(deklerk): Can we have a generic handler at the
// constructor level?
if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) {
if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
f(ctx2, msg.(*Message))
}); err != nil {
Expand Down

0 comments on commit 1fcc78a

Please sign in to comment.