diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 0e9633d4873..cc4f6cbba34 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -966,8 +966,8 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { // Assignment stream asnStream := test.NewRPCVerifier(t) asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil) - assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil) - asnStream.Push(assignmentAckReq(), nil, nil) + assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil) + assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil) verifiers.AddAssignmentStream(subscription, asnStream) // Partition 1 @@ -977,7 +977,7 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { verifiers.AddSubscribeStream(subscription, 1, subStream1) cmtStream1 := test.NewRPCVerifier(t) - cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) + commitBarrier := cmtStream1.PushWithBarrier(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) // Note: no commit expected. verifiers.AddCommitStream(subscription, 1, cmtStream1) @@ -1004,9 +1004,13 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { ack1 := receiver.ValidateMsg(msg1) // Partition assignments will now be {2}. - assignmentBarrier.Release() + assignmentBarrier1.Release() receiver.ValidateMsg(msg2).Ack() + // These barriers ensure that this test is deterministic by ensuring that the + // server has received expected requests before proceeding. + commitBarrier.Release() + assignmentBarrier2.Release() // Partition 1 has already been unassigned, so this ack is discarded. ack1.Ack()