Skip to content

Commit

Permalink
fix(bigtable): Retry on RST_STREAM error (googleapis#9673)
Browse files Browse the repository at this point in the history
* fix(bigtable): Retry on RST_STREAM error

* refactor(bigtable): Take list of error messages to make generic
  • Loading branch information
bhshkh committed May 16, 2024
1 parent 606f925 commit d4da4a5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
41 changes: 39 additions & 2 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net/url"
"strconv"
"strings"
"time"

btopt "cloud.google.com/go/bigtable/internal/option"
Expand Down Expand Up @@ -113,15 +114,51 @@ var (
isIdempotentRetryCode = make(map[codes.Code]bool)
retryOptions = []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes(idempotentRetryCodes, gax.Backoff{
backoff := gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 2 * time.Second,
Multiplier: 1.2,
})
}
return &bigtableRetryer{
Retryer: gax.OnCodes(idempotentRetryCodes, backoff),
Backoff: backoff,
}
}),
}
retryableInternalErrMsgs = []string{
"stream terminated by RST_STREAM", // Retry similar to spanner client. Special case due to https://github.com/googleapis/google-cloud-go/issues/6476
}
)

// bigtableRetryer wraps the generic gax Retryer and additionally inspects
// error messages to decide whether an operation can be retried. Its Retry
// method retries codes.Internal errors whose message matches one of
// retryableInternalErrMsgs, and defers to the embedded Retryer otherwise.
type bigtableRetryer struct {
// Retryer is consulted for every error that is not retried on the basis
// of its message.
gax.Retryer
// Backoff supplies the pause used when an error is retried on the basis
// of its message.
gax.Backoff
}

// containsAny reports whether str contains at least one of the entries in
// substrs. It returns false when substrs is empty.
func containsAny(str string, substrs []string) bool {
	for i := range substrs {
		if strings.Index(str, substrs[i]) >= 0 {
			return true
		}
	}
	return false
}

// Retry reports whether the operation that produced err should be retried
// and, if so, how long to wait first.
//
// An error with code codes.Internal whose message contains one of
// retryableInternalErrMsgs is always retried, pausing for the duration
// returned by r.Backoff. Every other error is handed to the embedded
// Retryer; when that declines, the returned delay is zero.
func (r *bigtableRetryer) Retry(err error) (time.Duration, bool) {
	// The message is only examined for Internal errors, so err.Error() is
	// never called unless the status code already matched.
	isInternal := status.Code(err) == codes.Internal
	if isInternal && containsAny(err.Error(), retryableInternalErrMsgs) {
		return r.Backoff.Pause(), true
	}

	if pause, ok := r.Retryer.Retry(err); ok {
		return pause, true
	}
	return 0, false
}

func init() {
for _, code := range idempotentRetryCodes {
isIdempotentRetryCode[code] = true
Expand Down
26 changes: 25 additions & 1 deletion bigtable/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ func TestRetryApply(t *testing.T) {

errCount := 0
code := codes.Unavailable // Will be retried
errMsg := ""
// Intercept requests and return an error or defer to the underlying handler
errInjector := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if strings.HasSuffix(info.FullMethod, "MutateRow") && errCount < 3 {
errCount++
return nil, status.Errorf(code, "")
return nil, status.Errorf(code, errMsg)
}
return handler(ctx, req)
}
Expand Down Expand Up @@ -127,6 +128,29 @@ func TestRetryApply(t *testing.T) {
}

errCount = 0
code = codes.Internal // Will be retried
errMsg = "stream terminated by RST_STREAM"
if err := tbl.Apply(ctx, "row", mut); err != nil {
t.Errorf("applying single mutation with retries: %v", err)
}
row, err = tbl.ReadRow(ctx, "row")
if err != nil {
t.Errorf("reading single value with retries: %v", err)
}
if row == nil {
t.Errorf("applying single mutation with retries: could not read back row")
}

errCount = 0
errMsg = ""
code = codes.Internal // Won't be retried
errMsg = "Placeholder message"
if err := tbl.Apply(ctx, "row", condMut); err == nil {
t.Errorf("conditionally mutating row with no retries: no error")
}

errCount = 0
errMsg = ""
code = codes.FailedPrecondition // Won't be retried
if err := tbl.Apply(ctx, "row", condMut); err == nil {
t.Errorf("conditionally mutating row with no retries: no error")
Expand Down

0 comments on commit d4da4a5

Please sign in to comment.