Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): improve handling of backend unavailability #3846

Merged
merged 5 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pubsublite/internal/wire/errors.go
Expand Up @@ -42,6 +42,11 @@ var (
// cannot perform an operation because it has stopped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout.
ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

func wrapError(context, resource string, err error) error {
Expand Down
19 changes: 11 additions & 8 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -22,7 +22,9 @@ import (

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -50,26 +52,24 @@ func newStreamRetryer(timeout time.Duration) *streamRetryer {
}
}

func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableSendError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableRecvError(err) {
return r.bo.Pause(), true
}
return 0, false
}

// ExceededDeadline reports whether the retryer's deadline has already passed,
// i.e. whether the current time is strictly later than r.deadline.
func (r *streamRetryer) ExceededDeadline() bool {
	now := time.Now()
	return r.deadline.Before(now)
}

func isRetryableSendCode(code codes.Code) bool {
switch code {
// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
Expand Down Expand Up @@ -135,6 +135,9 @@ const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
Time: 5 * time.Minute,
})),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -82,7 +82,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Minute,
Timeout: 72 * time.Hour,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a little arbitrary. Is there an InfiniteFuture() equivalent in Go? The same comment applies to ReceiveSettings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't a defined infinite duration. A Duration is an int64, so we could compute the max value; however, I wanted to avoid depending on implementation details.

I changed this to 1 week, as it would be concerning if a client wasn't able to connect within that time. The original 3 days was to allow clients to cover a weekend.

// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -146,7 +146,7 @@ type ReceiveSettings struct {
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 60 * time.Minute,
Timeout: 72 * time.Hour,
}

