Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): mitigate gRPC stream connection issues #5382

Merged
merged 6 commits into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/assigner.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
Expand Down Expand Up @@ -113,7 +114,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment
receiveAssignment: receiver,
metadata: newPubsubMetadata(),
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
a.stream = newRetryableStream(ctx, a, settings.Timeout, 10*time.Minute, reflect.TypeOf(pb.PartitionAssignment{}))
a.metadata.AddClientInfo(settings.Framework)
return a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/committer.go
Expand Up @@ -76,7 +76,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei
acks: acks,
cursorTracker: newCommitCursorTracker(acks),
}
c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.stream = newRetryableStream(ctx, c, settings.Timeout, streamIdleTimeout(settings.Timeout), reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.metadata.AddClientInfo(settings.Framework)

backgroundTask := c.commitOffsetToStream
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/publisher.go
Expand Up @@ -88,7 +88,7 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub
metadata: newPubsubMetadata(),
}
pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch)
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{}))
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, streamIdleTimeout(f.settings.Timeout), reflect.TypeOf(pb.PublishResponse{}))
pp.metadata.AddTopicRoutingMetadata(pp.topic)
pp.metadata.AddClientInfo(f.settings.Framework)
return pp
Expand Down
104 changes: 89 additions & 15 deletions pubsublite/internal/wire/request_timer.go
Expand Up @@ -18,39 +18,47 @@ import (
"time"
)

type requestTimerStatus int
// minDuration returns the minimum of two durations.
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

type timerStatus int

const (
requestTimerNew requestTimerStatus = iota
requestTimerStopped
requestTimerTriggered
timerActive timerStatus = iota
timerStopped
timerTriggered
)

// requestTimer bounds the duration of a request and executes `onTimeout` if
// the timer is triggered.
// requestTimer is a one-shot timer used to bound the duration of a request. It
// executes `onTimeout` if the timeout expires.
type requestTimer struct {
onTimeout func()
timeoutErr error
timer *time.Timer
mu sync.Mutex
status requestTimerStatus
status timerStatus
}

func newRequestTimer(duration time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
func newRequestTimer(timeout time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
rt := &requestTimer{
onTimeout: onTimeout,
timeoutErr: timeoutErr,
status: requestTimerNew,
status: timerActive,
}
rt.timer = time.AfterFunc(duration, rt.onTriggered)
rt.timer = time.AfterFunc(timeout, rt.onTriggered)
return rt
}

func (rt *requestTimer) onTriggered() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerTriggered
if rt.status == timerActive {
rt.status = timerTriggered
rt.onTimeout()
}
}
Expand All @@ -60,8 +68,8 @@ func (rt *requestTimer) onTriggered() {
func (rt *requestTimer) Stop() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerStopped
if rt.status == timerActive {
rt.status = timerStopped
rt.timer.Stop()
}
}
Expand All @@ -71,8 +79,74 @@ func (rt *requestTimer) Stop() {
func (rt *requestTimer) ResolveError(originalErr error) error {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerTriggered {
if rt.status == timerTriggered {
return rt.timeoutErr
}
return originalErr
}

// streamIdleTimer is an approximate timer used to detect idle streams.
// `onTimeout` may be called up to (timeout / pollDivisor) after `timeout` has
// expired.
type streamIdleTimer struct {
timeout time.Duration
onTimeout func()
task *periodicTask
mu sync.Mutex
status timerStatus
startTime time.Time
}

const (
pollDivisor = 4
maxPollInterval = time.Minute
)

// newStreamIdleTimer creates an unstarted timer.
func newStreamIdleTimer(timeout time.Duration, onTimeout func()) *streamIdleTimer {
st := &streamIdleTimer{
timeout: timeout,
onTimeout: onTimeout,
status: timerStopped,
}
st.task = newPeriodicTask(minDuration(timeout/pollDivisor, maxPollInterval), st.onPoll)
st.task.Start()
return st
}

// Restart the timer. Should be called when there is stream activity.
func (st *streamIdleTimer) Restart() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerActive
st.startTime = time.Now()
}

// Stop the timer to prevent it from expiring.
func (st *streamIdleTimer) Stop() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerStopped
}

// Shutdown should be called when the timer is no longer used.
func (st *streamIdleTimer) Shutdown() {
st.Stop()
st.task.Stop()
}

func (st *streamIdleTimer) onPoll() {
timeoutExpired := func() bool {
st.mu.Lock()
defer st.mu.Unlock()
// Note: time.Since() uses monotonic clock readings.
if st.status == timerActive && time.Since(st.startTime) > st.timeout {
st.status = timerTriggered
return true
}
return false
}()
if timeoutExpired {
st.onTimeout()
}
}
82 changes: 82 additions & 0 deletions pubsublite/internal/wire/request_timer_test.go
Expand Up @@ -15,12 +15,43 @@ package wire

import (
"errors"
"fmt"
"testing"
"time"

"cloud.google.com/go/pubsublite/internal/test"
)

func TestMinDuration(t *testing.T) {
for _, tc := range []struct {
a time.Duration
b time.Duration
want time.Duration
}{
{
a: 10 * time.Millisecond,
b: 10 * time.Millisecond,
want: 10 * time.Millisecond,
},
{
a: 10 * time.Millisecond,
b: 9 * time.Millisecond,
want: 9 * time.Millisecond,
},
{
a: 5 * time.Millisecond,
b: 5 * time.Second,
want: 5 * time.Millisecond,
},
} {
t.Run(fmt.Sprintf("%s %s", tc.a, tc.b), func(t *testing.T) {
if got := minDuration(tc.a, tc.b); got != tc.want {
t.Errorf("minDuration(%v, %v): got %v, want %v", tc.a, tc.b, got, tc.want)
}
})
}
}

func TestRequestTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
Expand Down Expand Up @@ -59,3 +90,54 @@ func TestRequestTimerExpires(t *testing.T) {
t.Errorf("ResolveError() got err: %v, want err: %v", gotErr, timeoutErr)
}
}

func TestStreamIdleTimerExpires(t *testing.T) {
const timeout = 5 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
st.Restart()
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

func TestStreamIdleTimerRestart(t *testing.T) {
const timeout = 20 * time.Millisecond
const delta = 15 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

func TestStreamIdleTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
defer st.Shutdown()
st.Restart()
st.Stop()
time.Sleep(2 * timeout)
}

func TestStreamIdleTimerShutdown(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
st.Restart()
st.Shutdown()
time.Sleep(2 * timeout)
}