Skip to content

Commit

Permalink
fix(pubsublite): improve handling of backend unavailability
Browse files Browse the repository at this point in the history
- define ErrBackendUnavailable
- mitigate stream initialization delays
  • Loading branch information
tmdiep committed Mar 22, 2021
1 parent 4a0a22e commit 75fdabe
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 61 deletions.
5 changes: 5 additions & 0 deletions pubsublite/internal/wire/errors.go
Expand Up @@ -42,6 +42,11 @@ var (
// cannot perform an operation because it has stopped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout.
ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

func wrapError(context, resource string, err error) error {
Expand Down
19 changes: 11 additions & 8 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -22,7 +22,9 @@ import (

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -50,26 +52,24 @@ func newStreamRetryer(timeout time.Duration) *streamRetryer {
}
}

func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableSendError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
if time.Now().After(r.deadline) {
return 0, false
}
func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) {
if isRetryableRecvError(err) {
return r.bo.Pause(), true
}
return 0, false
}

func (r *streamRetryer) ExceededDeadline() bool {
return time.Now().After(r.deadline)
}

func isRetryableSendCode(code codes.Code) bool {
switch code {
// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
Expand Down Expand Up @@ -135,6 +135,9 @@ const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
})),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -82,7 +82,7 @@ var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Minute,
Timeout: 72 * time.Hour,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
Expand Down Expand Up @@ -146,7 +146,7 @@ type ReceiveSettings struct {
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
Timeout: 60 * time.Minute,
Timeout: 72 * time.Hour,
}