func validateReceiveSettings(settings ReceiveSettings) error {
Expand Down
106 changes: 80 additions & 26 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -15,11 +15,13 @@ package wire

import (
"context"
"errors"
"io"
"reflect"
"sync"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"

gax "github.com/googleapis/gax-go/v2"
Expand All @@ -32,14 +34,20 @@ import (
type streamStatus int

const (
streamUninitialized streamStatus = 0
streamReconnecting streamStatus = 1
streamConnected streamStatus = 2
streamTerminated streamStatus = 3
streamUninitialized streamStatus = iota
streamReconnecting
streamConnected
streamTerminated
)

type initialResponseRequired bool

// defaultStreamInitTimeout is the default bound on a single stream
// initialization attempt. An attempt still in progress after this duration is
// aborted to mitigate server delays.
const defaultStreamInitTimeout = 1 * time.Minute

// errStreamInitTimeout is reported in place of the underlying gRPC error when
// a stream initialization attempt was canceled because it exceeded the
// initialization timeout.
var errStreamInitTimeout = errors.New("pubsublite: stream initialization timed out")

// streamHandler provides hooks for different Pub/Sub Lite streaming APIs
// (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream.
// All Pub/Sub Lite streaming APIs implement a similar handshaking protocol,
Expand Down Expand Up @@ -88,10 +96,11 @@ type streamHandler interface {
// are private implementation.
type retryableStream struct {
// Immutable after creation.
ctx context.Context
handler streamHandler
responseType reflect.Type
timeout time.Duration
ctx context.Context
handler streamHandler
responseType reflect.Type
timeout time.Duration
streamInitTimeout time.Duration
tmdiep marked this conversation as resolved.
Show resolved Hide resolved

// Guards access to fields below.
mu sync.Mutex
Expand All @@ -109,10 +118,11 @@ type retryableStream struct {
// proto received on the stream.
func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream {
return &retryableStream{
ctx: ctx,
handler: handler,
responseType: responseType,
timeout: timeout,
ctx: ctx,
handler: handler,
responseType: responseType,
timeout: timeout,
streamInitTimeout: defaultStreamInitTimeout,
}
}

Expand Down Expand Up @@ -210,13 +220,10 @@ func (rs *retryableStream) connectStream() {
rs.mu.Lock()
defer rs.mu.Unlock()

if rs.status == streamReconnecting {
if rs.status == streamReconnecting || rs.status == streamTerminated {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
// There can only be 1 goroutine reconnecting.
return false
}
if rs.status == streamTerminated {
return false
}
rs.status = streamReconnecting
rs.unsafeClearStream()
return true
Expand Down Expand Up @@ -253,51 +260,98 @@ func (rs *retryableStream) connectStream() {
rs.listen(newStream)
}

// initStatus tracks the state of a single stream initialization attempt made
// by initNewStream. It arbitrates between the attempt itself and the timer that
// aborts attempts which take too long.
type initStatus int

const (
// initInProgress: the attempt has started and has neither completed nor
// been canceled; the init timer may still cancel it.
initInProgress initStatus = iota
// initCanceled: the attempt is over without success. Either the init timer
// fired and canceled the stream, or the attempt already failed with its own
// error (in which case the timer must no longer cancel anything).
initCanceled
// initSuccess: the attempt completed before the init timer fired.
initSuccess
)

func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelFunc context.CancelFunc, err error) {
r := newStreamRetryer(rs.timeout)
var initStatusMu sync.Mutex

for {
backoff, shouldRetry := func() (time.Duration, bool) {
defer func() {
if err != nil && cancelFunc != nil {
cancelFunc()
cancelFunc = nil
newStream = nil
}
}()

var cctx context.Context
cctx, cancelFunc = context.WithCancel(rs.ctx)
// Store the cancel func to quickly cancel reconnecting if the stream is
// terminated.
rs.setCancel(cancelFunc)

// Bounds the duration of the stream initialization attempt - from stream
// creation to the initial response.
status := initInProgress
initTimer := time.AfterFunc(rs.streamInitTimeout, func() {
initStatusMu.Lock()
defer initStatusMu.Unlock()
if status == initInProgress {
status = initCanceled
cancelFunc()
}
})
defer initTimer.Stop()

resolveError := func(err error) error {
initStatusMu.Lock()
defer initStatusMu.Unlock()
// If initTimer canceled the stream, the gRPC error would be Canceled
// (non-retryable) and should be replaced.
if status == initCanceled {
return errStreamInitTimeout
}
status = initCanceled
return err
}

newStream, err = rs.handler.newStream(cctx)
if err != nil {
err = resolveError(err)
return r.RetryRecv(err)
}
initReq, needsResponse := rs.handler.initialRequest()
if err = newStream.SendMsg(initReq); err != nil {
err = resolveError(err)
return r.RetrySend(err)
}
if needsResponse {
response := reflect.New(rs.responseType).Interface()
if err = newStream.RecvMsg(response); err != nil {
err = resolveError(err)
return r.RetryRecv(err)
}
if err = rs.handler.validateInitialResponse(response); err != nil {
// An unexpected initial response from the server is a permanent error.
cancelFunc()
return 0, false
}
}

initStatusMu.Lock()
defer initStatusMu.Unlock()
if status == initCanceled {
// Unfortunately, the initTimer just fired, so the stream needs to be
// reconnected.
err = errStreamInitTimeout
return r.RetryRecv(err)
}
// We have a valid connection and should break from the outer loop.
status = initSuccess
return 0, false
}()

if !shouldRetry {
if (shouldRetry || err != nil) && cancelFunc != nil {
// Ensure that streams aren't leaked.
cancelFunc()
cancelFunc = nil
newStream = nil
}
if !shouldRetry || rs.Status() == streamTerminated {
break
}
if rs.Status() == streamTerminated {
if r.ExceededDeadline() {
err = xerrors.Errorf("%v: %w", err, ErrBackendUnavailable)
break
}
if err = gax.Sleep(rs.ctx, backoff); err != nil {
Expand Down
44 changes: 42 additions & 2 deletions pubsublite/internal/wire/streams_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -250,6 +251,45 @@ func TestRetryableStreamConnectRetries(t *testing.T) {
}
}

// TestRetryableStreamInitTimeout checks that a stream whose initial response is
// delayed beyond the configured initialization timeout is abandoned, and that
// the retryable stream then connects successfully on a second attempt.
func TestRetryableStreamInitTimeout(t *testing.T) {
	const (
		initTimeout   = 50 * time.Millisecond
		responseDelay = 75 * time.Millisecond
	)

	pub := newTestStreamHandler(t, defaultStreamTimeout)
	pub.Stream.streamInitTimeout = initTimeout

	// expectStatus consumes the next status change and reports a mismatch.
	expectStatus := func(want streamStatus) {
		if got := pub.NextStatus(); got != want {
			t.Errorf("Stream status change: got %d, want %d", got, want)
		}
	}

	verifiers := test.NewVerifiers(t)

	// The first stream's initial response is held back behind a barrier.
	delayedStream := test.NewRPCVerifier(t)
	delayedBarrier := delayedStream.PushWithBarrier(pub.InitialReq, initPubResp(), nil)
	verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, delayedStream)

	// The second stream responds immediately and should succeed.
	promptStream := test.NewRPCVerifier(t)
	promptStream.Push(pub.InitialReq, initPubResp(), nil)
	verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, promptStream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub.Stream.Start()
	expectStatus(streamReconnecting)

	// Wait longer than the init timeout before letting the first response
	// through, so the first attempt has already been abandoned.
	time.Sleep(responseDelay)
	delayedBarrier.Release()
	expectStatus(streamConnected)

	pub.Stream.Stop()
	expectStatus(streamTerminated)
}

func TestRetryableStreamConnectPermanentFailure(t *testing.T) {
pub := newTestStreamHandler(t, defaultStreamTimeout)
permanentErr := status.Error(codes.PermissionDenied, "denied")
Expand Down Expand Up @@ -308,8 +348,8 @@ func TestRetryableStreamConnectTimeout(t *testing.T) {
if pub.Stream.currentStream() != nil {
t.Error("Client stream should be nil")
}
if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, wantErr)
if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable)
}
}

Expand Down