Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(pubsublite): assigning subscriber implementation (#3238)
assigningSubscriber uses the Pub/Sub Lite partition assignment service to listen to its assigned partition numbers and dynamically add/remove singlePartitionSubscribers.
- Loading branch information
Showing
6 changed files
with
715 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
// Copyright 2020 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
|
||
package wire | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"reflect" | ||
|
||
"github.com/google/uuid" | ||
"google.golang.org/grpc" | ||
|
||
vkit "cloud.google.com/go/pubsublite/apiv1" | ||
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" | ||
) | ||
|
||
// partitionSet is a set of partition numbers. | ||
type partitionSet map[int]struct{} | ||
|
||
func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet { | ||
var void struct{} | ||
partitions := make(map[int]struct{}) | ||
for _, p := range assignmentpb.GetPartitions() { | ||
partitions[int(p)] = void | ||
} | ||
return partitionSet(partitions) | ||
} | ||
|
||
func (ps partitionSet) Ints() (partitions []int) { | ||
for p := range ps { | ||
partitions = append(partitions, p) | ||
} | ||
return | ||
} | ||
|
||
func (ps partitionSet) Contains(partition int) bool { | ||
_, exists := ps[partition] | ||
return exists | ||
} | ||
|
||
// A function that generates a 16-byte UUID. | ||
type generateUUIDFunc func() (uuid.UUID, error) | ||
|
||
// partitionAssignmentReceiver must enact the received partition assignment from | ||
// the server, or otherwise return an error, which will break the stream. The | ||
// receiver must not call the assigner, as this would result in a deadlock. | ||
type partitionAssignmentReceiver func(partitionSet) error | ||
|
||
// assigner wraps the partition assignment stream and notifies a receiver when | ||
// the server sends a new set of partition assignments for a subscriber. | ||
type assigner struct { | ||
// Immutable after creation. | ||
assignmentClient *vkit.PartitionAssignmentClient | ||
initialReq *pb.PartitionAssignmentRequest | ||
receiveAssignment partitionAssignmentReceiver | ||
|
||
// Fields below must be guarded with mu. | ||
stream *retryableStream | ||
|
||
abstractService | ||
} | ||
|
||
func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) { | ||
clientID, err := genUUID() | ||
if err != nil { | ||
return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err) | ||
} | ||
|
||
a := &assigner{ | ||
assignmentClient: assignmentClient, | ||
initialReq: &pb.PartitionAssignmentRequest{ | ||
Request: &pb.PartitionAssignmentRequest_Initial{ | ||
Initial: &pb.InitialPartitionAssignmentRequest{ | ||
Subscription: subscriptionPath, | ||
ClientId: clientID[:], | ||
}, | ||
}, | ||
}, | ||
receiveAssignment: receiver, | ||
} | ||
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) | ||
return a, nil | ||
} | ||
|
||
func (a *assigner) Start() { | ||
a.mu.Lock() | ||
defer a.mu.Unlock() | ||
if a.unsafeUpdateStatus(serviceStarting, nil) { | ||
a.stream.Start() | ||
} | ||
} | ||
|
||
func (a *assigner) Stop() { | ||
a.mu.Lock() | ||
defer a.mu.Unlock() | ||
a.unsafeInitiateShutdown(serviceTerminating, nil) | ||
} | ||
|
||
func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { | ||
return a.assignmentClient.AssignPartitions(ctx) | ||
} | ||
|
||
func (a *assigner) initialRequest() (interface{}, bool) { | ||
return a.initialReq, false // No initial response expected | ||
} | ||
|
||
func (a *assigner) validateInitialResponse(_ interface{}) error { | ||
// Should not be called. | ||
return errors.New("pubsublite: unexpected initial response") | ||
} | ||
|
||
func (a *assigner) onStreamStatusChange(status streamStatus) { | ||
a.mu.Lock() | ||
defer a.mu.Unlock() | ||
|
||
switch status { | ||
case streamConnected: | ||
a.unsafeUpdateStatus(serviceActive, nil) | ||
case streamTerminated: | ||
a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error()) | ||
} | ||
} | ||
|
||
func (a *assigner) onResponse(response interface{}) { | ||
a.mu.Lock() | ||
defer a.mu.Unlock() | ||
|
||
if a.status >= serviceTerminating { | ||
return | ||
} | ||
|
||
assignment, _ := response.(*pb.PartitionAssignment) | ||
if err := a.handleAssignment(assignment); err != nil { | ||
a.unsafeInitiateShutdown(serviceTerminated, err) | ||
} | ||
} | ||
|
||
func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error { | ||
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil { | ||
return err | ||
} | ||
|
||
a.stream.Send(&pb.PartitionAssignmentRequest{ | ||
Request: &pb.PartitionAssignmentRequest_Ack{ | ||
Ack: &pb.PartitionAssignmentAck{}, | ||
}, | ||
}) | ||
return nil | ||
} | ||
|
||
func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { | ||
if !a.unsafeUpdateStatus(targetStatus, err) { | ||
return | ||
} | ||
// No data to send. Immediately terminate the stream. | ||
a.stream.Stop() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
// Copyright 2020 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
|
||
package wire | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sort" | ||
"testing" | ||
"time" | ||
|
||
"cloud.google.com/go/internal/testutil" | ||
"cloud.google.com/go/pubsublite/internal/test" | ||
"github.com/google/uuid" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" | ||
) | ||
|
||
func TestPartitionSet(t *testing.T) { | ||
partitions := newPartitionSet(&pb.PartitionAssignment{ | ||
Partitions: []int64{8, 5, 8, 1}, | ||
}) | ||
|
||
wantPartitions := []int{1, 5, 8} | ||
for _, partition := range wantPartitions { | ||
if !partitions.Contains(partition) { | ||
t.Errorf("Contains(%d) got false, want true", partition) | ||
} | ||
} | ||
for _, partition := range []int{2, 3, 4, 6, 7} { | ||
if partitions.Contains(partition) { | ||
t.Errorf("Contains(%d) got true, want false", partition) | ||
} | ||
} | ||
|
||
gotPartitions := partitions.Ints() | ||
sort.Ints(gotPartitions) | ||
if !testutil.Equal(gotPartitions, wantPartitions) { | ||
t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions) | ||
} | ||
} | ||
|
||
var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'} | ||
|
||
func fakeGenerateUUID() (uuid.UUID, error) { | ||
return fakeUUID, nil | ||
} | ||
|
||
// testAssigner wraps an assigner for ease of testing. | ||
type testAssigner struct { | ||
// Fake error to simulate receiver unable to handle assignment. | ||
RetError error | ||
|
||
t *testing.T | ||
asn *assigner | ||
partitions chan []int | ||
|
||
serviceTestProxy | ||
} | ||
|
||
func newTestAssigner(t *testing.T, subscription string) *testAssigner { | ||
ctx := context.Background() | ||
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ta := &testAssigner{ | ||
t: t, | ||
partitions: make(chan []int, 1), | ||
} | ||
asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
ta.asn = asn | ||
ta.initAndStart(t, ta.asn, "Assigner") | ||
return ta | ||
} | ||
|
||
func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { | ||
p := partitions.Ints() | ||
sort.Ints(p) | ||
ta.partitions <- p | ||
|
||
if ta.RetError != nil { | ||
return ta.RetError | ||
} | ||
return nil | ||
} | ||
|
||
func (ta *testAssigner) NextPartitions() []int { | ||
select { | ||
case <-time.After(serviceTestWaitTimeout): | ||
ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout) | ||
return nil | ||
case p := <-ta.partitions: | ||
return p | ||
} | ||
} | ||
|
||
func TestAssignerNoInitialResponse(t *testing.T) { | ||
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" | ||
|
||
verifiers := test.NewVerifiers(t) | ||
stream := test.NewRPCVerifier(t) | ||
barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil) | ||
verifiers.AddAssignmentStream(subscription, stream) | ||
|
||
mockServer.OnTestStart(verifiers) | ||
defer mockServer.OnTestEnd() | ||
|
||
asn := newTestAssigner(t, subscription) | ||
|
||
// Assigner starts even though no initial response was received from the | ||
// server. | ||
if gotErr := asn.StartError(); gotErr != nil { | ||
t.Errorf("Start() got err: (%v)", gotErr) | ||
} | ||
// To ensure test is deterministic, i.e. server must receive initial request | ||
// before stopping the client. | ||
barrier.Release() | ||
asn.StopVerifyNoError() | ||
} | ||
|
||
func TestAssignerReconnect(t *testing.T) { | ||
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" | ||
permanentErr := status.Error(codes.FailedPrecondition, "failed") | ||
|
||
verifiers := test.NewVerifiers(t) | ||
|
||
// Simulate a transient error that results in a reconnect. | ||
stream1 := test.NewRPCVerifier(t) | ||
stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable")) | ||
verifiers.AddAssignmentStream(subscription, stream1) | ||
|
||
// Send 2 partition assignments before terminating with permanent error. | ||
stream2 := test.NewRPCVerifier(t) | ||
stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil) | ||
stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil) | ||
stream2.Push(assignmentAckReq(), nil, permanentErr) | ||
verifiers.AddAssignmentStream(subscription, stream2) | ||
|
||
mockServer.OnTestStart(verifiers) | ||
defer mockServer.OnTestEnd() | ||
|
||
asn := newTestAssigner(t, subscription) | ||
|
||
if gotErr := asn.StartError(); gotErr != nil { | ||
t.Errorf("Start() got err: (%v)", gotErr) | ||
} | ||
if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) { | ||
t.Errorf("Partition assignment #1: got %v, want %v", got, want) | ||
} | ||
if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) { | ||
t.Errorf("Partition assignment #2: got %v, want %v", got, want) | ||
} | ||
if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) { | ||
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) | ||
} | ||
} | ||
|
||
func TestAssignerHandlePartitionFailure(t *testing.T) { | ||
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" | ||
|
||
verifiers := test.NewVerifiers(t) | ||
stream := test.NewRPCVerifier(t) | ||
stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil) | ||
verifiers.AddAssignmentStream(subscription, stream) | ||
|
||
mockServer.OnTestStart(verifiers) | ||
defer mockServer.OnTestEnd() | ||
|
||
asn := newTestAssigner(t, subscription) | ||
// Simulates the assigningSubscriber discarding assignments. | ||
asn.RetError = errors.New("subscriber shutting down") | ||
|
||
if gotErr := asn.StartError(); gotErr != nil { | ||
t.Errorf("Start() got err: (%v)", gotErr) | ||
} | ||
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { | ||
t.Errorf("Partition assignments: got %v, want %v", got, want) | ||
} | ||
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) { | ||
t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError) | ||
} | ||
} |
Oops, something went wrong.