Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): mitigate gRPC stream connection issues #5382

Merged
merged 6 commits into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/assigner.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
Expand Down Expand Up @@ -113,7 +114,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment
receiveAssignment: receiver,
metadata: newPubsubMetadata(),
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
a.stream = newRetryableStream(ctx, a, settings.Timeout, 10*time.Minute, reflect.TypeOf(pb.PartitionAssignment{}))
a.metadata.AddClientInfo(settings.Framework)
return a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/committer.go
Expand Up @@ -76,7 +76,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei
acks: acks,
cursorTracker: newCommitCursorTracker(acks),
}
c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.stream = newRetryableStream(ctx, c, settings.Timeout, minDuration(settings.Timeout, defaultStreamIdleTimeout), reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.metadata.AddClientInfo(settings.Framework)

backgroundTask := c.commitOffsetToStream
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/publisher.go
Expand Up @@ -88,7 +88,7 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub
metadata: newPubsubMetadata(),
}
pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch)
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{}))
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, minDuration(f.settings.Timeout, defaultStreamIdleTimeout), reflect.TypeOf(pb.PublishResponse{}))
pp.metadata.AddTopicRoutingMetadata(pp.topic)
pp.metadata.AddClientInfo(f.settings.Framework)
return pp
Expand Down
104 changes: 89 additions & 15 deletions pubsublite/internal/wire/request_timer.go
Expand Up @@ -18,39 +18,47 @@ import (
"time"
)

type requestTimerStatus int
// minDuration returns the minimum of two durations.
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

type timerStatus int

const (
requestTimerNew requestTimerStatus = iota
requestTimerStopped
requestTimerTriggered
timerActive timerStatus = iota
timerStopped
timerTriggered
)

// requestTimer bounds the duration of a request and executes `onTimeout` if
// the timer is triggered.
// requestTimer is a one-shot timer used to bound the duration of a request. It
// executes `onTimeout` if the timeout expires.
type requestTimer struct {
onTimeout func()
timeoutErr error
timer *time.Timer
mu sync.Mutex
status requestTimerStatus
status timerStatus
}

func newRequestTimer(duration time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
func newRequestTimer(timeout time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
rt := &requestTimer{
onTimeout: onTimeout,
timeoutErr: timeoutErr,
status: requestTimerNew,
status: timerActive,
}
rt.timer = time.AfterFunc(duration, rt.onTriggered)
rt.timer = time.AfterFunc(timeout, rt.onTriggered)
return rt
}

func (rt *requestTimer) onTriggered() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerTriggered
if rt.status == timerActive {
rt.status = timerTriggered
rt.onTimeout()
}
}
Expand All @@ -60,8 +68,8 @@ func (rt *requestTimer) onTriggered() {
func (rt *requestTimer) Stop() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerNew {
rt.status = requestTimerStopped
if rt.status == timerActive {
rt.status = timerStopped
rt.timer.Stop()
}
}
Expand All @@ -71,8 +79,74 @@ func (rt *requestTimer) Stop() {
func (rt *requestTimer) ResolveError(originalErr error) error {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == requestTimerTriggered {
if rt.status == timerTriggered {
return rt.timeoutErr
}
return originalErr
}

// streamIdleTimer is an approximate timer used to detect idle streams.
// `onTimeout` may be called up to (timeout / pollDivisor) after `timeout` has
// expired.
type streamIdleTimer struct {
timeout time.Duration
onTimeout func()
task *periodicTask
mu sync.Mutex
status timerStatus
startTime time.Time
}

const (
pollDivisor = 4
maxPollInterval = time.Minute
)

// newStreamIdleTimer creates an unstarted timer.
func newStreamIdleTimer(timeout time.Duration, onTimeout func()) *streamIdleTimer {
st := &streamIdleTimer{
timeout: timeout,
onTimeout: onTimeout,
status: timerStopped,
}
st.task = newPeriodicTask(minDuration(timeout/pollDivisor, maxPollInterval), st.onPoll)
st.task.Start()
return st
}

// Restart the timer. Should be called when there is stream activity.
func (st *streamIdleTimer) Restart() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerActive
st.startTime = time.Now()
}

// Stop the timer to prevent it from expiring.
func (st *streamIdleTimer) Stop() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerStopped
}

// Shutdown should be called when the timer is no longer used.
func (st *streamIdleTimer) Shutdown() {
st.Stop()
st.task.Stop()
}

func (st *streamIdleTimer) onPoll() {
timeoutExpired := func() bool {
st.mu.Lock()
defer st.mu.Unlock()
// Note: time.Since() uses monotonic clock readings.
if st.status == timerActive && time.Since(st.startTime) > st.timeout {
st.status = timerTriggered
return true
}
return false
}()
if timeoutExpired {
st.onTimeout()
}
}
82 changes: 82 additions & 0 deletions pubsublite/internal/wire/request_timer_test.go
Expand Up @@ -15,12 +15,43 @@ package wire

import (
"errors"
"fmt"
"testing"
"time"

"cloud.google.com/go/pubsublite/internal/test"
)

func TestMinDuration(t *testing.T) {
for _, tc := range []struct {
a time.Duration
b time.Duration
want time.Duration
}{
{
a: 10 * time.Millisecond,
b: 10 * time.Millisecond,
want: 10 * time.Millisecond,
},
{
a: 10 * time.Millisecond,
b: 9 * time.Millisecond,
want: 9 * time.Millisecond,
},
{
a: 5 * time.Millisecond,
b: 5 * time.Second,
want: 5 * time.Millisecond,
},
} {
t.Run(fmt.Sprintf("%s %s", tc.a, tc.b), func(t *testing.T) {
if got := minDuration(tc.a, tc.b); got != tc.want {
t.Errorf("minDuration(%v, %v): got %v, want %v", tc.a, tc.b, got, tc.want)
}
})
}
}

func TestRequestTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
Expand Down Expand Up @@ -59,3 +90,54 @@ func TestRequestTimerExpires(t *testing.T) {
t.Errorf("ResolveError() got err: %v, want err: %v", gotErr, timeoutErr)
}
}

