Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): routing publisher implementation (#3277)
routingPublisher routes messages to multiple singlePartitionPublishers and handles the topic partition count increasing.
partitionCountWatcher polls for updates to the number of topic partitions.
  • Loading branch information
tmdiep committed Dec 2, 2020
1 parent c314f58 commit 88e5466
Show file tree
Hide file tree
Showing 7 changed files with 1,017 additions and 3 deletions.
141 changes: 141 additions & 0 deletions pubsublite/internal/wire/partition_count.go
@@ -0,0 +1,141 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain p copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"fmt"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// partitionCountReceiver receives updated partition counts. Calls are
// non-overlapping.
type partitionCountReceiver func(partitionCount int)

// partitionCountWatcher periodically retrieves the number of partitions for a
// topic and notifies a receiver if it changes.
type partitionCountWatcher struct {
// Immutable after creation.
ctx context.Context
adminClient *vkit.AdminClient
topicPath string
receiver partitionCountReceiver
callOption gax.CallOption

// Fields below must be guarded with mu.
partitionCount int
pollUpdate *periodicTask

abstractService
}

func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient,
settings PublishSettings, topicPath string, receiver partitionCountReceiver) *partitionCountWatcher {

p := &partitionCountWatcher{
ctx: ctx,
adminClient: adminClient,
topicPath: topicPath,
receiver: receiver,
callOption: retryableReadOnlyCallOption(),
}

// Polling the topic partition count can be disabled in settings if the period
// is <= 0.
backgroundTask := p.updatePartitionCount
if settings.ConfigPollPeriod <= 0 {
backgroundTask = func() {}
}
p.pollUpdate = newPeriodicTask(settings.ConfigPollPeriod, backgroundTask)
return p
}

// Start retrieves the first topic partition count asynchronously.
func (p *partitionCountWatcher) Start() {
p.mu.Lock()
defer p.mu.Unlock()

if p.unsafeUpdateStatus(serviceStarting, nil) {
go p.updatePartitionCount()
}
}

// Stop background polling for partition count updates.
func (p *partitionCountWatcher) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
p.unsafeInitiateShutdown(nil)
}

// updatePartitionCount is called in a goroutine.
func (p *partitionCountWatcher) updatePartitionCount() {
p.mu.Lock()
prevPartitionCount := p.partitionCount
p.mu.Unlock()

newPartitionCount, err := func() (int, error) {
req := &pb.GetTopicPartitionsRequest{Name: p.topicPath}
resp, err := p.adminClient.GetTopicPartitions(p.ctx, req, p.callOption)

p.mu.Lock()
defer p.mu.Unlock()

if p.status >= serviceTerminating {
// Returning the current partition count here ensures that the receiver
// func will not be invoked below.
return p.partitionCount, nil
}
if err != nil {
err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
p.unsafeInitiateShutdown(err)
return 0, err
}
if resp.GetPartitionCount() <= 0 {
err := fmt.Errorf("pubsublite: topic has invalid number of partitions %d", resp.GetPartitionCount())
p.unsafeInitiateShutdown(err)
return 0, err
}

p.partitionCount = int(resp.GetPartitionCount())
return p.partitionCount, nil
}()

if err == nil && prevPartitionCount != newPartitionCount {
p.receiver(newPartitionCount)

if prevPartitionCount == 0 {
p.onStartupComplete()
}
}
}

func (p *partitionCountWatcher) onStartupComplete() {
p.mu.Lock()
defer p.mu.Unlock()

// Set the watcher as active and start background polling updates after the
// initial partition count has been processed.
if p.unsafeUpdateStatus(serviceActive, nil) {
p.pollUpdate.Start()
}
}

func (p *partitionCountWatcher) unsafeInitiateShutdown(err error) {
if p.unsafeUpdateStatus(serviceTerminated, err) {
p.pollUpdate.Stop()
}
}
124 changes: 124 additions & 0 deletions pubsublite/internal/wire/partition_count_test.go
@@ -0,0 +1,124 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"testing"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type testPartitionCountWatcher struct {
t *testing.T
watcher *partitionCountWatcher
gotPartitionCounts []int

serviceTestProxy
}

func (tw *testPartitionCountWatcher) onCountChanged(partitionCount int) {
tw.gotPartitionCounts = append(tw.gotPartitionCounts, partitionCount)
}

func (tw *testPartitionCountWatcher) VerifyCounts(want []int) {
if !testutil.Equal(tw.gotPartitionCounts, want) {
tw.t.Errorf("partition counts: got %v, want %v", tw.gotPartitionCounts, want)
}
}

func (tw *testPartitionCountWatcher) UpdatePartitionCount() {
tw.watcher.updatePartitionCount()
}

func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher {
ctx := context.Background()
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
if err != nil {
t.Fatal(err)
}
tw := &testPartitionCountWatcher{
t: t,
}
tw.watcher = newPartitionCountWatcher(ctx, adminClient, testPublishSettings(), topicPath, tw.onCountChanged)
tw.initAndStart(t, tw.watcher, "PartitionCountWatcher")
return tw
}

func TestPartitionCountWatcherRetries(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
wantPartitionCount := 2

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.Unavailable, "retryable"))
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.ResourceExhausted, "retryable"))
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr := watcher.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
watcher.VerifyCounts([]int{wantPartitionCount})
watcher.StopVerifyNoError()
}

func TestPartitionCountWatcherZeroPartitionCountFails(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr, wantMsg := watcher.StartError(), "invalid number of partitions 0"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("Start() got err: (%v), want msg: (%q)", gotErr, wantMsg)
}
watcher.VerifyCounts(nil)
}

func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
wantPartitionCount1 := 4
wantPartitionCount2 := 6

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr := watcher.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
watcher.VerifyCounts([]int{wantPartitionCount1}) // Initial count

// Simulate 3 background updates.
watcher.UpdatePartitionCount()
watcher.UpdatePartitionCount()
watcher.UpdatePartitionCount()
watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2})
watcher.StopVerifyNoError()
}
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/periodic_task.go
Expand Up @@ -35,7 +35,7 @@ func newPeriodicTask(period time.Duration, task func()) *periodicTask {
// Start the polling goroutine. No-op if the goroutine is already running.
// The task is executed after the polling period.
func (pt *periodicTask) Start() {
if pt.ticker != nil {
if pt.ticker != nil || pt.period <= 0 {
return
}

Expand Down

0 comments on commit 88e5466

Please sign in to comment.