Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): respect subscription message ordering field in scheduler #3886

Merged
merged 7 commits into from Apr 5, 2021
81 changes: 81 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -683,6 +684,16 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
}
}

// publishSync is a utility function for publishing a message and
// blocking until the message has been confirmed.
func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
res := topic.Publish(ctx, msg)
_, err := res.Get(ctx)
if err != nil {
t.Fatalf("publishSync err: %v", err)
}
}

func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down Expand Up @@ -1344,6 +1355,76 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
}
}

// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages
// with ordering keys are not processed as such if the subscription
// does not have message ordering enabled.
func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
topic.EnableMessageOrdering = true

// Explicitly disable message ordering on the subscription.
enableMessageOrdering := false
subCfg := SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: enableMessageOrdering,
}
sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg)
if err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)

publishSync(ctx, t, topic, &Message{
Data: []byte("message-1"),
OrderingKey: "ordering-key-1",
})

publishSync(ctx, t, topic, &Message{
Data: []byte("message-2"),
OrderingKey: "ordering-key-1",
})

sub.ReceiveSettings.Synchronous = true
ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
defer cancel()

var numAcked int32
sub.Receive(ctx2, func(_ context.Context, msg *Message) {
// Create artificial constraints on message processing time.
if string(msg.Data) == "message-1" {
time.Sleep(10 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
msg.Ack()
atomic.AddInt32(&numAcked, 1)
})
if sub.enableOrdering != enableMessageOrdering {
t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering)
}
// If the messages were received on a subscription with the EnableMessageOrdering=true,
// total processing would exceed the timeout and only one message would be processed.
if numAcked < 2 {
t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
}
}

func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
14 changes: 12 additions & 2 deletions pubsub/pullstream_test.go
Expand Up @@ -101,10 +101,20 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
t.Fatal(err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, "foo")
if err != nil {
t.Fatal(err)
}
sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}

errc := make(chan error)
go func() {
errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) {
errc <- sub.Receive(ctx, func(context.Context, *Message) {
t.Error("should not have received any data")
})
}()
Expand All @@ -118,7 +128,7 @@ func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
t.Fatal("expected to receive a grpc ResourceExhausted error")
}
} else {
t.Fatal("expected to receive a grpc ResourceExhausted error")
t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err)
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion pubsub/subscription.go
Expand Up @@ -47,6 +47,8 @@ type Subscription struct {

mu sync.Mutex
receiveActive bool

enableOrdering bool
}

// Subscription creates a reference to a subscription.
Expand Down Expand Up @@ -772,6 +774,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

// Check config to check EnableMessageOrdering field.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
cfg, err := s.Config(ctx)
if err != nil {
return fmt.Errorf("sub.Config err: %v", err)
}
s.enableOrdering = cfg.EnableMessageOrdering

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This page does not explicitly call out why we're setting EnableMessageOrdering, aside from the code sample, so I'm inclined to leave this out.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

My comment was more like - look there is an example where this property is mentioned.

This was a good read too:
https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8

These two properties allow publishers and subscribers to decide independently if messages are ordered. If the publisher does not specify ordering keys with messages or the subscriber does not enable ordered delivery, then message delivery is not in order and behaves just like Cloud Pub/Sub without the ordered delivery feature. Not all subscriptions on a topic need to have the same setting for enable_message_ordering. Therefore, different use cases that receive the same messages can determine if they need ordered delivery without impacting each other.


maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand Down Expand Up @@ -901,9 +911,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old(ackID, ack, receiveTime)
}
wg.Add(1)
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
key = msg.OrderingKey
}
// TODO(deklerk): Can we have a generic handler at the
// constructor level?
if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) {
if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
f(ctx2, msg.(*Message))
}); err != nil {
Expand Down