func TestStreamIdleTimerExpires(t *testing.T) {
const timeout = 5 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
st.Restart()
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

func TestStreamIdleTimerRestart(t *testing.T) {
const timeout = 20 * time.Millisecond
const delta = 15 * time.Millisecond
expired := test.NewCondition("timer expired")

st := newStreamIdleTimer(timeout, expired.SetDone)
defer st.Shutdown()
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
st.Restart()
time.Sleep(delta)
expired.VerifyNotDone(t)
expired.WaitUntilDone(t, serviceTestWaitTimeout)
}

func TestStreamIdleTimerStop(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
defer st.Shutdown()
st.Restart()
st.Stop()
time.Sleep(2 * timeout)
}

func TestStreamIdleTimerShutdown(t *testing.T) {
const timeout = 5 * time.Millisecond
onTimeout := func() {
t.Error("onTimeout should not be called")
}

st := newStreamIdleTimer(timeout, onTimeout)
st.Restart()
st.Shutdown()
time.Sleep(2 * timeout)
}
45 changes: 32 additions & 13 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -42,8 +42,13 @@ const (
streamTerminated
)

// Abort a stream initialization attempt after this duration to mitigate delays.
const defaultInitTimeout = 2 * time.Minute
const (
// Abort a stream initialization attempt after this duration to mitigate delays.
defaultStreamInitTimeout = 2 * time.Minute

// Reconnect a stream if it has been idle for this duration.
defaultStreamIdleTimeout = 2 * time.Minute
)

var errStreamInitTimeout = status.Error(codes.DeadlineExceeded, "pubsublite: stream initialization timed out")

Expand Down Expand Up @@ -107,6 +112,7 @@ type retryableStream struct {
responseType reflect.Type
connectTimeout time.Duration
initTimeout time.Duration
idleTimer *streamIdleTimer

// Guards access to fields below.
mu sync.Mutex
Expand All @@ -119,21 +125,21 @@ type retryableStream struct {
finalErr error
}

// newRetryableStream creates a new retryable stream wrapper. `timeout` is the
// maximum duration for reconnection. `responseType` is the type of the response
// proto received on the stream.
func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream {
initTimeout := defaultInitTimeout
if timeout < defaultInitTimeout {
initTimeout = timeout
}
return &retryableStream{
// newRetryableStream creates a new retryable stream wrapper.
// `connectTimeout` is the maximum duration for reconnection, after which the
// stream will be terminated. Streams are reconnected if idle for `idleTimeout`.
// `responseType` is the type of the response proto received on the stream.
func newRetryableStream(ctx context.Context, handler streamHandler, connectTimeout, idleTimeout time.Duration, responseType reflect.Type) *retryableStream {
initTimeout := minDuration(connectTimeout, defaultStreamInitTimeout)
rs := &retryableStream{
ctx: ctx,
handler: handler,
responseType: responseType,
connectTimeout: timeout,
connectTimeout: connectTimeout,
initTimeout: initTimeout,
}
rs.idleTimer = newStreamIdleTimer(idleTimeout, rs.onStreamIdle)
return rs
}

// Start establishes a stream connection. It is a no-op if the stream has
Expand Down Expand Up @@ -211,6 +217,16 @@ func (rs *retryableStream) unsafeClearStream() {
}
}

func (rs *retryableStream) onStreamIdle() {
rs.mu.Lock()
defer rs.mu.Unlock()

// Invalidate the current stream handle so that subsequent messages and errors
// are discarded.
rs.unsafeClearStream()
go rs.connectStream(notifyReset(false))
}

func (rs *retryableStream) newStreamContext() (ctx context.Context, cancel context.CancelFunc) {
rs.mu.Lock()
defer rs.mu.Unlock()
Expand Down Expand Up @@ -241,6 +257,7 @@ func (rs *retryableStream) connectStream(notifyReset notifyReset) {
}
rs.status = streamReconnecting
rs.unsafeClearStream()
rs.idleTimer.Stop()
return true
}
if !canReconnect() {
Expand Down Expand Up @@ -272,6 +289,7 @@ func (rs *retryableStream) connectStream(notifyReset notifyReset) {
}
rs.status = streamConnected
rs.stream = newStream
rs.idleTimer.Restart()
return true
}
if !connected() {
Expand Down Expand Up @@ -315,7 +333,6 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, err err
}
if err = rs.handler.validateInitialResponse(response); err != nil {
// An unexpected initial response from the server is a permanent error.
cancelFunc()
return 0, false
}
}
Expand Down Expand Up @@ -363,6 +380,7 @@ func (rs *retryableStream) listen(recvStream grpc.ClientStream) {
if rs.currentStream() != recvStream {
break
}
rs.idleTimer.Restart()
if err != nil {
if isRetryableRecvError(err) {
go rs.connectStream(notifyReset(isStreamResetSignal(err)))
Expand All @@ -389,6 +407,7 @@ func (rs *retryableStream) unsafeTerminate(err error) {
}
rs.status = streamTerminated
rs.finalErr = err
rs.idleTimer.Shutdown()
rs.unsafeClearStream()

// terminate can be called from within a streamHandler method with a lock
Expand Down