Skip to content

Commit

Permalink
test(pubsublite): fix flaky TestAssignerHandlePartitionFailure (#3799)
Browse files Browse the repository at this point in the history
The fake error response needs to be set before the assigner is started to prevent races.

Fixes #3789.
  • Loading branch information
tmdiep committed Mar 12, 2021
1 parent 29a6ab6 commit a00a245
Showing 1 changed file with 6 additions and 17 deletions.
23 changes: 6 additions & 17 deletions pubsublite/internal/wire/assigner_test.go
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -63,24 +62,23 @@ func fakeGenerateUUID() (uuid.UUID, error) {
// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
recvError error
mu sync.Mutex

recvError error
t *testing.T
asn *assigner
partitions chan []int

serviceTestProxy
}

func newTestAssigner(t *testing.T, subscription string) *testAssigner {
func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}

ta := &testAssigner{
recvError: recvErr,
t: t,
partitions: make(chan []int, 1),
}
Expand All @@ -98,20 +96,12 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
sort.Ints(p)
ta.partitions <- p

ta.mu.Lock()
defer ta.mu.Unlock()
if ta.recvError != nil {
return ta.recvError
}
return nil
}

func (ta *testAssigner) SetReceiveError(err error) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.recvError = err
}

func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
Expand All @@ -133,7 +123,7 @@ func TestAssignerNoInitialResponse(t *testing.T) {
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)
asn := newTestAssigner(t, subscription, nil)

// Assigner starts even though no initial response was received from the
// server.
Expand Down Expand Up @@ -167,7 +157,7 @@ func TestAssignerReconnect(t *testing.T) {
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)
asn := newTestAssigner(t, subscription, nil)

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
Expand All @@ -194,10 +184,9 @@ func TestAssignerHandlePartitionFailure(t *testing.T) {
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)
// Simulates the assigningSubscriber discarding assignments.
wantErr := errors.New("subscriber shutting down")
asn.SetReceiveError(wantErr)
asn := newTestAssigner(t, subscription, wantErr)

if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
Expand Down

0 comments on commit a00a245

Please sign in to comment.