From 88e546600c7d4f7570530aa72355f51f44187890 Mon Sep 17 00:00:00 2001
From: tmdiep
Date: Wed, 2 Dec 2020 14:54:24 +1100
Subject: [PATCH] feat(pubsublite): routing publisher implementation (#3277)

routingPublisher routes messages to multiple singlePartitionPublishers and
handles increases in the topic partition count.

partitionCountWatcher polls for updates to the number of topic partitions.
---
 pubsublite/internal/wire/partition_count.go   | 141 +++++
 .../internal/wire/partition_count_test.go     | 124 ++++
 pubsublite/internal/wire/periodic_task.go     |   2 +-
 pubsublite/internal/wire/publisher.go         | 139 ++++-
 pubsublite/internal/wire/publisher_test.go    | 588 ++++++++++++++++++
 pubsublite/internal/wire/rpc.go               |  21 +
 pubsublite/internal/wire/settings.go          |   5 +
 7 files changed, 1017 insertions(+), 3 deletions(-)
 create mode 100644 pubsublite/internal/wire/partition_count.go
 create mode 100644 pubsublite/internal/wire/partition_count_test.go

diff --git a/pubsublite/internal/wire/partition_count.go b/pubsublite/internal/wire/partition_count.go
new file mode 100644
index 00000000000..d1e724a4e3e
--- /dev/null
+++ b/pubsublite/internal/wire/partition_count.go
@@ -0,0 +1,141 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wire
+
+import (
+	"context"
+	"fmt"
+
+	vkit "cloud.google.com/go/pubsublite/apiv1"
+	gax "github.com/googleapis/gax-go/v2"
+	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+// partitionCountReceiver receives updated partition counts. Calls are
+// non-overlapping.
+type partitionCountReceiver func(partitionCount int)
+
+// partitionCountWatcher periodically retrieves the number of partitions for a
+// topic and notifies a receiver if it changes.
+type partitionCountWatcher struct {
+	// Immutable after creation.
+	ctx         context.Context
+	adminClient *vkit.AdminClient
+	topicPath   string
+	receiver    partitionCountReceiver
+	callOption  gax.CallOption
+
+	// Fields below must be guarded with mu.
+	partitionCount int
+	pollUpdate     *periodicTask
+
+	abstractService
+}
+
+func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient,
+	settings PublishSettings, topicPath string, receiver partitionCountReceiver) *partitionCountWatcher {
+
+	p := &partitionCountWatcher{
+		ctx:         ctx,
+		adminClient: adminClient,
+		topicPath:   topicPath,
+		receiver:    receiver,
+		callOption:  retryableReadOnlyCallOption(),
+	}
+
+	// Polling the topic partition count can be disabled in settings if the period
+	// is <= 0.
+	backgroundTask := p.updatePartitionCount
+	if settings.ConfigPollPeriod <= 0 {
+		backgroundTask = func() {}
+	}
+	p.pollUpdate = newPeriodicTask(settings.ConfigPollPeriod, backgroundTask)
+	return p
+}
+
+// Start retrieves the first topic partition count asynchronously.
+func (p *partitionCountWatcher) Start() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.unsafeUpdateStatus(serviceStarting, nil) {
+		go p.updatePartitionCount()
+	}
+}
+
+// Stop background polling for partition count updates.
+func (p *partitionCountWatcher) Stop() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.unsafeInitiateShutdown(nil)
+}
+
+// updatePartitionCount is called in a goroutine.
+func (p *partitionCountWatcher) updatePartitionCount() {
+	p.mu.Lock()
+	prevPartitionCount := p.partitionCount
+	p.mu.Unlock()
+
+	newPartitionCount, err := func() (int, error) {
+		req := &pb.GetTopicPartitionsRequest{Name: p.topicPath}
+		resp, err := p.adminClient.GetTopicPartitions(p.ctx, req, p.callOption)
+
+		p.mu.Lock()
+		defer p.mu.Unlock()
+
+		if p.status >= serviceTerminating {
+			// Returning the current partition count here ensures that the receiver
+			// func will not be invoked below.
+			return p.partitionCount, nil
+		}
+		if err != nil {
+			err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
+			p.unsafeInitiateShutdown(err)
+			return 0, err
+		}
+		if resp.GetPartitionCount() <= 0 {
+			err := fmt.Errorf("pubsublite: topic has invalid number of partitions %d", resp.GetPartitionCount())
+			p.unsafeInitiateShutdown(err)
+			return 0, err
+		}
+
+		p.partitionCount = int(resp.GetPartitionCount())
+		return p.partitionCount, nil
+	}()
+
+	if err == nil && prevPartitionCount != newPartitionCount {
+		p.receiver(newPartitionCount)
+
+		if prevPartitionCount == 0 {
+			p.onStartupComplete()
+		}
+	}
+}
+
+func (p *partitionCountWatcher) onStartupComplete() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Set the watcher as active and start background polling updates after the
+	// initial partition count has been processed.
+	if p.unsafeUpdateStatus(serviceActive, nil) {
+		p.pollUpdate.Start()
+	}
+}
+
+func (p *partitionCountWatcher) unsafeInitiateShutdown(err error) {
+	if p.unsafeUpdateStatus(serviceTerminated, err) {
+		p.pollUpdate.Stop()
+	}
+}
diff --git a/pubsublite/internal/wire/partition_count_test.go b/pubsublite/internal/wire/partition_count_test.go
new file mode 100644
index 00000000000..09177e578f3
--- /dev/null
+++ b/pubsublite/internal/wire/partition_count_test.go
@@ -0,0 +1,124 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and + +package wire + +import ( + "context" + "testing" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/test" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type testPartitionCountWatcher struct { + t *testing.T + watcher *partitionCountWatcher + gotPartitionCounts []int + + serviceTestProxy +} + +func (tw *testPartitionCountWatcher) onCountChanged(partitionCount int) { + tw.gotPartitionCounts = append(tw.gotPartitionCounts, partitionCount) +} + +func (tw *testPartitionCountWatcher) VerifyCounts(want []int) { + if !testutil.Equal(tw.gotPartitionCounts, want) { + tw.t.Errorf("partition counts: got %v, want %v", tw.gotPartitionCounts, want) + } +} + +func (tw *testPartitionCountWatcher) UpdatePartitionCount() { + tw.watcher.updatePartitionCount() +} + +func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher { + ctx := context.Background() + adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + tw := &testPartitionCountWatcher{ + t: t, + } + tw.watcher = newPartitionCountWatcher(ctx, adminClient, testPublishSettings(), topicPath, tw.onCountChanged) + tw.initAndStart(t, tw.watcher, "PartitionCountWatcher") + return tw +} + +func TestPartitionCountWatcherRetries(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + wantPartitionCount := 2 + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.Unavailable, "retryable")) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.ResourceExhausted, "retryable")) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings()) + if gotErr := watcher.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + watcher.VerifyCounts([]int{wantPartitionCount}) + watcher.StopVerifyNoError() +} + +func TestPartitionCountWatcherZeroPartitionCountFails(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings()) + if gotErr, wantMsg := watcher.StartError(), "invalid number of partitions 0"; !test.ErrorHasMsg(gotErr, wantMsg) { + t.Errorf("Start() got err: (%v), want msg: (%q)", gotErr, wantMsg) + } + watcher.VerifyCounts(nil) +} + +func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + wantPartitionCount1 := 4 + wantPartitionCount2 := 6 + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil) + + 
mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings()) + if gotErr := watcher.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + watcher.VerifyCounts([]int{wantPartitionCount1}) // Initial count + + // Simulate 3 background updates. + watcher.UpdatePartitionCount() + watcher.UpdatePartitionCount() + watcher.UpdatePartitionCount() + watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2}) + watcher.StopVerifyNoError() +} diff --git a/pubsublite/internal/wire/periodic_task.go b/pubsublite/internal/wire/periodic_task.go index dd4e3872bf2..fe869ba7197 100644 --- a/pubsublite/internal/wire/periodic_task.go +++ b/pubsublite/internal/wire/periodic_task.go @@ -35,7 +35,7 @@ func newPeriodicTask(period time.Duration, task func()) *periodicTask { // Start the polling goroutine. No-op if the goroutine is already running. // The task is executed after the polling period. func (pt *periodicTask) Start() { - if pt.ticker != nil { + if pt.ticker != nil || pt.period <= 0 { return } diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 8c5e9adbed1..bdff3f138a7 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -16,8 +16,12 @@ package wire import ( "context" "errors" + "fmt" + "math/rand" "reflect" + "time" + "google.golang.org/api/option" "google.golang.org/grpc" vkit "cloud.google.com/go/pubsublite/apiv1" @@ -27,6 +31,7 @@ import ( var ( errInvalidInitialPubResponse = errors.New("pubsublite: first response from server was not an initial response for publish") errInvalidMsgPubResponse = errors.New("pubsublite: received invalid publish response from server") + errDecreasingPartitions = errors.New("pubsublite: publisher does not support decreasing topic partition count") ) // singlePartitionPublisher publishes messages to a single topic partition. @@ -109,9 +114,17 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ defer pp.mu.Unlock() processMessage := func() error { - if err := pp.unsafeCheckServiceStatus(); err != nil { - return err + // Messages are accepted while the service is starting up or active. During + // startup, messages are queued in the batcher and will be published once + // the stream connects. If startup fails, the error will be set for the + // queued messages. + switch { + case pp.status == serviceUninitialized: + return ErrServiceUninitialized + case pp.status >= serviceTerminating: + return ErrServiceStopped } + if err := pp.batcher.AddMessage(msg, onResult); err != nil { return err } @@ -257,3 +270,125 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() { pp.stream.Stop() } } + +// routingPublisher publishes messages to multiple topic partitions, each +// managed by a singlePartitionPublisher. It supports increasing topic partition +// count, but not decreasing. +type routingPublisher struct { + // Immutable after creation. + msgRouterFactory *messageRouterFactory + pubFactory *singlePartitionPublisherFactory + partitionWatcher *partitionCountWatcher + + // Fields below must be guarded with mu. 
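+	// msgRouter is recreated whenever the partition count increases; publishers
+	// holds one singlePartitionPublisher per partition.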
+ msgRouter messageRouter + publishers []*singlePartitionPublisher + + compositeService +} + +func newRoutingPublisher(adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher { + pub := &routingPublisher{ + msgRouterFactory: msgRouterFactory, + pubFactory: pubFactory, + } + pub.init() + pub.partitionWatcher = newPartitionCountWatcher(pubFactory.ctx, adminClient, pubFactory.settings, pubFactory.topicPath, pub.onPartitionCountChanged) + pub.unsafeAddServices(pub.partitionWatcher) + return pub +} + +func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) { + rp.mu.Lock() + defer rp.mu.Unlock() + + if rp.status >= serviceTerminating { + return + } + if partitionCount == len(rp.publishers) { + return + } + if partitionCount < len(rp.publishers) { + rp.unsafeInitiateShutdown(serviceTerminating, errDecreasingPartitions) + return + } + + prevPartitionCount := len(rp.publishers) + for i := prevPartitionCount; i < partitionCount; i++ { + pub := rp.pubFactory.New(i) + rp.publishers = append(rp.publishers, pub) + rp.unsafeAddServices(pub) + } + rp.msgRouter = rp.msgRouterFactory.New(partitionCount) +} + +func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) { + pub, err := rp.routeToPublisher(msg) + if err != nil { + onResult(nil, err) + return + } + pub.Publish(msg, onResult) +} + +func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) { + rp.mu.Lock() + defer rp.mu.Unlock() + + if err := rp.unsafeCheckServiceStatus(); err != nil { + return nil, err + } + if rp.msgRouter == nil { + // Should not occur. + rp.unsafeInitiateShutdown(serviceTerminating, ErrServiceUninitialized) + return nil, ErrServiceUninitialized + } + + partition := rp.msgRouter.Route(msg.GetKey()) + if partition >= len(rp.publishers) { + // Should not occur. + err := fmt.Errorf("pubsublite: publisher not found for partition %d", partition) + rp.unsafeInitiateShutdown(serviceTerminating, err) + return nil, err + } + return rp.publishers[partition], nil +} + +// Publisher is the client interface exported from this package for publishing +// messages. +type Publisher interface { + Publish(*pb.PubSubMessage, PublishResultFunc) + + Start() + WaitStarted() error + Stop() + WaitStopped() error + Error() error +} + +// NewPublisher creates a new client for publishing messages. +func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error) { + if err := ValidateRegion(region); err != nil { + return nil, err + } + if err := validatePublishSettings(settings); err != nil { + return nil, err + } + pubClient, err := newPublisherClient(ctx, region, opts...) + if err != nil { + return nil, err + } + adminClient, err := NewAdminClient(ctx, region, opts...) 
+ if err != nil { + return nil, err + } + + msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano()))) + pubFactory := &singlePartitionPublisherFactory{ + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topicPath, + } + return newRoutingPublisher(adminClient, msgRouterFactory, pubFactory), nil +} diff --git a/pubsublite/internal/wire/publisher_test.go b/pubsublite/internal/wire/publisher_test.go index 7c5e2450fb8..cde0b1ffed8 100644 --- a/pubsublite/internal/wire/publisher_test.go +++ b/pubsublite/internal/wire/publisher_test.go @@ -16,6 +16,7 @@ package wire import ( "bytes" "context" + "math/rand" "testing" "time" @@ -33,6 +34,8 @@ func testPublishSettings() PublishSettings { // Send messages with minimal delay to speed up tests. settings.DelayThreshold = time.Millisecond settings.Timeout = 5 * time.Second + // Disable topic partition count background polling. + settings.ConfigPollPeriod = 0 return settings } @@ -445,3 +448,588 @@ func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) { t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr) } } + +func TestSinglePartitionPublisherPublishWhileStarting(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + msg := &pb.PubSubMessage{Data: []byte{'1'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + stream.Push(msgPubReq(msg), msgPubResp(42), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + + // Did not wait for publisher to finish startup. But it should send msg once + // the Publish stream connects. + result := pub.Publish(msg) + result.ValidateResult(topic.Partition, 42) + + pub.StopVerifyNoError() +} + +func TestSinglePartitionPublisherPublishWhileStartingFails(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + msg := &pb.PubSubMessage{Data: []byte{'1'}} + serverErr := status.Error(codes.FailedPrecondition, "failed") + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + barrier := stream.PushWithBarrier(initPubReq(topic), nil, serverErr) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + + // Published during startup. + result := pub.Publish(msg) + // Send the initial response (with error) to complete startup. + barrier.Release() + + result.ValidateError(serverErr) + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, serverErr) + } +} + +// testRoutingPublisher wraps a routingPublisher for testing. +type testRoutingPublisher struct { + t *testing.T + pub *routingPublisher +} + +func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher { + ctx := context.Background() + pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...) 
+ if err != nil { + t.Fatal(err) + } + + source := &test.FakeSource{Ret: fakeSourceVal} + msgRouterFactory := newMessageRouterFactory(rand.New(source)) + pubFactory := &singlePartitionPublisherFactory{ + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topicPath, + } + pub := newRoutingPublisher(adminClient, msgRouterFactory, pubFactory) + pub.Start() + return &testRoutingPublisher{t: t, pub: pub} +} + +func (tp *testRoutingPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { + result := newTestPublishResultReceiver(tp.t, msg) + tp.pub.Publish(msg, result.set) + return result +} + +func (tp *testRoutingPublisher) NumPartitionPublishers() int { + tp.pub.mu.Lock() + defer tp.pub.mu.Unlock() + return len(tp.pub.publishers) +} + +func (tp *testRoutingPublisher) Start() { tp.pub.Start() } +func (tp *testRoutingPublisher) Stop() { tp.pub.Stop() } +func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() } +func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() } + +func TestRoutingPublisherStartOnce(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 2 + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + t.Run("First succeeds", func(t *testing.T) { + // Note: newTestRoutingPublisher() called Start. + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := pub.NumPartitionPublishers(), numPartitions; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) + t.Run("Second no-op", func(t *testing.T) { + // An error is not returned, but no new streams are opened. The mock server + // does not expect more RPCs. + pub.Start() + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + }) + + pub.Stop() + if gotErr := pub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + +func TestRoutingPublisherStartStop(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 2 + + verifiers := test.NewVerifiers(t) + barrier := verifiers.GlobalVerifier.PushWithBarrier(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub.Stop() + barrier.Release() + + if gotErr := pub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } + // No publishers should be created. + if got, want := pub.NumPartitionPublishers(), 0; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } +} + +func TestRoutingPublisherRoundRobin(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 3 + + // Messages have no ordering key, so the roundRobinMsgRouter is used. 
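+	// The four messages below should be published round robin to partitions
+	// 1, 2, 0 and 1 (the fake source below selects partition 1 first).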
+ msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + msg4 := &pb.PubSubMessage{Data: []byte{'4'}} + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + // Partition 0 + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + stream0.Push(msgPubReq(msg3), msgPubResp(34), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + // Partition 1 + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream1.Push(msgPubReq(msg1), msgPubResp(41), nil) + stream1.Push(msgPubReq(msg4), msgPubResp(42), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + // Partition 2 + stream2 := test.NewRPCVerifier(t) + stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) + stream2.Push(msgPubReq(msg2), msgPubResp(78), nil) + verifiers.AddPublishStream(topic, 2, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + // Note: The fake source is initialized with value=1, so Partition=1 publisher + // will be the first chosen by the roundRobinMsgRouter. + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1) + if err := pub.WaitStarted(); err != nil { + t.Errorf("Start() got err: (%v)", err) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + result4 := pub.Publish(msg4) + pub.Stop() + + result1.ValidateResult(1, 41) + result2.ValidateResult(2, 78) + result3.ValidateResult(0, 34) + result4.ValidateResult(1, 42) + + if err := pub.WaitStopped(); err != nil { + t.Errorf("Stop() got err: (%v)", err) + } +} + +func TestRoutingPublisherHashing(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 3 + + key0 := []byte("bar") // hashes to partition 0 + key1 := []byte("baz") // hashes to partition 1 + key2 := []byte("foo") // hashes to partition 2 + + // Messages have ordering key, so the hashingMsgRouter is used. 
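+	// Messages with the same ordering key must be routed to the same partition.
+	// Per the key comments above: "foo" -> partition 2, "bar" -> partition 0,
+	// "baz" -> partition 1.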
+ msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2} + msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1} + msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0} + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + // Partition 0 + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + stream0.Push(msgPubReq(msg2), msgPubResp(20), nil) + stream0.Push(msgPubReq(msg5), msgPubResp(21), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + // Partition 1 + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream1.Push(msgPubReq(msg4), msgPubResp(40), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + // Partition 2 + stream2 := test.NewRPCVerifier(t) + stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) + stream2.Push(msgPubReq(msg1), msgPubResp(10), nil) + stream2.Push(msgPubReq(msg3), msgPubResp(11), nil) + verifiers.AddPublishStream(topic, 2, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + if err := pub.WaitStarted(); err != nil { + t.Errorf("Start() got err: (%v)", err) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + result4 := pub.Publish(msg4) + result5 := pub.Publish(msg5) + + result1.ValidateResult(2, 10) + result2.ValidateResult(0, 20) + result3.ValidateResult(2, 11) + result4.ValidateResult(1, 40) + result5.ValidateResult(0, 21) + + pub.Stop() + if err := pub.WaitStopped(); err != nil { + t.Errorf("Stop() got err: (%v)", err) + } +} + +func TestRoutingPublisherPermanentError(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 2 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + serverErr := status.Error(codes.FailedPrecondition, "failed") + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + // Partition 0 + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + stream0.Push(msgPubReq(msg1), msgPubResp(34), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + // Partition 1. Fails due to permanent error, which will also shut down + // partition-0 publisher, but it should be allowed to flush its pending + // messages. 
+ stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream1.Push(msgPubReq(msg2), nil, serverErr) + verifiers.AddPublishStream(topic, 1, stream1) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + if err := pub.WaitStarted(); err != nil { + t.Errorf("Start() got err: (%v)", err) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + + result1.ValidateResult(0, 34) + result2.ValidateError(serverErr) + + if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { + t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) + } +} + +func TestRoutingPublisherPublishAfterStop(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + numPartitions := 2 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + + // Partition 0 + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + // Partition 1 + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + if err := pub.WaitStarted(); err != nil { + t.Errorf("Start() got err: (%v)", err) + } + + pub.Stop() + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + + result1.ValidateError(ErrServiceStopped) + result2.ValidateError(ErrServiceStopped) + + if err := pub.WaitStopped(); err != nil { + t.Errorf("Stop() got err: (%v)", err) + } +} + +func TestRoutingPublisherPartitionCountFail(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + wantErr := status.Error(codes.NotFound, "no exist") + + // Retrieving the number of partitions results in an error. Startup cannot + // proceed. + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, wantErr) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantErr.Error()) { + t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr) + } + if got, want := pub.NumPartitionPublishers(), 0; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + + // Verify that the publisher does not attempt to restart. The mock server does + // not expect more RPCs. + pub.Start() +} + +func TestRoutingPublisherPartitionCountInvalid(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + + // The number of partitions returned by the server must be valid, otherwise + // startup cannot proceed. 
+ verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + wantMsg := "topic has invalid number of partitions" + if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantMsg) { + t.Errorf("Start() got err: (%v), want msg: %q", gotErr, wantMsg) + } + if got, want := pub.NumPartitionPublishers(), 0; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } +} + +func TestRoutingPublisherPartitionCountIncreases(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + initialPartitionCount := 1 + updatedPartitionCount := 3 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil) + + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + stream0.Push(msgPubReq(msg1), msgPubResp(11), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream1.Push(msgPubReq(msg2), msgPubResp(22), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + stream2 := test.NewRPCVerifier(t) + stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) + stream2.Push(msgPubReq(msg3), msgPubResp(33), nil) + verifiers.AddPublishStream(topic, 2, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + t.Run("Initial count", func(t *testing.T) { + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) + t.Run("Updated count", func(t *testing.T) { + pub.pub.partitionWatcher.updatePartitionCount() + if got, want := pub.NumPartitionPublishers(), updatedPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) + t.Run("Publish", func(t *testing.T) { + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + + result1.ValidateResult(0, 11) + result2.ValidateResult(1, 22) + result3.ValidateResult(2, 33) + }) + + pub.Stop() + if gotErr := pub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + +func TestRoutingPublisherPartitionCountDecreases(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + initialPartitionCount := 2 + updatedPartitionCount := 1 + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil) + + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + stream1 := test.NewRPCVerifier(t) + 
stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + t.Run("Initial count", func(t *testing.T) { + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) + t.Run("Updated count", func(t *testing.T) { + pub.pub.partitionWatcher.updatePartitionCount() + + // Decreasing count unsupported. Terminates the routingPublisher. + if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, errDecreasingPartitions) { + t.Errorf("Final error got: (%v), want: (%v)", gotErr, errDecreasingPartitions) + } + if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) +} + +func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + initialPartitionCount := 2 + serverErr := status.Error(codes.NotFound, "deleted") + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) + verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr) + + stream0 := test.NewRPCVerifier(t) + stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 0, stream0) + + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + verifiers.AddPublishStream(topic, 1, stream1) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + + t.Run("Initial count", func(t *testing.T) { + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) + t.Run("Failed update", func(t *testing.T) { + pub.pub.partitionWatcher.updatePartitionCount() + + // Failed background update terminates the routingPublisher. 
+ if gotErr := pub.WaitStopped(); !test.ErrorHasMsg(gotErr, serverErr.Error()) { + t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) + } + if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { + t.Errorf("Num partition publishers: got %d, want %d", got, want) + } + }) +} + +func TestNewPublisherCreatesImpl(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + const region = "us-central1" + + if pub, err := NewPublisher(context.Background(), DefaultPublishSettings, region, topic); err != nil { + t.Errorf("NewPublisher() got error: %v", err) + } else if _, ok := pub.(*routingPublisher); !ok { + t.Error("NewPublisher() did not return a routingPublisher") + } +} + +func TestNewPublisherValidatesSettings(t *testing.T) { + const topic = "projects/123456/locations/us-central1-b/topics/my-topic" + const region = "us-central1" + + settings := DefaultPublishSettings + settings.DelayThreshold = 0 + if _, err := NewPublisher(context.Background(), settings, region, topic); err == nil { + t.Error("NewPublisher() did not return error") + } +} diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 2dad85cb537..17ad9d05bdc 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -106,6 +106,27 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { return isEligible(s.Code()) } +// retryableReadOnlyCallOption returns a call option that retries with backoff +// for ResourceExhausted in addition to other default retryable codes for +// Pub/Sub. Suitable for read-only operations which are subject to only QPS +// quota limits. +func retryableReadOnlyCallOption() gax.CallOption { + return gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Aborted, + codes.DeadlineExceeded, + codes.Internal, + codes.ResourceExhausted, + codes.Unavailable, + codes.Unknown, + }, gax.Backoff{ + Initial: 100 * time.Millisecond, + Max: 60 * time.Second, + Multiplier: 1.3, + }) + }) +} + const ( pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" routingMetadataHeader = "x-goog-request-params" diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index 3486729db6d..e2cc5699746 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -66,6 +66,10 @@ type PublishSettings struct { // throughput capacity can cause the buffers to overflow. For more // information, see https://cloud.google.com/pubsub/lite/docs/topics. BufferedByteLimit int + + // The polling interval to watch for topic partition count updates. Set to 0 + // to disable polling if the number of partitions will never update. + ConfigPollPeriod time.Duration } // DefaultPublishSettings holds the default values for PublishSettings. @@ -77,6 +81,7 @@ var DefaultPublishSettings = PublishSettings{ // By default set to a high limit that is not likely to occur, but prevents // OOM errors in clients. BufferedByteLimit: 1 << 30, // 1 GiB + ConfigPollPeriod: 10 * time.Minute, } func validatePublishSettings(settings PublishSettings) error {
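
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new Publisher added in publisher.go could be driven end to end, including the new ConfigPollPeriod setting. The wire package is internal to the pubsublite module, so external code cannot import it directly; the project/topic names are placeholders, and the *wire.MessageMetadata type (with Partition/Offset fields) used for the result callback is an assumption for this sketch rather than something defined in this patch.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsublite/internal/wire"
	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func main() {
	ctx := context.Background()

	// Placeholder names; the topic path format matches the tests above.
	const region = "us-central1"
	const topic = "projects/my-project/locations/us-central1-b/topics/my-topic"

	settings := wire.DefaultPublishSettings
	// ConfigPollPeriod controls how often the publisher polls for topic
	// partition count updates. Setting it to 0 disables polling if the
	// partition count will never change; the initial count is still fetched
	// during startup.
	settings.ConfigPollPeriod = 0

	pub, err := wire.NewPublisher(ctx, settings, region, topic)
	if err != nil {
		log.Fatal(err)
	}
	pub.Start()
	if err := pub.WaitStarted(); err != nil {
		log.Fatal(err)
	}

	// Publish is asynchronous; the result is delivered to the callback. The
	// *wire.MessageMetadata parameter type is assumed here; see the wire
	// package for the actual callback signature.
	pub.Publish(&pb.PubSubMessage{Data: []byte("hello")}, func(metadata *wire.MessageMetadata, err error) {
		if err != nil {
			log.Printf("publish error: %v", err)
			return
		}
		fmt.Printf("published to partition %d at offset %d\n", metadata.Partition, metadata.Offset)
	})

	// Stop flushes outstanding messages before terminating the underlying
	// partition publishers.
	pub.Stop()
	if err := pub.WaitStopped(); err != nil {
		log.Print(err)
	}
}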