Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): routing publisher implementation #3277

Merged
merged 8 commits into from Dec 2, 2020
138 changes: 138 additions & 0 deletions pubsublite/internal/wire/partition_count.go
@@ -0,0 +1,138 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain p copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"fmt"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// partitionCountReceiver receives updated partition counts. Calls are
// non-overlapping.
type partitionCountReceiver func(partitionCount int)

// partitionCountWatcher periodically retrieves the number of partitions for a
// topic and notifies a receiver if it changes.
type partitionCountWatcher struct {
// Immutable after creation.
ctx context.Context
adminClient *vkit.AdminClient
topicPath string
receiver partitionCountReceiver
callOption gax.CallOption

// Fields below must be guarded with mu.
partitionCount int
pollUpdate *periodicTask

abstractService
}

func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient,
settings PublishSettings, topicPath string, receiver partitionCountReceiver) *partitionCountWatcher {

p := &partitionCountWatcher{
ctx: ctx,
adminClient: adminClient,
topicPath: topicPath,
receiver: receiver,
callOption: retryableReadOnlyCallOption(),
}

// Polling the topic partition count can be disabled in settings if the period
// is <= 0.
backgroundTask := p.updatePartitionCount
if settings.ConfigPollPeriod <= 0 {
backgroundTask = func() {}
}
p.pollUpdate = newPeriodicTask(settings.ConfigPollPeriod, backgroundTask)
return p
}

// Start retrieves the first topic partition count asynchronously.
func (p *partitionCountWatcher) Start() {
p.mu.Lock()
defer p.mu.Unlock()

if p.unsafeUpdateStatus(serviceStarting, nil) {
go p.updatePartitionCount()
}
}

// Stop background polling for partition count updates.
func (p *partitionCountWatcher) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
p.unsafeInitiateShutdown(serviceTerminated, nil)
}

// updatePartitionCount is called in a goroutine.
func (p *partitionCountWatcher) updatePartitionCount() {
p.mu.Lock()
prevPartitionCount := p.partitionCount
p.mu.Unlock()

newPartitionCount, err := func() (int, error) {
req := &pb.GetTopicPartitionsRequest{Name: p.topicPath}
resp, err := p.adminClient.GetTopicPartitions(p.ctx, req, p.callOption)

p.mu.Lock()
defer p.mu.Unlock()

if p.status >= serviceTerminating {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
return p.partitionCount, nil
}
if err != nil {
err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
p.unsafeInitiateShutdown(serviceTerminated, err)
return 0, err
}
if resp.GetPartitionCount() <= 0 {
err := fmt.Errorf("pubsublite: topic has invalid number of partitions %d", resp.GetPartitionCount())
p.unsafeInitiateShutdown(serviceTerminated, err)
return 0, err
}

p.partitionCount = int(resp.GetPartitionCount())
return p.partitionCount, nil
}()

if err == nil && prevPartitionCount != newPartitionCount {
p.receiver(newPartitionCount)

if prevPartitionCount == 0 {
p.onStartupComplete()
}
}
}

func (p *partitionCountWatcher) onStartupComplete() {
p.mu.Lock()
defer p.mu.Unlock()

// Notify the service as active and start background polling updates after the
// initial partition count has been processed.
if p.unsafeUpdateStatus(serviceActive, nil) {
p.pollUpdate.Start()
}
}

func (p *partitionCountWatcher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
p.unsafeUpdateStatus(targetStatus, err)
p.pollUpdate.Stop()
}
124 changes: 124 additions & 0 deletions pubsublite/internal/wire/partition_count_test.go
@@ -0,0 +1,124 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"testing"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type testPartitionCountWatcher struct {
t *testing.T
watcher *partitionCountWatcher
gotPartitionCounts []int

serviceTestProxy
}

func (tw *testPartitionCountWatcher) onCountChanged(partitionCount int) {
tw.gotPartitionCounts = append(tw.gotPartitionCounts, partitionCount)
}

func (tw *testPartitionCountWatcher) VerifyCounts(want []int) {
if !testutil.Equal(tw.gotPartitionCounts, want) {
tw.t.Errorf("partition counts: got %v, want %v", tw.gotPartitionCounts, want)
}
}

func (tw *testPartitionCountWatcher) UpdatePartitionCount() {
tw.watcher.updatePartitionCount()
}

func newTestPartitionCountWatcher(t *testing.T, topicPath string, settings PublishSettings) *testPartitionCountWatcher {
ctx := context.Background()
adminClient, err := NewAdminClient(ctx, "ignored", testClientOpts...)
if err != nil {
t.Fatal(err)
}
tw := &testPartitionCountWatcher{
t: t,
}
tw.watcher = newPartitionCountWatcher(ctx, adminClient, testPublishSettings(), topicPath, tw.onCountChanged)
tw.initAndStart(t, tw.watcher, "PartitionCountWatcher")
return tw
}

func TestPartitionCountWatcherRetries(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
wantPartitionCount := 2

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.Unavailable, "retryable"))
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.ResourceExhausted, "retryable"))
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr := watcher.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
watcher.VerifyCounts([]int{wantPartitionCount})
watcher.StopVerifyNoError()
}

func TestPartitionCountWatcherZeroPartitionCountFails(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr, wantMsg := watcher.StartError(), "invalid number of partitions 0"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("Start() got err: (%v), want msg: (%q)", gotErr, wantMsg)
}
watcher.VerifyCounts(nil)
}

func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
wantPartitionCount1 := 4
wantPartitionCount2 := 6

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount1), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount2), nil)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr := watcher.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
watcher.VerifyCounts([]int{wantPartitionCount1}) // Initial count

// Simulate 3 background updates.
watcher.UpdatePartitionCount()
watcher.UpdatePartitionCount()
watcher.UpdatePartitionCount()
watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2})
watcher.StopVerifyNoError()
}
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/periodic_task.go
Expand Up @@ -35,7 +35,7 @@ func newPeriodicTask(period time.Duration, task func()) *periodicTask {
// Start the polling goroutine. No-op if the goroutine is already running.
// The task is executed after the polling period.
func (pt *periodicTask) Start() {
if pt.ticker != nil {
if pt.ticker != nil || pt.period <= 0 {
return
}

Expand Down