From 0b6d801a35250e8240ef2ebc175ca0065415d932 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 2 Apr 2021 18:05:02 -0700 Subject: [PATCH 1/5] fix(pubsub): respect subscription message ordering field in scheduler --- pubsub/integration_test.go | 79 ++++++++++++++++++++++++++++++++++++++ pubsub/subscription.go | 19 ++++++++- 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 3427655b14f..7e396c77e1e 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -22,6 +22,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" @@ -683,6 +684,14 @@ func TestIntegration_UpdateSubscription(t *testing.T) { } } +// publishSync is a utility function for publishing a message and +// blocking until the message has been confirmed. +func publishSync(ctx context.Context, topic *Topic, msg *Message) error { + res := topic.Publish(ctx, msg) + _, err := res.Get(ctx) + return err +} + func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1344,6 +1353,76 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { } } +// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages +// with ordering keys are not processed as such if the subscription +// does not message ordering enabled. +func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) + defer client.Close() + + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("topic %v should exist, but it doesn't", topic) + } + topic.EnableMessageOrdering = true + + // Explicitly disable message ordering on the subscription. + enableMessageOrdering := false + subCfg := SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: enableMessageOrdering, + } + sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg) + if err != nil { + t.Fatal(err) + } + defer sub.Delete(ctx) + + publishSync(ctx, topic, &Message{ + Data: []byte("message-1"), + OrderingKey: "ordering-key-1", + }) + + publishSync(ctx, topic, &Message{ + Data: []byte("message-2"), + OrderingKey: "ordering-key-1", + }) + + sub.ReceiveSettings.Synchronous = true + ctx2, cancel := context.WithTimeout(ctx, 12*time.Second) + defer cancel() + + var numAcked int32 + sub.Receive(ctx2, func(_ context.Context, msg *Message) { + // Create artificial constraints on message processing time. + if string(msg.Data) == "message-1" { + time.Sleep(10 * time.Second) + } else { + time.Sleep(5 * time.Second) + } + msg.Ack() + atomic.AddInt32(&numAcked, 1) + }) + if sub.enableOrdering != enableMessageOrdering { + t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering) + } + // If the messages were received on a subscription with the EnableMessageOrdering=true, + // total processing would exceed the timeout and only one message would be processed. + if numAcked < 2 { + t.Fatalf("did not process all messages in time, numAcked: %d", numAcked) + } +} + func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 85f8e20552c..bc9f7def0fe 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -47,6 +47,8 @@ type Subscription struct { mu sync.Mutex receiveActive bool + + enableOrdering bool } // Subscription creates a reference to a subscription. @@ -772,6 +774,13 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() + // Call getSubscription before to check for EnableMessageOrdering field. + cfg, err := s.Config(ctx) + if err != nil { + return fmt.Errorf("sub.Config err: %v", err) + } + s.enableOrdering = cfg.EnableMessageOrdering + maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { maxCount = DefaultReceiveSettings.MaxOutstandingMessages @@ -901,9 +910,17 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes old(ackID, ack, receiveTime) } wg.Add(1) + // If using a key, make sure the subscription has ordering enabled. + // See: https://github.com/googleapis/google-cloud-go/issues/3884 + var key string + if s.enableOrdering { + key = msg.OrderingKey + } else { + key = "" + } // TODO(deklerk): Can we have a generic handler at the // constructor level? - if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) { + if err := sched.Add(key, msg, func(msg interface{}) { defer wg.Done() f(ctx2, msg.(*Message)) }); err != nil { From a5ca57deff5dd0ab00f48158936d88da37e1dbf2 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 2 Apr 2021 18:08:47 -0700 Subject: [PATCH 2/5] clarify comments and add testing to publishSync --- pubsub/integration_test.go | 10 ++++++---- pubsub/subscription.go | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 7e396c77e1e..ff31da62647 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -686,10 +686,12 @@ func TestIntegration_UpdateSubscription(t *testing.T) { // publishSync is a utility function for publishing a message and // blocking until the message has been confirmed. -func publishSync(ctx context.Context, topic *Topic, msg *Message) error { +func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) { res := topic.Publish(ctx, msg) _, err := res.Get(ctx) - return err + if err != nil { + t.Fatalf("publishSync err: %v", err) + } } func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) { @@ -1388,12 +1390,12 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { } defer sub.Delete(ctx) - publishSync(ctx, topic, &Message{ + publishSync(ctx, t, topic, &Message{ Data: []byte("message-1"), OrderingKey: "ordering-key-1", }) - publishSync(ctx, topic, &Message{ + publishSync(ctx, t, topic, &Message{ Data: []byte("message-2"), OrderingKey: "ordering-key-1", }) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index bc9f7def0fe..03a75472bdb 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -774,7 +774,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() - // Call getSubscription before to check for EnableMessageOrdering field. + // Check config to check EnableMessageOrdering field. + // See: https://github.com/googleapis/google-cloud-go/issues/3884 cfg, err := s.Config(ctx) if err != nil { return fmt.Errorf("sub.Config err: %v", err) @@ -910,8 +911,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes old(ackID, ack, receiveTime) } wg.Add(1) - // If using a key, make sure the subscription has ordering enabled. - // See: https://github.com/googleapis/google-cloud-go/issues/3884 + // Make sure the subscription has ordering enabled before adding to scheduler. var key string if s.enableOrdering { key = msg.OrderingKey From a80b7ba8c8787b17d64039660453ee57d121f6d5 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 5 Apr 2021 10:21:30 -0700 Subject: [PATCH 3/5] fix doc comments, remove unnecessary assignment --- pubsub/integration_test.go | 2 +- pubsub/pullstream_test.go | 2 +- pubsub/subscription.go | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index ff31da62647..c3e172ee18f 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1357,7 +1357,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { // TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages // with ordering keys are not processed as such if the subscription -// does not message ordering enabled. +// does not have message ordering enabled. func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { ctx := context.Background() client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index a91f98e67a7..2d9650d764e 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -118,7 +118,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal("expected to receive a grpc ResourceExhausted error") } } else { - t.Fatal("expected to receive a grpc ResourceExhausted error") + t.Fatalf("expected to receive a grpc ResourceExhausted error, got: %v", err) } } } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 03a75472bdb..982743b71a0 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -915,8 +915,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes var key string if s.enableOrdering { key = msg.OrderingKey - } else { - key = "" } // TODO(deklerk): Can we have a generic handler at the // constructor level? From eafc4d5fc213f657c5ee4622d69c02aa514cdc73 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 5 Apr 2021 10:28:53 -0700 Subject: [PATCH 4/5] revert pullstream test change --- pubsub/pullstream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index 2d9650d764e..a91f98e67a7 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -118,7 +118,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal("expected to receive a grpc ResourceExhausted error") } } else { - t.Fatalf("expected to receive a grpc ResourceExhausted error, got: %v", err) + t.Fatal("expected to receive a grpc ResourceExhausted error") } } } From 2f791f73c0d44fad0503fa484c62978d8e3b3a12 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 5 Apr 2021 10:58:13 -0700 Subject: [PATCH 5/5] fix error with RPC error test after adding config check in subscription --- pubsub/pullstream_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index a91f98e67a7..5554a29e062 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -101,10 +101,20 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal(err) } defer client.Close() + topic, err := client.CreateTopic(ctx, "foo") + if err != nil { + t.Fatal(err) + } + sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + t.Fatal(err) + } errc := make(chan error) go func() { - errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) { + errc <- sub.Receive(ctx, func(context.Context, *Message) { t.Error("should not have received any data") }) }() @@ -118,7 +128,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) { t.Fatal("expected to receive a grpc ResourceExhausted error") } } else { - t.Fatal("expected to receive a grpc ResourceExhausted error") + t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err) } } }