From 1fcc78ac6ecb461c3bbede9667436614c9df1535 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 5 Apr 2021 12:05:28 -0700 Subject: [PATCH] fix(pubsub): respect subscription message ordering field in scheduler (#3886) * fix(pubsub): respect subscription message ordering field in scheduler * clarify comments and add testing to publishSync * fix doc comments, remove unnecessary assignment * revert pullstream test change * fix error with RPC error test after adding config check in subscription --- pubsub/integration_test.go | 81 ++++++++++++++++++++++++++++++++++++++ pubsub/pullstream_test.go | 14 ++++++- pubsub/subscription.go | 17 +++++++- 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 3427655b14f..c3e172ee18f 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -22,6 +22,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" @@ -683,6 +684,16 @@ func TestIntegration_UpdateSubscription(t *testing.T) { } } +// publishSync is a utility function for publishing a message and +// blocking until the message has been confirmed. +func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) { + res := topic.Publish(ctx, msg) + _, err := res.Get(ctx) + if err != nil { + t.Fatalf("publishSync err: %v", err) + } +} + func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1344,6 +1355,76 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { } } +// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages +// with ordering keys are not processed as such if the subscription +// does not have message ordering enabled. +func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) + defer client.Close() + + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("topic %v should exist, but it doesn't", topic) + } + topic.EnableMessageOrdering = true + + // Explicitly disable message ordering on the subscription. + enableMessageOrdering := false + subCfg := SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: enableMessageOrdering, + } + sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg) + if err != nil { + t.Fatal(err) + } + defer sub.Delete(ctx) + + publishSync(ctx, t, topic, &Message{ + Data: []byte("message-1"), + OrderingKey: "ordering-key-1", + }) + + publishSync(ctx, t, topic, &Message{ + Data: []byte("message-2"), + OrderingKey: "ordering-key-1", + }) + + sub.ReceiveSettings.Synchronous = true + ctx2, cancel := context.WithTimeout(ctx, 12*time.Second) + defer cancel() + + var numAcked int32 + sub.Receive(ctx2, func(_ context.Context, msg *Message) { + // Create artificial constraints on message processing time. + if string(msg.Data) == "message-1" { + time.Sleep(10 * time.Second) + } else { + time.Sleep(5 * time.Second) + } + msg.Ack() + atomic.AddInt32(&numAcked, 1) + }) + if sub.enableOrdering != enableMessageOrdering { + t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering) + } + // If the messages were received on a subscription with the EnableMessageOrdering=true, + // total processing would exceed the timeout and only one message would be processed. + if numAcked < 2 { + t.Fatalf("did not process all messages in time, numAcked: %d", numAcked) + } +} + func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index a91f98e67a7..5554a29e062 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -101,10 +101,20 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal(err) } defer client.Close() + topic, err := client.CreateTopic(ctx, "foo") + if err != nil { + t.Fatal(err) + } + sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + t.Fatal(err) + } errc := make(chan error) go func() { - errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) { + errc <- sub.Receive(ctx, func(context.Context, *Message) { t.Error("should not have received any data") }) }() @@ -118,7 +128,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal("expected to receive a grpc ResourceExhausted error") } } else { - t.Fatal("expected to receive a grpc ResourceExhausted error") + t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err) } } } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 85f8e20552c..982743b71a0 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -47,6 +47,8 @@ type Subscription struct { mu sync.Mutex receiveActive bool + + enableOrdering bool } // Subscription creates a reference to a subscription. @@ -772,6 +774,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() + // Check config to check EnableMessageOrdering field. + // See: https://github.com/googleapis/google-cloud-go/issues/3884 + cfg, err := s.Config(ctx) + if err != nil { + return fmt.Errorf("sub.Config err: %v", err) + } + s.enableOrdering = cfg.EnableMessageOrdering + maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { maxCount = DefaultReceiveSettings.MaxOutstandingMessages @@ -901,9 +911,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes old(ackID, ack, receiveTime) } wg.Add(1) + // Make sure the subscription has ordering enabled before adding to scheduler. + var key string + if s.enableOrdering { + key = msg.OrderingKey + } // TODO(deklerk): Can we have a generic handler at the // constructor level? - if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) { + if err := sched.Add(key, msg, func(msg interface{}) { defer wg.Done() f(ctx2, msg.(*Message)) }); err != nil {