Skip to content

Commit

Permalink
Remove the start result channel
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Oct 27, 2020
1 parent e45a47a commit 3989170
Showing 1 changed file with 13 additions and 27 deletions.
40 changes: 13 additions & 27 deletions pubsublite/streams.go
Expand Up @@ -21,17 +21,10 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

gax "github.com/googleapis/gax-go/v2"
)

var (
errDuplicateStreamStart = status.Error(codes.Internal, "pubsublite: stream has already been started")
errStreamStopped = status.Error(codes.Canceled, "pubsublite: stream has stopped")
)

// streamStatus is the state of a gRPC client stream. A stream starts off
// uninitialized. While it is active, it can transition between reconnecting and
// connected due to retryable errors. When a permanent error occurs, the stream
Expand Down Expand Up @@ -79,6 +72,9 @@ type streamHandler interface {
// Start() --> reconnect() <--> listen()
// terminate() can be called at any time, either by the client to force stream
// closure, or as a result of an unretryable error.
//
// Safe to call capitalized methods from multiple goroutines, All other methods
// are private implementation.
type retryableStream struct {
// Immutable after creation.
ctx context.Context
Expand Down Expand Up @@ -109,17 +105,16 @@ func newRetryableStream(ctx context.Context, handler streamHandler, timeout time
}
}

// Start establishes a stream connection and returns the result to the given
// channel.
func (rs *retryableStream) Start(result chan error) {
// Start establishes a stream connection. It is a no-op if the stream is started
// again.
func (rs *retryableStream) Start() {
rs.mu.Lock()
defer rs.mu.Unlock()

if rs.status != streamUninitialized {
result <- errDuplicateStreamStart
return
}
go rs.reconnect(result)
go rs.reconnect()
}

// Stop gracefully closes the stream without error.
Expand All @@ -144,7 +139,7 @@ func (rs *retryableStream) Send(request interface{}) (sent bool) {
// stream. Nothing to do here.
break
case isRetryableSendError(err):
go rs.reconnect(nil)
go rs.reconnect()
default:
rs.mu.Unlock() // terminate acquires the mutex.
rs.terminate(err)
Expand Down Expand Up @@ -202,14 +197,7 @@ func (rs *retryableStream) setCancel(cancel context.CancelFunc) {
//
// Intended to be called in a goroutine. It ends once the connection has been
// established or the stream terminated.
func (rs *retryableStream) reconnect(result chan error) {
var outerErr error
defer func() {
if result != nil {
result <- outerErr
}
}()

func (rs *retryableStream) reconnect() {
canReconnect := func() bool {
rs.mu.Lock()
defer rs.mu.Unlock()
Expand All @@ -219,7 +207,6 @@ func (rs *retryableStream) reconnect(result chan error) {
return false
}
if rs.status == streamTerminated {
outerErr = errStreamStopped
return false
}
rs.status = streamReconnecting
Expand All @@ -231,9 +218,9 @@ func (rs *retryableStream) reconnect(result chan error) {
}
rs.handler.onStreamStatusChange(streamReconnecting)

newStream, cancelFunc, outerErr := rs.initNewStream()
if outerErr != nil {
rs.terminate(outerErr)
newStream, cancelFunc, err := rs.initNewStream()
if err != nil {
rs.terminate(err)
return
}

Expand All @@ -242,7 +229,6 @@ func (rs *retryableStream) reconnect(result chan error) {
defer rs.mu.Unlock()

if rs.status == streamTerminated {
outerErr = errStreamStopped
rs.clearStream()
return false
}
Expand Down Expand Up @@ -322,7 +308,7 @@ func (rs *retryableStream) listen(recvStream grpc.ClientStream) {
}
if err != nil {
if isRetryableRecvError(err) {
go rs.reconnect(nil)
go rs.reconnect()
} else {
rs.terminate(err)
}
Expand Down

0 comments on commit 3989170

Please sign in to comment.