diff --git a/pubsublite/rpc.go b/pubsublite/rpc.go new file mode 100644 index 00000000000..94ac1d7dbf5 --- /dev/null +++ b/pubsublite/rpc.go @@ -0,0 +1,100 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + gax "github.com/googleapis/gax-go/v2" +) + +// streamRetryer implements the retry policy for establishing gRPC stream +// connections. +type streamRetryer struct { + bo gax.Backoff + deadline time.Time +} + +func newStreamRetryer(timeout time.Duration) *streamRetryer { + return &streamRetryer{ + bo: gax.Backoff{ + Initial: 10 * time.Millisecond, + Max: 10 * time.Second, + Multiplier: 2, + }, + deadline: time.Now().Add(timeout), + } +} + +func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) { + if time.Now().After(r.deadline) { + return 0, false + } + if isRetryableSendError(err) { + return r.bo.Pause(), true + } + return 0, false +} + +func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) { + if time.Now().After(r.deadline) { + return 0, false + } + if isRetryableRecvError(err) { + return r.bo.Pause(), true + } + return 0, false +} + +func isRetryableSendCode(code codes.Code) bool { + switch code { + // Client-side errors that occur during grpc.ClientStream.SendMsg() have a + // smaller set of retryable codes. + case codes.DeadlineExceeded, codes.Unavailable: + return true + default: + return false + } +} + +func isRetryableRecvCode(code codes.Code) bool { + switch code { + // Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java + case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown: + return true + default: + return false + } +} + +func isRetryableSendError(err error) bool { + return isRetryableStreamError(err, isRetryableSendCode) +} + +func isRetryableRecvError(err error) bool { + return isRetryableStreamError(err, isRetryableRecvCode) +} + +func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { + s, ok := status.FromError(err) + if !ok { + // Includes io.EOF, normal stream close. + // Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go + return true + } + return isEligible(s.Code()) +} diff --git a/pubsublite/streams.go b/pubsublite/streams.go new file mode 100644 index 00000000000..2e1cb321a41 --- /dev/null +++ b/pubsublite/streams.go @@ -0,0 +1,338 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "context" + "io" + "reflect" + "sync" + "time" + + "google.golang.org/grpc" + + gax "github.com/googleapis/gax-go/v2" +) + +// streamStatus is the status of a retryableStream. A stream starts off +// uninitialized. While it is active, it can transition between reconnecting and +// connected due to retryable errors. When a permanent error occurs, the stream +// is terminated and cannot be reconnected. +type streamStatus int + +const ( + streamUninitialized streamStatus = 0 + streamReconnecting streamStatus = 1 + streamConnected streamStatus = 2 + streamTerminated streamStatus = 3 +) + +// streamHandler provides hooks for different Pub/Sub Lite streaming APIs +// (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream. +// All Pub/Sub Lite streaming APIs implement a similar handshaking protocol, +// where an initial request and response must be transmitted before other +// requests can be sent over the stream. +// +// streamHandler methods must not be called while holding retryableStream.mu in +// order to prevent the streamHandler calling back into the retryableStream and +// deadlocking. +type streamHandler interface { + // newStream implementations must create the client stream with the given + // (cancellable) context. + newStream(context.Context) (grpc.ClientStream, error) + initialRequest() interface{} + validateInitialResponse(interface{}) error + + // onStreamStatusChange is used to notify stream handlers when the stream has + // changed state. In particular, the `streamTerminated` state must be handled. + // retryableStream.Error() returns the error that caused the stream to + // terminate. Stream handlers should perform any necessary reset of state upon + // `streamConnected`. + onStreamStatusChange(streamStatus) + // onResponse forwards a response received on the stream to the stream + // handler. + onResponse(interface{}) +} + +// retryableStream is a wrapper around a bidirectional gRPC client stream to +// handle automatic reconnection when the stream breaks. +// +// A retryableStream cycles between the following goroutines: +// Start() --> reconnect() <--> listen() +// terminate() can be called at any time, either by the client to force stream +// closure, or as a result of an unretryable error. +// +// Safe to call capitalized methods from multiple goroutines. All other methods +// are private implementation. +type retryableStream struct { + // Immutable after creation. + ctx context.Context + handler streamHandler + responseType reflect.Type + timeout time.Duration + + // Guards access to fields below. + mu sync.Mutex + + // The current connected stream. + stream grpc.ClientStream + // Function to cancel the current stream (which may be reconnecting). + cancelStream context.CancelFunc + status streamStatus + finalErr error +} + +// newRetryableStream creates a new retryable stream wrapper. `timeout` is the +// maximum duration for reconnection. `responseType` is the type of the response +// proto received on the stream. +func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream { + return &retryableStream{ + ctx: ctx, + handler: handler, + responseType: responseType, + timeout: timeout, + } +} + +// Start establishes a stream connection. It is a no-op if the stream has +// already started. +func (rs *retryableStream) Start() { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.status != streamUninitialized { + return + } + go rs.reconnect() +} + +// Stop gracefully closes the stream without error. +func (rs *retryableStream) Stop() { + rs.terminate(nil) +} + +// Send attempts to send the request to the underlying stream and returns true +// if successfully sent. Returns false if an error occurred or a reconnection is +// in progress. +func (rs *retryableStream) Send(request interface{}) (sent bool) { + rs.mu.Lock() + + if rs.stream != nil { + err := rs.stream.SendMsg(request) + // Note: if SendMsg returns an error, the stream is aborted. + switch { + case err == nil: + sent = true + case err == io.EOF: + // If SendMsg returns io.EOF, RecvMsg will return the status of the + // stream. Nothing to do here. + break + case isRetryableSendError(err): + go rs.reconnect() + default: + rs.mu.Unlock() // terminate acquires the mutex. + rs.terminate(err) + return + } + } + + rs.mu.Unlock() + return +} + +// Status returns the current status of the retryable stream. +func (rs *retryableStream) Status() streamStatus { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.status +} + +// Error returns the error that caused the stream to terminate. Can be nil if it +// was initiated by Stop(). +func (rs *retryableStream) Error() error { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.finalErr +} + +func (rs *retryableStream) currentStream() grpc.ClientStream { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.stream +} + +// clearStream must be called with the retryableStream.mu locked. +func (rs *retryableStream) clearStream() { + if rs.cancelStream != nil { + // If the stream did not already abort due to error, this will abort it. + rs.cancelStream() + rs.cancelStream = nil + } + if rs.stream != nil { + rs.stream = nil + } +} + +func (rs *retryableStream) setCancel(cancel context.CancelFunc) { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.cancelStream = cancel +} + +// reconnect attempts to establish a valid connection with the server. Due to +// the potential high latency, initNewStream() should not be done while holding +// retryableStream.mu. Hence we need to handle the stream being force terminated +// during reconnection. +// +// Intended to be called in a goroutine. It ends once the connection has been +// established or the stream terminated. +func (rs *retryableStream) reconnect() { + canReconnect := func() bool { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.status == streamReconnecting { + // There can only be 1 goroutine reconnecting. + return false + } + if rs.status == streamTerminated { + return false + } + rs.status = streamReconnecting + rs.clearStream() + return true + } + if !canReconnect() { + return + } + rs.handler.onStreamStatusChange(streamReconnecting) + + newStream, cancelFunc, err := rs.initNewStream() + if err != nil { + rs.terminate(err) + return + } + + connected := func() bool { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.status == streamTerminated { + rs.clearStream() + return false + } + rs.status = streamConnected + rs.stream = newStream + rs.cancelStream = cancelFunc + go rs.listen(newStream) + return true + } + if !connected() { + return + } + rs.handler.onStreamStatusChange(streamConnected) +} + +func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelFunc context.CancelFunc, err error) { + r := newStreamRetryer(rs.timeout) + for { + backoff, shouldRetry := func() (time.Duration, bool) { + defer func() { + if err != nil && cancelFunc != nil { + cancelFunc() + cancelFunc = nil + newStream = nil + } + }() + + var cctx context.Context + cctx, cancelFunc = context.WithCancel(rs.ctx) + // Store the cancel func to quickly cancel reconnecting if the stream is + // terminated. + rs.setCancel(cancelFunc) + + newStream, err = rs.handler.newStream(cctx) + if err != nil { + return r.RetryRecv(err) + } + if err = newStream.SendMsg(rs.handler.initialRequest()); err != nil { + return r.RetrySend(err) + } + response := reflect.New(rs.responseType).Interface() + if err = newStream.RecvMsg(response); err != nil { + return r.RetryRecv(err) + } + if err = rs.handler.validateInitialResponse(response); err != nil { + // An unexpected initial response from the server is a permanent error. + return 0, false + } + + // We have a valid connection and should break from the outer loop. + return 0, false + }() + + if !shouldRetry { + break + } + if err = gax.Sleep(rs.ctx, backoff); err != nil { + break + } + } + return +} + +// listen receives responses from the current stream. It initiates reconnection +// upon retryable errors or terminates the stream upon permanent error. +// +// Intended to be called in a goroutine. It ends when recvStream has closed. +func (rs *retryableStream) listen(recvStream grpc.ClientStream) { + for { + response := reflect.New(rs.responseType).Interface() + err := recvStream.RecvMsg(response) + + // If the current stream has changed while listening, any errors or messages + // received now are obsolete. Discard and end the goroutine. Assume the + // stream has been cancelled elsewhere. + if rs.currentStream() != recvStream { + break + } + if err != nil { + if isRetryableRecvError(err) { + go rs.reconnect() + } else { + rs.terminate(err) + } + break + } + rs.handler.onResponse(response) + } +} + +// terminate forces the stream to terminate with the given error (can be nil) +// Is a no-op if the stream has already terminated. +func (rs *retryableStream) terminate(err error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.status == streamTerminated { + return + } + rs.status = streamTerminated + rs.finalErr = err + rs.clearStream() + + // terminate can be called from within a streamHandler method with a lock + // held. So notify from a goroutine to prevent deadlock. + go rs.handler.onStreamStatusChange(streamTerminated) +}