diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index d9d24c01f01..c447dad6f46 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -51,6 +51,7 @@ type committer struct { acks *ackTracker cursorTracker *commitCursorTracker pollCommits *periodicTask + enableCommits bool abstractService } @@ -135,6 +136,7 @@ func (c *committer) onStreamStatusChange(status streamStatus) { switch status { case streamConnected: + c.enableCommits = true c.unsafeUpdateStatus(serviceActive, nil) // Once the stream connects, clear unconfirmed commits and immediately send // the latest desired commit offset. @@ -143,6 +145,8 @@ func (c *committer) onStreamStatusChange(status streamStatus) { c.pollCommits.Start() case streamReconnecting: + // Ensure there are no commits until streamConnected has been handled above. + c.enableCommits = false c.pollCommits.Stop() case streamTerminated: @@ -185,6 +189,9 @@ func (c *committer) commitOffsetToStream() { } func (c *committer) unsafeCommitOffsetToStream() { + if !c.enableCommits { + return + } nextOffset := c.cursorTracker.NextOffset() if nextOffset == nilCursorOffset { return @@ -204,6 +211,7 @@ func (c *committer) unsafeCommitOffsetToStream() { func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) { + c.unsafeCheckDone() return } diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go index 59a0b5d6b7f..7fe864b9820 100644 --- a/pubsublite/internal/wire/committer_test.go +++ b/pubsublite/internal/wire/committer_test.go @@ -158,6 +158,33 @@ func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) { } } +func TestCommitterStopThenTerminateDiscardsOutstandingAcks(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} + ack := newAckConsumer(33, 0, nil) + acks := newAckTracker() + acks.Push(ack) + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initCommitReq(subscription), initCommitResp(), nil) + // No commits expected. + verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + cmt := newTestCommitter(t, subscription, acks) + if gotErr := cmt.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + cmt.Stop() // Stop waits for outstanding acks + cmt.Terminate() // Terminate should discard all outstanding acks + if gotErr := cmt.FinalError(); gotErr != nil { + t.Errorf("Final err: (%v), want: ", gotErr) + } +} + func TestCommitterPermanentStreamError(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} acks := newAckTracker() diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 135cacd1ac2..5b4116a22be 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -983,31 +983,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { // Assignment stream asnStream := test.NewRPCVerifier(t) asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil) - assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil) + assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{}), nil) assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil) verifiers.AddAssignmentStream(subscription, asnStream) // Partition 1 - subStream1 := test.NewRPCVerifier(t) - subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) - subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) - verifiers.AddSubscribeStream(subscription, 1, subStream1) - - cmtStream1 := test.NewRPCVerifier(t) - commitBarrier := cmtStream1.PushWithBarrier(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) - // Note: no commit expected. - verifiers.AddCommitStream(subscription, 1, cmtStream1) - - // Partition 2 - subStream2 := test.NewRPCVerifier(t) - subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) - subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil) - verifiers.AddSubscribeStream(subscription, 2, subStream2) + subStream := test.NewRPCVerifier(t) + subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) + verifiers.AddSubscribeStream(subscription, 1, subStream) - cmtStream2 := test.NewRPCVerifier(t) - cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) - cmtStream2.Push(commitReq(23), commitResp(1), nil) - verifiers.AddCommitStream(subscription, 2, cmtStream2) + cmtStream := test.NewRPCVerifier(t) + cmtStream.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) + cmtStream.Push(commitReq(12), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 1, cmtStream) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() @@ -1018,18 +1007,15 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { } // Partition assignments are initially {1}. - ack1 := receiver.ValidateMsg(msg1) + receiver.ValidateMsg(msg1).Ack() + ack2 := receiver.ValidateMsg(msg2) - // Partition assignments will now be {2}. + // Partition assignments will now be {}. assignmentBarrier1.Release() - receiver.ValidateMsg(msg2).Ack() + assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic - // These barriers ensure that this test is deterministic by ensuring that the - // server has received expected requests before proceeding. - commitBarrier.Release() - assignmentBarrier2.Release() // Partition 1 has already been unassigned, so this ack is discarded. - ack1.Ack() + ack2.Ack() sub.Stop() if gotErr := sub.WaitStopped(); gotErr != nil {