diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 365733f4c5d..1e71716ab92 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go v0.72.0 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.4 + github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.0.5 golang.org/x/mod v0.4.0 // indirect golang.org/x/tools v0.0.0-20201204162204-73cf035baebf // indirect diff --git a/pubsublite/go.sum b/pubsublite/go.sum index 09da63868ab..ead40e8fee3 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -102,6 +102,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go new file mode 100644 index 00000000000..eeecfe9bdca --- /dev/null +++ b/pubsublite/internal/wire/assigner.go @@ -0,0 +1,169 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "context" + "errors" + "fmt" + "reflect" + + "github.com/google/uuid" + "google.golang.org/grpc" + + vkit "cloud.google.com/go/pubsublite/apiv1" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// partitionSet is a set of partition numbers. +type partitionSet map[int]struct{} + +func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet { + var void struct{} + partitions := make(map[int]struct{}) + for _, p := range assignmentpb.GetPartitions() { + partitions[int(p)] = void + } + return partitionSet(partitions) +} + +func (ps partitionSet) Ints() (partitions []int) { + for p := range ps { + partitions = append(partitions, p) + } + return +} + +func (ps partitionSet) Contains(partition int) bool { + _, exists := ps[partition] + return exists +} + +// A function that generates a 16-byte UUID. +type generateUUIDFunc func() (uuid.UUID, error) + +// partitionAssignmentReceiver must enact the received partition assignment from +// the server, or otherwise return an error, which will break the stream. The +// receiver must not call the assigner, as this would result in a deadlock. +type partitionAssignmentReceiver func(partitionSet) error + +// assigner wraps the partition assignment stream and notifies a receiver when +// the server sends a new set of partition assignments for a subscriber. +type assigner struct { + // Immutable after creation. + assignmentClient *vkit.PartitionAssignmentClient + initialReq *pb.PartitionAssignmentRequest + receiveAssignment partitionAssignmentReceiver + + // Fields below must be guarded with mu. + stream *retryableStream + + abstractService +} + +func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) { + clientID, err := genUUID() + if err != nil { + return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err) + } + + a := &assigner{ + assignmentClient: assignmentClient, + initialReq: &pb.PartitionAssignmentRequest{ + Request: &pb.PartitionAssignmentRequest_Initial{ + Initial: &pb.InitialPartitionAssignmentRequest{ + Subscription: subscriptionPath, + ClientId: clientID[:], + }, + }, + }, + receiveAssignment: receiver, + } + a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) + return a, nil +} + +func (a *assigner) Start() { + a.mu.Lock() + defer a.mu.Unlock() + if a.unsafeUpdateStatus(serviceStarting, nil) { + a.stream.Start() + } +} + +func (a *assigner) Stop() { + a.mu.Lock() + defer a.mu.Unlock() + a.unsafeInitiateShutdown(serviceTerminating, nil) +} + +func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { + return a.assignmentClient.AssignPartitions(ctx) +} + +func (a *assigner) initialRequest() (interface{}, bool) { + return a.initialReq, false // No initial response expected +} + +func (a *assigner) validateInitialResponse(_ interface{}) error { + // Should not be called. + return errors.New("pubsublite: unexpected initial response") +} + +func (a *assigner) onStreamStatusChange(status streamStatus) { + a.mu.Lock() + defer a.mu.Unlock() + + switch status { + case streamConnected: + a.unsafeUpdateStatus(serviceActive, nil) + case streamTerminated: + a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error()) + } +} + +func (a *assigner) onResponse(response interface{}) { + a.mu.Lock() + defer a.mu.Unlock() + + if a.status >= serviceTerminating { + return + } + + assignment, _ := response.(*pb.PartitionAssignment) + if err := a.handleAssignment(assignment); err != nil { + a.unsafeInitiateShutdown(serviceTerminated, err) + } +} + +func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error { + if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil { + return err + } + + a.stream.Send(&pb.PartitionAssignmentRequest{ + Request: &pb.PartitionAssignmentRequest_Ack{ + Ack: &pb.PartitionAssignmentAck{}, + }, + }) + return nil +} + +func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { + if !a.unsafeUpdateStatus(targetStatus, err) { + return + } + // No data to send. Immediately terminate the stream. + a.stream.Stop() +} diff --git a/pubsublite/internal/wire/assigner_test.go b/pubsublite/internal/wire/assigner_test.go new file mode 100644 index 00000000000..4c7c3f02e36 --- /dev/null +++ b/pubsublite/internal/wire/assigner_test.go @@ -0,0 +1,200 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "context" + "errors" + "sort" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/test" + "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +func TestPartitionSet(t *testing.T) { + partitions := newPartitionSet(&pb.PartitionAssignment{ + Partitions: []int64{8, 5, 8, 1}, + }) + + wantPartitions := []int{1, 5, 8} + for _, partition := range wantPartitions { + if !partitions.Contains(partition) { + t.Errorf("Contains(%d) got false, want true", partition) + } + } + for _, partition := range []int{2, 3, 4, 6, 7} { + if partitions.Contains(partition) { + t.Errorf("Contains(%d) got true, want false", partition) + } + } + + gotPartitions := partitions.Ints() + sort.Ints(gotPartitions) + if !testutil.Equal(gotPartitions, wantPartitions) { + t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions) + } +} + +var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'} + +func fakeGenerateUUID() (uuid.UUID, error) { + return fakeUUID, nil +} + +// testAssigner wraps an assigner for ease of testing. +type testAssigner struct { + // Fake error to simulate receiver unable to handle assignment. + RetError error + + t *testing.T + asn *assigner + partitions chan []int + + serviceTestProxy +} + +func newTestAssigner(t *testing.T, subscription string) *testAssigner { + ctx := context.Background() + assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + + ta := &testAssigner{ + t: t, + partitions: make(chan []int, 1), + } + asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment) + if err != nil { + t.Fatal(err) + } + ta.asn = asn + ta.initAndStart(t, ta.asn, "Assigner") + return ta +} + +func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { + p := partitions.Ints() + sort.Ints(p) + ta.partitions <- p + + if ta.RetError != nil { + return ta.RetError + } + return nil +} + +func (ta *testAssigner) NextPartitions() []int { + select { + case <-time.After(serviceTestWaitTimeout): + ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout) + return nil + case p := <-ta.partitions: + return p + } +} + +func TestAssignerNoInitialResponse(t *testing.T) { + subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil) + verifiers.AddAssignmentStream(subscription, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + asn := newTestAssigner(t, subscription) + + // Assigner starts even though no initial response was received from the + // server. + if gotErr := asn.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + // To ensure test is deterministic, i.e. server must receive initial request + // before stopping the client. + barrier.Release() + asn.StopVerifyNoError() +} + +func TestAssignerReconnect(t *testing.T) { + subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" + permanentErr := status.Error(codes.FailedPrecondition, "failed") + + verifiers := test.NewVerifiers(t) + + // Simulate a transient error that results in a reconnect. + stream1 := test.NewRPCVerifier(t) + stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable")) + verifiers.AddAssignmentStream(subscription, stream1) + + // Send 2 partition assignments before terminating with permanent error. + stream2 := test.NewRPCVerifier(t) + stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil) + stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil) + stream2.Push(assignmentAckReq(), nil, permanentErr) + verifiers.AddAssignmentStream(subscription, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + asn := newTestAssigner(t, subscription) + + if gotErr := asn.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) { + t.Errorf("Partition assignment #1: got %v, want %v", got, want) + } + if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) { + t.Errorf("Partition assignment #2: got %v, want %v", got, want) + } + if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) + } +} + +func TestAssignerHandlePartitionFailure(t *testing.T) { + subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil) + verifiers.AddAssignmentStream(subscription, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + asn := newTestAssigner(t, subscription) + // Simulates the assigningSubscriber discarding assignments. + asn.RetError = errors.New("subscriber shutting down") + + if gotErr := asn.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { + t.Errorf("Partition assignments: got %v, want %v", got, want) + } + if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError) + } +} diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 9f1d2c6a3de..8b9c9894c28 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -19,6 +19,8 @@ import ( "reflect" "time" + "github.com/google/uuid" + "google.golang.org/api/option" "google.golang.org/grpc" vkit "cloud.google.com/go/pubsublite/apiv1" @@ -345,6 +347,9 @@ func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err // - subscribeStream to receive messages from the subscribe stream. // - committer to commit cursor offsets to the streaming commit cursor stream. type singlePartitionSubscriber struct { + subscriber *subscribeStream + committer *committer + compositeService } @@ -363,7 +368,10 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu acks := newAckTracker() commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks) sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks) - ps := new(singlePartitionSubscriber) + ps := &singlePartitionSubscriber{ + subscriber: sub, + committer: commit, + } ps.init() ps.unsafeAddServices(sub, commit) return ps @@ -385,3 +393,107 @@ func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) * } return ms } + +// assigningSubscriber uses the Pub/Sub Lite partition assignment service to +// listen to its assigned partition numbers and dynamically add/remove +// singlePartitionSubscribers. +type assigningSubscriber struct { + // Immutable after creation. + subFactory *singlePartitionSubscriberFactory + assigner *assigner + + // Fields below must be guarded with mu. + // Subscribers keyed by partition number. Updated as assignments change. + subscribers map[int]*singlePartitionSubscriber + + compositeService +} + +func newAssigningSubscriber(assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) { + as := &assigningSubscriber{ + subFactory: subFactory, + subscribers: make(map[int]*singlePartitionSubscriber), + } + as.init() + + assigner, err := newAssigner(subFactory.ctx, assignmentClient, genUUID, subFactory.settings, subFactory.subscriptionPath, as.handleAssignment) + if err != nil { + return nil, err + } + as.assigner = assigner + as.unsafeAddServices(assigner) + return as, nil +} + +func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error { + as.mu.Lock() + defer as.mu.Unlock() + + // Handle new partitions. + for _, partition := range partitions.Ints() { + if _, exists := as.subscribers[partition]; !exists { + subscriber := as.subFactory.New(partition) + if err := as.unsafeAddServices(subscriber); err != nil { + // Occurs when the assigningSubscriber is stopping/stopped. + return err + } + as.subscribers[partition] = subscriber + } + } + + // Handle removed partitions. + for partition, subscriber := range as.subscribers { + if !partitions.Contains(partition) { + as.unsafeRemoveService(subscriber) + // Safe to delete map entry during range loop: + // https://golang.org/ref/spec#For_statements + delete(as.subscribers, partition) + } + } + return nil +} + +// Subscriber is the client interface exported from this package for receiving +// messages. +type Subscriber interface { + Start() + WaitStarted() error + Stop() + WaitStopped() error +} + +// NewSubscriber creates a new client for receiving messages. +func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error) { + if err := ValidateRegion(region); err != nil { + return nil, err + } + if err := validateReceiveSettings(settings); err != nil { + return nil, err + } + subClient, err := newSubscriberClient(ctx, region, opts...) + if err != nil { + return nil, err + } + cursorClient, err := newCursorClient(ctx, region, opts...) + if err != nil { + return nil, err + } + + subFactory := &singlePartitionSubscriberFactory{ + ctx: ctx, + subClient: subClient, + cursorClient: cursorClient, + settings: settings, + subscriptionPath: subscriptionPath, + receiver: receiver, + } + + if len(settings.Partitions) > 0 { + return newMultiPartitionSubscriber(subFactory), nil + } + partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...) + if err != nil { + return nil, err + } + return newAssigningSubscriber(partitionClient, uuid.NewRandom, subFactory) +} diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index ea437522132..a08094669e6 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -747,3 +747,234 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) { msg2Barrier.Release() receiver.VerifyNoMsgs() } + +func (as *assigningSubscriber) Partitions() []int { + as.mu.Lock() + defer as.mu.Unlock() + + var partitions []int + for p := range as.subscribers { + partitions = append(partitions, p) + } + sort.Ints(partitions) + return partitions +} + +func (as *assigningSubscriber) FlushCommits() { + as.mu.Lock() + defer as.mu.Unlock() + + for _, sub := range as.subscribers { + sub.committer.commitOffsetToStream() + } +} + +func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string) *assigningSubscriber { + ctx := context.Background() + subClient, err := newSubscriberClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + cursorClient, err := newCursorClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + + f := &singlePartitionSubscriberFactory{ + ctx: ctx, + subClient: subClient, + cursorClient: cursorClient, + settings: testSubscriberSettings(), + subscriptionPath: subscriptionPath, + receiver: receiverFunc, + disableTasks: true, // Background tasks disabled to control event order + } + sub, err := newAssigningSubscriber(assignmentClient, fakeGenerateUUID, f) + if err != nil { + t.Fatal(err) + } + sub.Start() + return sub +} + +func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { + const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(33, 100) + msg2 := seqMsgWithOffsetAndSize(34, 200) + msg3 := seqMsgWithOffsetAndSize(66, 100) + msg4 := seqMsgWithOffsetAndSize(67, 100) + msg5 := seqMsgWithOffsetAndSize(88, 100) + + verifiers := test.NewVerifiers(t) + + // Assignment stream + asnStream := test.NewRPCVerifier(t) + asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 6}), nil) + assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil) + asnStream.Push(assignmentAckReq(), nil, nil) + verifiers.AddAssignmentStream(subscription, asnStream) + + // Partition 3 + subStream3 := test.NewRPCVerifier(t) + subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil) + subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil) + msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil) + verifiers.AddSubscribeStream(subscription, 3, subStream3) + + cmtStream3 := test.NewRPCVerifier(t) + cmtStream3.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 3}), initCommitResp(), nil) + cmtStream3.Push(commitReq(34), commitResp(1), nil) + cmtStream3.Push(commitReq(35), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 3, cmtStream3) + + // Partition 6 + subStream6 := test.NewRPCVerifier(t) + subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil) + subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil) + // msg4 should not be received. + msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil) + verifiers.AddSubscribeStream(subscription, 6, subStream6) + + cmtStream6 := test.NewRPCVerifier(t) + cmtStream6.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 6}), initCommitResp(), nil) + cmtStream6.Push(commitReq(67), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 6, cmtStream6) + + // Partition 8 + subStream8 := test.NewRPCVerifier(t) + subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil) + subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil) + verifiers.AddSubscribeStream(subscription, 8, subStream8) + + cmtStream8 := test.NewRPCVerifier(t) + cmtStream8.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 8}), initCommitResp(), nil) + cmtStream8.Push(commitReq(89), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 8, cmtStream8) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + // Partition assignments are initially {3, 6}. + receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3}) + if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) { + t.Errorf("subscriber partitions: got %d, want %d", got, want) + } + + // Partition assignments will now be {3, 8}. + assignmentBarrier.Release() + receiver.ValidateMsgs([]*pb.SequencedMessage{msg5}) + if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) { + t.Errorf("subscriber partitions: got %d, want %d", got, want) + } + + // msg2 is from partition 3 and should be received. msg4 is from partition 6 + // (removed) and should be discarded. + sub.FlushCommits() + msg2Barrier.Release() + msg4Barrier.Release() + receiver.ValidateMsgs([]*pb.SequencedMessage{msg2}) + + // Stop should flush all commit cursors. + sub.Stop() + if gotErr := sub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + +func TestAssigningSubscriberPermanentError(t *testing.T) { + const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(11, 100) + msg2 := seqMsgWithOffsetAndSize(22, 200) + serverErr := status.Error(codes.FailedPrecondition, "failed") + + verifiers := test.NewVerifiers(t) + + // Assignment stream + asnStream := test.NewRPCVerifier(t) + asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil) + errBarrier := asnStream.PushWithBarrier(assignmentAckReq(), nil, serverErr) + verifiers.AddAssignmentStream(subscription, asnStream) + + // Partition 1 + subStream1 := test.NewRPCVerifier(t) + subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) + verifiers.AddSubscribeStream(subscription, 1, subStream1) + + cmtStream1 := test.NewRPCVerifier(t) + cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) + cmtStream1.Push(commitReq(12), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 1, cmtStream1) + + // Partition 2 + subStream2 := test.NewRPCVerifier(t) + subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) + subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil) + verifiers.AddSubscribeStream(subscription, 2, subStream2) + + cmtStream2 := test.NewRPCVerifier(t) + cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) + cmtStream2.Push(commitReq(23), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 2, cmtStream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2}) + + // Permanent assignment stream error should terminate subscriber. Commits are + // still flushed. + errBarrier.Release() + if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { + t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) + } +} + +func TestNewSubscriberCreatesCorrectImpl(t *testing.T) { + const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" + const region = "us-central1" + receiver := newTestMessageReceiver(t) + + sub, err := NewSubscriber(context.Background(), DefaultReceiveSettings, receiver.onMessage, region, subscription) + if err != nil { + t.Errorf("NewSubscriber() got error: %v", err) + } else if _, ok := sub.(*assigningSubscriber); !ok { + t.Error("NewSubscriber() did not return a assigningSubscriber") + } + + settings := DefaultReceiveSettings + settings.Partitions = []int{1, 2, 3} + sub, err = NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription) + if err != nil { + t.Errorf("NewSubscriber() got error: %v", err) + } else if _, ok := sub.(*multiPartitionSubscriber); !ok { + t.Error("NewSubscriber() did not return a multiPartitionSubscriber") + } +} + +func TestNewSubscriberValidatesSettings(t *testing.T) { + const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" + const region = "us-central1" + receiver := newTestMessageReceiver(t) + + settings := DefaultReceiveSettings + settings.MaxOutstandingMessages = 0 + if _, err := NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription); err == nil { + t.Error("NewSubscriber() did not return error") + } +}