Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): assigning subscriber implementation #3238

Merged
merged 8 commits into from Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -6,6 +6,7 @@ require (
cloud.google.com/go v0.72.0
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.3
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/tools v0.0.0-20201201064407-fd09bd90d85c // indirect
google.golang.org/api v0.35.0
Expand Down
1 change: 1 addition & 0 deletions pubsublite/go.sum
Expand Up @@ -102,6 +102,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
Expand Down
169 changes: 169 additions & 0 deletions pubsublite/internal/wire/assigner.go
@@ -0,0 +1,169 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"fmt"
"reflect"

"github.com/google/uuid"
"google.golang.org/grpc"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// partitionSet is a set of partition numbers.
type partitionSet map[int]struct{}

func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet {
var void struct{}
partitions := make(map[int]struct{})
for _, p := range assignmentpb.GetPartitions() {
partitions[int(p)] = void
}
return partitionSet(partitions)
}

func (ps partitionSet) Ints() (partitions []int) {
for p := range ps {
partitions = append(partitions, p)
}
return
}

func (ps partitionSet) Contains(partition int) bool {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
_, exists := ps[partition]
return exists
}

// A function that generates a 16-byte UUID.
type generateUUIDFunc func() (uuid.UUID, error)
tmdiep marked this conversation as resolved.
Show resolved Hide resolved

// partitionAssignmentReceiver must enact the received partition assignment from
// the server, or otherwise return an error, which will break the stream. The
// receiver must not call the assigner, as this would result in a deadlock.
type partitionAssignmentReceiver func(partitionSet) error

// assigner wraps the partition assignment stream and notifies a receiver when
// the server sends a new set of partition assignments for a subscriber.
type assigner struct {
// Immutable after creation.
assignmentClient *vkit.PartitionAssignmentClient
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver

// Fields below must be guarded with mu.
stream *retryableStream

abstractService
}

func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) {
clientID, err := genUUID()
if err != nil {
return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err)
}

a := &assigner{
assignmentClient: assignmentClient,
initialReq: &pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Initial{
Initial: &pb.InitialPartitionAssignmentRequest{
Subscription: subscriptionPath,
ClientId: clientID[:],
},
},
},
receiveAssignment: receiver,
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
return a, nil
}

func (a *assigner) Start() {
a.mu.Lock()
defer a.mu.Unlock()
if a.unsafeUpdateStatus(serviceStarting, nil) {
a.stream.Start()
}
}

func (a *assigner) Stop() {
a.mu.Lock()
defer a.mu.Unlock()
a.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
return a.assignmentClient.AssignPartitions(ctx)
}

func (a *assigner) initialRequest() (interface{}, bool) {
return a.initialReq, false // No initial response expected
}

func (a *assigner) validateInitialResponse(_ interface{}) error {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
// Should not be called.
return errors.New("pubsublite: unexpected initial response")
}

func (a *assigner) onStreamStatusChange(status streamStatus) {
a.mu.Lock()
defer a.mu.Unlock()

switch status {
case streamConnected:
a.unsafeUpdateStatus(serviceActive, nil)
case streamTerminated:
a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error())
}
}

func (a *assigner) onResponse(response interface{}) {
a.mu.Lock()
defer a.mu.Unlock()

if a.status >= serviceTerminating {
return
}

assignment, _ := response.(*pb.PartitionAssignment)
if err := a.handleAssignment(assignment); err != nil {
a.unsafeInitiateShutdown(serviceTerminated, err)
}
}

func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
return err
}

a.stream.Send(&pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Ack{
Ack: &pb.PartitionAssignmentAck{},
},
})
return nil
}

func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !a.unsafeUpdateStatus(targetStatus, err) {
return
}
// No data to send. Immediately terminate the stream.
a.stream.Stop()
}
200 changes: 200 additions & 0 deletions pubsublite/internal/wire/assigner_test.go
@@ -0,0 +1,200 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"sort"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func TestPartitionSet(t *testing.T) {
partitions := newPartitionSet(&pb.PartitionAssignment{
Partitions: []int64{8, 5, 8, 1},
})

wantPartitions := []int{1, 5, 8}
for _, partition := range wantPartitions {
if !partitions.Contains(partition) {
t.Errorf("Contains(%d) got false, want true", partition)
}
}
for _, partition := range []int{2, 3, 4, 6, 7} {
if partitions.Contains(partition) {
t.Errorf("Contains(%d) got true, want false", partition)
}
}

gotPartitions := partitions.Ints()
sort.Ints(gotPartitions)
if !testutil.Equal(gotPartitions, wantPartitions) {
t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
}
}

var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'}

func fakeGenerateUUID() (uuid.UUID, error) {
return fakeUUID, nil
}

// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
RetError error

t *testing.T
asn *assigner
partitions chan []int

serviceTestProxy
}

func newTestAssigner(t *testing.T, subscription string) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...)
if err != nil {
t.Fatal(err)
}

ta := &testAssigner{
t: t,
partitions: make(chan []int, 1),
}
asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment)
if err != nil {
t.Fatal(err)
}
ta.asn = asn
ta.initAndStart(t, ta.asn, "Assigner")
return ta
}

func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
p := partitions.Ints()
sort.Ints(p)
ta.partitions <- p

if ta.RetError != nil {
return ta.RetError
}
return nil
}

func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout)
return nil
case p := <-ta.partitions:
return p
}
}

func TestAssignerNoInitialResponse(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil)
verifiers.AddAssignmentStream(subscription, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)

// Assigner starts even though no initial response was received from the
// server.
if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
// To ensure test is deterministic, i.e. server must receive initial request
// before stopping the client.
barrier.Release()
asn.StopVerifyNoError()
}

func TestAssignerReconnect(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
permanentErr := status.Error(codes.FailedPrecondition, "failed")

verifiers := test.NewVerifiers(t)

// Simulate a transient error that results in a reconnect.
stream1 := test.NewRPCVerifier(t)
stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable"))
verifiers.AddAssignmentStream(subscription, stream1)

// Send 2 partition assignments before terminating with permanent error.
stream2 := test.NewRPCVerifier(t)
stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil)
stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil)
stream2.Push(assignmentAckReq(), nil, permanentErr)
verifiers.AddAssignmentStream(subscription, stream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #1: got %v, want %v", got, want)
}
if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #2: got %v, want %v", got, want)
}
if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}

func TestAssignerHandlePartitionFailure(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
verifiers.AddAssignmentStream(subscription, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)
// Simulates the assigningSubscriber discarding assignments.
asn.RetError = errors.New("subscriber shutting down")

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
t.Errorf("Partition assignments: got %v, want %v", got, want)
}
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError)
}
}