func validateReceiveSettings(settings ReceiveSettings) error {
Expand Down
106 changes: 80 additions & 26 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -15,11 +15,13 @@ package wire

import (
"context"
"errors"
"io"
"reflect"
"sync"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"

gax "github.com/googleapis/gax-go/v2"
Expand All @@ -32,14 +34,20 @@ import (
type streamStatus int

const (
streamUninitialized streamStatus = 0
streamReconnecting streamStatus = 1
streamConnected streamStatus = 2
streamTerminated streamStatus = 3
streamUninitialized streamStatus = iota
streamReconnecting
streamConnected
streamTerminated
)

type initialResponseRequired bool

// Abort a stream initialization attempt after this duration to mitigate server
// delays.
const defaultStreamInitTimeout = 1 * time.Minute

var errStreamInitTimeout = errors.New("pubsublite: stream initialization timed out")

// streamHandler provides hooks for different Pub/Sub Lite streaming APIs
// (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream.
// All Pub/Sub Lite streaming APIs implement a similar handshaking protocol,
Expand Down Expand Up @@ -88,10 +96,11 @@ type streamHandler interface {
// are private implementation.
type retryableStream struct {
// Immutable after creation.
ctx context.Context
handler streamHandler
responseType reflect.Type
timeout time.Duration
ctx context.Context
handler streamHandler
responseType reflect.Type
timeout time.Duration
streamInitTimeout time.Duration

// Guards access to fields below.
mu sync.Mutex
Expand All @@ -109,10 +118,11 @@ type retryableStream struct {
// proto received on the stream.
func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream {
return &retryableStream{
ctx: ctx,
handler: handler,
responseType: responseType,
timeout: timeout,
ctx: ctx,
handler: handler,
responseType: responseType,
timeout: timeout,
streamInitTimeout: defaultStreamInitTimeout,
}
}

Expand Down Expand Up @@ -210,13 +220,10 @@ func (rs *retryableStream) connectStream() {
rs.mu.Lock()
defer rs.mu.Unlock()

if rs.status == streamReconnecting {
if rs.status == streamReconnecting || rs.status == streamTerminated {
// There can only be 1 goroutine reconnecting.
return false
}
if rs.status == streamTerminated {
return false
}
rs.status = streamReconnecting
rs.unsafeClearStream()
return true
Expand Down Expand Up @@ -253,51 +260,98 @@ func (rs *retryableStream) connectStream() {
rs.listen(newStream)
}

type initStatus int

const (
initInProgress initStatus = iota
initCanceled
initSuccess
)

func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelFunc context.CancelFunc, err error) {
r := newStreamRetryer(rs.timeout)
var initStatusMu sync.Mutex

for {
backoff, shouldRetry := func() (time.Duration, bool) {
defer func() {
if err != nil && cancelFunc != nil {
cancelFunc()
cancelFunc = nil
newStream = nil
}
}()

var cctx context.Context
cctx, cancelFunc = context.WithCancel(rs.ctx)
// Store the cancel func to quickly cancel reconnecting if the stream is
// terminated.
rs.setCancel(cancelFunc)

// Bounds the duration of the stream initialization attempt - from stream
// creation to the initial response.
status := initInProgress
initTimer := time.AfterFunc(rs.streamInitTimeout, func() {
initStatusMu.Lock()
defer initStatusMu.Unlock()
if status == initInProgress {
status = initCanceled
cancelFunc()
}
})
defer initTimer.Stop()

resolveError := func(err error) error {
initStatusMu.Lock()
defer initStatusMu.Unlock()
// If initTimer canceled the stream, the gRPC error would be Canceled
// (non-retryable) and should be replaced.
if status == initCanceled {
return errStreamInitTimeout
}
status = initCanceled
return err
}

newStream, err = rs.handler.newStream(cctx)
if err != nil {
err = resolveError(err)
return r.RetryRecv(err)
}
initReq, needsResponse := rs.handler.initialRequest()
if err = newStream.SendMsg(initReq); err != nil {
err = resolveError(err)
return r.RetrySend(err)
}
if needsResponse {
response := reflect.New(rs.responseType).Interface()
if err = newStream.RecvMsg(response); err != nil {
err = resolveError(err)
return r.RetryRecv(err)
}
if err = rs.handler.validateInitialResponse(response); err != nil {
// An unexpected initial response from the server is a permanent error.
cancelFunc()
return 0, false
}
}

initStatusMu.Lock()
defer initStatusMu.Unlock()
if status == initCanceled {
// Unfortunately, the initTimer just fired, so the stream needs to be
// reconnected.
err = errStreamInitTimeout
return r.RetryRecv(err)
}
// We have a valid connection and should break from the outer loop.
status = initSuccess
return 0, false
}()

if !shouldRetry {
if (shouldRetry || err != nil) && cancelFunc != nil {
// Ensure that streams aren't leaked.
cancelFunc()
cancelFunc = nil
newStream = nil
}
if !shouldRetry || rs.Status() == streamTerminated {
break
}
if rs.Status() == streamTerminated {
if r.ExceededDeadline() {
err = xerrors.Errorf("%v: %w", err, ErrBackendUnavailable)
break
}
if err = gax.Sleep(rs.ctx, backoff); err != nil {
Expand Down
44 changes: 42 additions & 2 deletions pubsublite/internal/wire/streams_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -250,6 +251,45 @@ func TestRetryableStreamConnectRetries(t *testing.T) {
}
}

func TestRetryableStreamInitTimeout(t *testing.T) {
const streamInitTimeout = 50 * time.Millisecond
const streamResponseDelay = 75 * time.Millisecond

pub := newTestStreamHandler(t, defaultStreamTimeout)
pub.Stream.streamInitTimeout = streamInitTimeout

verifiers := test.NewVerifiers(t)

// First stream will have a delayed response.
stream1 := test.NewRPCVerifier(t)
barrier1 := stream1.PushWithBarrier(pub.InitialReq, initPubResp(), nil)
verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1)

// Second stream should succeed.
stream2 := test.NewRPCVerifier(t)
stream2.Push(pub.InitialReq, initPubResp(), nil)
verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

pub.Stream.Start()
if got, want := pub.NextStatus(), streamReconnecting; got != want {
t.Errorf("Stream status change: got %d, want %d", got, want)
}

time.Sleep(streamResponseDelay)
barrier1.Release()
if got, want := pub.NextStatus(), streamConnected; got != want {
t.Errorf("Stream status change: got %d, want %d", got, want)
}

pub.Stream.Stop()
if got, want := pub.NextStatus(), streamTerminated; got != want {
t.Errorf("Stream status change: got %d, want %d", got, want)
}
}

func TestRetryableStreamConnectPermanentFailure(t *testing.T) {
pub := newTestStreamHandler(t, defaultStreamTimeout)
permanentErr := status.Error(codes.PermissionDenied, "denied")
Expand Down Expand Up @@ -308,8 +348,8 @@ func TestRetryableStreamConnectTimeout(t *testing.T) {
if pub.Stream.currentStream() != nil {
t.Error("Client stream should be nil")
}
if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, wantErr)
if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable)
}
}

Expand Down

0 comments on commit 75fdabe

Please sign in to comment.