From 8763ef3d2da18e8fed6e350aef76d26a135246c2 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 8 Feb 2022 11:04:34 +1100 Subject: [PATCH] fix(pubsublite): mitigate gRPC stream connection issues (#5382) Mitigates hanging streams by detecting idle streams and reconnecting after a timeout (default 10min for partition assignment streams, 2min for all others). If the user has specified a lower timeout in settings, this will be used for publish, subscribe and commit streams. The StreamIdleTimer is restarted when the client receives a response on the stream. The stream is reinitialized when the timeout expires. For publish and commit streams, the timeout will still expire even if there is no user activity. --- pubsublite/internal/wire/assigner.go | 3 +- pubsublite/internal/wire/committer.go | 2 +- pubsublite/internal/wire/publisher.go | 2 +- pubsublite/internal/wire/request_timer.go | 104 +++++++++++++++--- .../internal/wire/request_timer_test.go | 82 ++++++++++++++ pubsublite/internal/wire/streams.go | 50 ++++++--- pubsublite/internal/wire/streams_test.go | 87 ++++++++++++--- pubsublite/internal/wire/subscriber.go | 2 +- 8 files changed, 282 insertions(+), 50 deletions(-) diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index 8f0ad4dc9cf..a8ad950db0a 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -19,6 +19,7 @@ import ( "fmt" "reflect" "sort" + "time" "github.com/google/uuid" "google.golang.org/grpc" @@ -113,7 +114,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment receiveAssignment: receiver, metadata: newPubsubMetadata(), } - a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) + a.stream = newRetryableStream(ctx, a, settings.Timeout, 10*time.Minute, reflect.TypeOf(pb.PartitionAssignment{})) a.metadata.AddClientInfo(settings.Framework) return a, nil } diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index 2d2a3244658..dcb0b612eb7 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -76,7 +76,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei acks: acks, cursorTracker: newCommitCursorTracker(acks), } - c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{})) + c.stream = newRetryableStream(ctx, c, settings.Timeout, streamIdleTimeout(settings.Timeout), reflect.TypeOf(pb.StreamingCommitCursorResponse{})) c.metadata.AddClientInfo(settings.Framework) backgroundTask := c.commitOffsetToStream diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 740be64911b..a1e8b39fe18 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -88,7 +88,7 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub metadata: newPubsubMetadata(), } pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch) - pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{})) + pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, streamIdleTimeout(f.settings.Timeout), reflect.TypeOf(pb.PublishResponse{})) pp.metadata.AddTopicRoutingMetadata(pp.topic) pp.metadata.AddClientInfo(f.settings.Framework) return pp diff --git a/pubsublite/internal/wire/request_timer.go b/pubsublite/internal/wire/request_timer.go index ee3a1565e54..54a130c4f2c 100644 --- a/pubsublite/internal/wire/request_timer.go +++ b/pubsublite/internal/wire/request_timer.go @@ -18,39 +18,47 @@ import ( "time" ) -type requestTimerStatus int +// minDuration returns the minimum of two durations. +func minDuration(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + +type timerStatus int const ( - requestTimerNew requestTimerStatus = iota - requestTimerStopped - requestTimerTriggered + timerActive timerStatus = iota + timerStopped + timerTriggered ) -// requestTimer bounds the duration of a request and executes `onTimeout` if -// the timer is triggered. +// requestTimer is a one-shot timer used to bound the duration of a request. It +// executes `onTimeout` if the timeout expires. type requestTimer struct { onTimeout func() timeoutErr error timer *time.Timer mu sync.Mutex - status requestTimerStatus + status timerStatus } -func newRequestTimer(duration time.Duration, onTimeout func(), timeoutErr error) *requestTimer { +func newRequestTimer(timeout time.Duration, onTimeout func(), timeoutErr error) *requestTimer { rt := &requestTimer{ onTimeout: onTimeout, timeoutErr: timeoutErr, - status: requestTimerNew, + status: timerActive, } - rt.timer = time.AfterFunc(duration, rt.onTriggered) + rt.timer = time.AfterFunc(timeout, rt.onTriggered) return rt } func (rt *requestTimer) onTriggered() { rt.mu.Lock() defer rt.mu.Unlock() - if rt.status == requestTimerNew { - rt.status = requestTimerTriggered + if rt.status == timerActive { + rt.status = timerTriggered rt.onTimeout() } } @@ -60,8 +68,8 @@ func (rt *requestTimer) onTriggered() { func (rt *requestTimer) Stop() { rt.mu.Lock() defer rt.mu.Unlock() - if rt.status == requestTimerNew { - rt.status = requestTimerStopped + if rt.status == timerActive { + rt.status = timerStopped rt.timer.Stop() } } @@ -71,8 +79,74 @@ func (rt *requestTimer) Stop() { func (rt *requestTimer) ResolveError(originalErr error) error { rt.mu.Lock() defer rt.mu.Unlock() - if rt.status == requestTimerTriggered { + if rt.status == timerTriggered { return rt.timeoutErr } return originalErr } + +// streamIdleTimer is an approximate timer used to detect idle streams. +// `onTimeout` may be called up to (timeout / pollDivisor) after `timeout` has +// expired. +type streamIdleTimer struct { + timeout time.Duration + onTimeout func() + task *periodicTask + mu sync.Mutex + status timerStatus + startTime time.Time +} + +const ( + pollDivisor = 4 + maxPollInterval = time.Minute +) + +// newStreamIdleTimer creates an unstarted timer. +func newStreamIdleTimer(timeout time.Duration, onTimeout func()) *streamIdleTimer { + st := &streamIdleTimer{ + timeout: timeout, + onTimeout: onTimeout, + status: timerStopped, + } + st.task = newPeriodicTask(minDuration(timeout/pollDivisor, maxPollInterval), st.onPoll) + st.task.Start() + return st +} + +// Restart the timer. Should be called when there is stream activity. +func (st *streamIdleTimer) Restart() { + st.mu.Lock() + defer st.mu.Unlock() + st.status = timerActive + st.startTime = time.Now() +} + +// Stop the timer to prevent it from expiring. +func (st *streamIdleTimer) Stop() { + st.mu.Lock() + defer st.mu.Unlock() + st.status = timerStopped +} + +// Shutdown should be called when the timer is no longer used. +func (st *streamIdleTimer) Shutdown() { + st.Stop() + st.task.Stop() +} + +func (st *streamIdleTimer) onPoll() { + timeoutExpired := func() bool { + st.mu.Lock() + defer st.mu.Unlock() + // Note: time.Since() uses monotonic clock readings. + if st.status == timerActive && time.Since(st.startTime) > st.timeout { + st.status = timerTriggered + return true + } + return false + }() + if timeoutExpired { + st.onTimeout() + } +} diff --git a/pubsublite/internal/wire/request_timer_test.go b/pubsublite/internal/wire/request_timer_test.go index 86c96c2b9dd..25fec35e1db 100644 --- a/pubsublite/internal/wire/request_timer_test.go +++ b/pubsublite/internal/wire/request_timer_test.go @@ -15,12 +15,43 @@ package wire import ( "errors" + "fmt" "testing" "time" "cloud.google.com/go/pubsublite/internal/test" ) +func TestMinDuration(t *testing.T) { + for _, tc := range []struct { + a time.Duration + b time.Duration + want time.Duration + }{ + { + a: 10 * time.Millisecond, + b: 10 * time.Millisecond, + want: 10 * time.Millisecond, + }, + { + a: 10 * time.Millisecond, + b: 9 * time.Millisecond, + want: 9 * time.Millisecond, + }, + { + a: 5 * time.Millisecond, + b: 5 * time.Second, + want: 5 * time.Millisecond, + }, + } { + t.Run(fmt.Sprintf("%s %s", tc.a, tc.b), func(t *testing.T) { + if got := minDuration(tc.a, tc.b); got != tc.want { + t.Errorf("minDuration(%v, %v): got %v, want %v", tc.a, tc.b, got, tc.want) + } + }) + } +} + func TestRequestTimerStop(t *testing.T) { const timeout = 5 * time.Millisecond onTimeout := func() { @@ -59,3 +90,54 @@ func TestRequestTimerExpires(t *testing.T) { t.Errorf("ResolveError() got err: %v, want err: %v", gotErr, timeoutErr) } } + +func TestStreamIdleTimerExpires(t *testing.T) { + const timeout = 5 * time.Millisecond + expired := test.NewCondition("timer expired") + + st := newStreamIdleTimer(timeout, expired.SetDone) + defer st.Shutdown() + st.Restart() + expired.WaitUntilDone(t, serviceTestWaitTimeout) +} + +func TestStreamIdleTimerRestart(t *testing.T) { + const timeout = 20 * time.Millisecond + const delta = 15 * time.Millisecond + expired := test.NewCondition("timer expired") + + st := newStreamIdleTimer(timeout, expired.SetDone) + defer st.Shutdown() + st.Restart() + time.Sleep(delta) + expired.VerifyNotDone(t) + st.Restart() + time.Sleep(delta) + expired.VerifyNotDone(t) + expired.WaitUntilDone(t, serviceTestWaitTimeout) +} + +func TestStreamIdleTimerStop(t *testing.T) { + const timeout = 5 * time.Millisecond + onTimeout := func() { + t.Error("onTimeout should not be called") + } + + st := newStreamIdleTimer(timeout, onTimeout) + defer st.Shutdown() + st.Restart() + st.Stop() + time.Sleep(2 * timeout) +} + +func TestStreamIdleTimerShutdown(t *testing.T) { + const timeout = 5 * time.Millisecond + onTimeout := func() { + t.Error("onTimeout should not be called") + } + + st := newStreamIdleTimer(timeout, onTimeout) + st.Restart() + st.Shutdown() + time.Sleep(2 * timeout) +} diff --git a/pubsublite/internal/wire/streams.go b/pubsublite/internal/wire/streams.go index 76624178c07..f89e73ce970 100644 --- a/pubsublite/internal/wire/streams.go +++ b/pubsublite/internal/wire/streams.go @@ -42,14 +42,23 @@ const ( streamTerminated ) -// Abort a stream initialization attempt after this duration to mitigate delays. -const defaultInitTimeout = 2 * time.Minute +const ( + // Abort a stream initialization attempt after this duration to mitigate delays. + defaultStreamInitTimeout = 2 * time.Minute + + // Reconnect a stream if it has been idle for this duration. + defaultStreamIdleTimeout = 2 * time.Minute +) var errStreamInitTimeout = status.Error(codes.DeadlineExceeded, "pubsublite: stream initialization timed out") type initialResponseRequired bool type notifyReset bool +func streamIdleTimeout(userTimeout time.Duration) time.Duration { + return minDuration(userTimeout/2, defaultStreamIdleTimeout) +} + // streamHandler provides hooks for different Pub/Sub Lite streaming APIs // (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream. // All Pub/Sub Lite streaming APIs implement a similar handshaking protocol, @@ -107,6 +116,7 @@ type retryableStream struct { responseType reflect.Type connectTimeout time.Duration initTimeout time.Duration + idleTimer *streamIdleTimer // Guards access to fields below. mu sync.Mutex @@ -119,21 +129,22 @@ type retryableStream struct { finalErr error } -// newRetryableStream creates a new retryable stream wrapper. `timeout` is the -// maximum duration for reconnection. `responseType` is the type of the response -// proto received on the stream. -func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream { - initTimeout := defaultInitTimeout - if timeout < defaultInitTimeout { - initTimeout = timeout - } - return &retryableStream{ +// newRetryableStream creates a new retryable stream wrapper. +// `connectTimeout` is the maximum duration for reconnection, after which the +// stream will be terminated. Streams are reconnected if idle for `idleTimeout`. +// `responseType` is the type of the response proto received on the stream. +func newRetryableStream(ctx context.Context, handler streamHandler, connectTimeout, idleTimeout time.Duration, responseType reflect.Type) *retryableStream { + // Retry initialization before the reconnection timeout. + initTimeout := minDuration(connectTimeout/2, defaultStreamInitTimeout) + rs := &retryableStream{ ctx: ctx, handler: handler, responseType: responseType, - connectTimeout: timeout, + connectTimeout: connectTimeout, initTimeout: initTimeout, } + rs.idleTimer = newStreamIdleTimer(idleTimeout, rs.onStreamIdle) + return rs } // Start establishes a stream connection. It is a no-op if the stream has @@ -211,6 +222,16 @@ func (rs *retryableStream) unsafeClearStream() { } } +func (rs *retryableStream) onStreamIdle() { + rs.mu.Lock() + defer rs.mu.Unlock() + + // Invalidate the current stream handle so that subsequent messages and errors + // are discarded. + rs.unsafeClearStream() + go rs.connectStream(notifyReset(false)) +} + func (rs *retryableStream) newStreamContext() (ctx context.Context, cancel context.CancelFunc) { rs.mu.Lock() defer rs.mu.Unlock() @@ -241,6 +262,7 @@ func (rs *retryableStream) connectStream(notifyReset notifyReset) { } rs.status = streamReconnecting rs.unsafeClearStream() + rs.idleTimer.Stop() return true } if !canReconnect() { @@ -272,6 +294,7 @@ func (rs *retryableStream) connectStream(notifyReset notifyReset) { } rs.status = streamConnected rs.stream = newStream + rs.idleTimer.Restart() return true } if !connected() { @@ -315,7 +338,6 @@ func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, err err } if err = rs.handler.validateInitialResponse(response); err != nil { // An unexpected initial response from the server is a permanent error. - cancelFunc() return 0, false } } @@ -363,6 +385,7 @@ func (rs *retryableStream) listen(recvStream grpc.ClientStream) { if rs.currentStream() != recvStream { break } + rs.idleTimer.Restart() if err != nil { if isRetryableRecvError(err) { go rs.connectStream(notifyReset(isStreamResetSignal(err))) @@ -389,6 +412,7 @@ func (rs *retryableStream) unsafeTerminate(err error) { } rs.status = streamTerminated rs.finalErr = err + rs.idleTimer.Shutdown() rs.unsafeClearStream() // terminate can be called from within a streamHandler method with a lock diff --git a/pubsublite/internal/wire/streams_test.go b/pubsublite/internal/wire/streams_test.go index 368f39dacaf..10458886359 100644 --- a/pubsublite/internal/wire/streams_test.go +++ b/pubsublite/internal/wire/streams_test.go @@ -31,7 +31,7 @@ import ( pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) -const defaultStreamTimeout = 30 * time.Second +const streamTestTimeout = 30 * time.Second var errInvalidInitialResponse = errors.New("invalid initial response") @@ -48,7 +48,7 @@ type testStreamHandler struct { pubClient *vkit.PublisherClient } -func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandler { +func newTestStreamHandler(t *testing.T, connectTimeout, idleTimeout time.Duration) *testStreamHandler { ctx := context.Background() pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) if err != nil { @@ -64,7 +64,7 @@ func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandle responses: make(chan interface{}, 1), pubClient: pubClient, } - sh.Stream = newRetryableStream(ctx, sh, timeout, reflect.TypeOf(pb.PublishResponse{})) + sh.Stream = newRetryableStream(ctx, sh, connectTimeout, idleTimeout, reflect.TypeOf(pb.PublishResponse{})) return sh } @@ -72,8 +72,8 @@ func (sh *testStreamHandler) NextStatus() streamStatus { select { case status := <-sh.statuses: return status - case <-time.After(defaultStreamTimeout): - sh.t.Errorf("Stream did not change state within %v", defaultStreamTimeout) + case <-time.After(streamTestTimeout): + sh.t.Errorf("Stream did not change state within %v", streamTestTimeout) return streamUninitialized } } @@ -82,8 +82,8 @@ func (sh *testStreamHandler) NextResponse() interface{} { select { case response := <-sh.responses: return response - case <-time.After(defaultStreamTimeout): - sh.t.Errorf("Stream did not receive response within %v", defaultStreamTimeout) + case <-time.After(streamTestTimeout): + sh.t.Errorf("Stream did not receive response within %v", streamTestTimeout) return nil } } @@ -118,7 +118,7 @@ func (sh *testStreamHandler) onResponse(response interface{}) { } func TestRetryableStreamStartOnce(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) @@ -150,7 +150,7 @@ func TestRetryableStreamStartOnce(t *testing.T) { } func TestRetryableStreamStopWhileConnecting(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) @@ -182,7 +182,7 @@ func TestRetryableStreamStopWhileConnecting(t *testing.T) { } func TestRetryableStreamStopAbortsRetries(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) @@ -216,7 +216,7 @@ func TestRetryableStreamStopAbortsRetries(t *testing.T) { } func TestRetryableStreamConnectRetries(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) @@ -252,7 +252,7 @@ func TestRetryableStreamConnectRetries(t *testing.T) { } func TestRetryableStreamConnectPermanentFailure(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) permanentErr := status.Error(codes.PermissionDenied, "denied") verifiers := test.NewVerifiers(t) @@ -283,8 +283,8 @@ func TestRetryableStreamConnectPermanentFailure(t *testing.T) { func TestRetryableStreamConnectTimeout(t *testing.T) { // Set a very low timeout to ensure no retries. timeout := time.Millisecond - pub := newTestStreamHandler(t, timeout) - pub.Stream.initTimeout = defaultInitTimeout + pub := newTestStreamHandler(t, timeout, streamTestTimeout) + pub.Stream.initTimeout = defaultStreamInitTimeout wantErr := status.Error(codes.DeadlineExceeded, "timeout") verifiers := test.NewVerifiers(t) @@ -319,7 +319,7 @@ func TestRetryableStreamInitTimeout(t *testing.T) { const streamInitTimeout = 50 * time.Millisecond const streamResponseDelay = 75 * time.Millisecond - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) pub.Stream.initTimeout = streamInitTimeout verifiers := test.NewVerifiers(t) @@ -356,7 +356,7 @@ func TestRetryableStreamInitTimeout(t *testing.T) { } func TestRetryableStreamSendReceive(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) req := msgPubReq(&pb.PubSubMessage{Data: []byte("msg")}) wantResp := msgPubResp(5) @@ -401,7 +401,7 @@ func TestRetryableStreamSendReceive(t *testing.T) { } func TestRetryableStreamConnectReceivesResetSignal(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) @@ -438,7 +438,7 @@ func TestRetryableStreamConnectReceivesResetSignal(t *testing.T) { } func TestRetryableStreamDisconnectedWithResetSignal(t *testing.T) { - pub := newTestStreamHandler(t, defaultStreamTimeout) + pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout) verifiers := test.NewVerifiers(t) @@ -480,3 +480,54 @@ func TestRetryableStreamDisconnectedWithResetSignal(t *testing.T) { t.Errorf("Stream final err: got (%v), want ", gotErr) } } + +func TestRetryableStreamIdleStreamDetection(t *testing.T) { + pub := newTestStreamHandler(t, streamTestTimeout, 50*time.Millisecond) + req := msgPubReq(&pb.PubSubMessage{Data: []byte("msg")}) + wantResp := msgPubResp(5) + + verifiers := test.NewVerifiers(t) + + stream1 := test.NewRPCVerifier(t) + stream1.Push(pub.InitialReq, initPubResp(), nil) + verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1) + + stream2 := test.NewRPCVerifier(t) + stream2.Push(pub.InitialReq, initPubResp(), nil) + stream2.Push(req, wantResp, nil) + verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + // First stream connection becomes idle (no responses received). + pub.Stream.Start() + if got, want := pub.NextStatus(), streamReconnecting; got != want { + t.Errorf("Stream status change: got %d, want %d", got, want) + } + if got, want := pub.NextStatus(), streamConnected; got != want { + t.Errorf("Stream status change: got %d, want %d", got, want) + } + + // Second stream connection. + if got, want := pub.NextStatus(), streamReconnecting; got != want { + t.Errorf("Stream status change: got %d, want %d", got, want) + } + if got, want := pub.NextStatus(), streamConnected; got != want { + t.Errorf("Stream status change: got %d, want %d", got, want) + } + if got, want := pub.Stream.Send(req), true; got != want { + t.Errorf("Stream send: got %v, want %v", got, want) + } + if gotResp := pub.NextResponse(); !testutil.Equal(gotResp, wantResp) { + t.Errorf("Stream response: got %v, want %v", gotResp, wantResp) + } + + pub.Stream.Stop() + if got, want := pub.NextStatus(), streamTerminated; got != want { + t.Errorf("Stream status change: got %d, want %d", got, want) + } + if gotErr := pub.Stream.Error(); gotErr != nil { + t.Errorf("Stream final err: got (%v), want ", gotErr) + } +} diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index d8b84b433d8..c357b718ad5 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -166,7 +166,7 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages), metadata: newPubsubMetadata(), } - s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{})) + s.stream = newRetryableStream(ctx, s, settings.Timeout, streamIdleTimeout(settings.Timeout), reflect.TypeOf(pb.SubscribeResponse{})) s.metadata.AddSubscriptionRoutingMetadata(s.subscription) s.metadata.AddClientInfo(settings.Framework)