Skip to content

Commit

Permalink
feat(bigquery): augment retry predicate (#5387)
Browse files Browse the repository at this point in the history
* feat(bigquery): augment retry predicate

The guidance in https://google.aip.dev/194 is that INTERNAL errors are
considered non-retryable. This PR deviates from that slightly by
allowing internalError to be retried for job insertion and polling
cases, as the BigQuery backend has an expectation that such work
will be retried. All other retries continue to use the existing
default retry predicate.
  • Loading branch information
shollyman committed Jan 21, 2022
1 parent adea7fc commit f9608d4
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
27 changes: 21 additions & 6 deletions bigquery/bigquery.go
Expand Up @@ -127,7 +127,8 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*
// have to read the contents and keep it in memory, and that could be expensive.
// TODO(jba): Look into retrying if media != nil.
if job.JobReference != nil && media == nil {
err = runWithRetry(ctx, invoke)
// We deviate from the default retry reasons because BigQuery expects structured internal job errors to be retried.
err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
} else {
err = invoke()
}
Expand All @@ -152,7 +153,7 @@ func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*
}

// We control request ID, so it is always safe to retry this call.
err = runWithRetry(ctx, invoke)
err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
if err != nil {
return nil, err
}
Expand All @@ -174,6 +175,10 @@ func unixMillisToTime(m int64) time.Time {
// runWithRetry invokes call through runWithRetryExplicit, passing
// defaultRetryReasons as the list of error reasons eligible for retry.
//
// See the similar function in ../storage/invoke.go. The main difference is the
// reason for retrying.
func runWithRetry(ctx context.Context, call func() error) error {
return runWithRetryExplicit(ctx, call, defaultRetryReasons)
}

func runWithRetryExplicit(ctx context.Context, call func() error, allowedReasons []string) error {
// These parameters match the suggestions in https://cloud.google.com/bigquery/sla.
backoff := gax.Backoff{
Initial: 1 * time.Second,
Expand All @@ -185,15 +190,20 @@ func runWithRetry(ctx context.Context, call func() error) error {
if err == nil {
return true, nil
}
return !retryableError(err), err
return !retryableError(err, allowedReasons), err
})
}

var (
// defaultRetryReasons is the list of error reasons that runWithRetry passes
// down as the allowed retry reasons.
defaultRetryReasons = []string{"backendError", "rateLimitExceeded"}
// jobRetryReasons contains the default reasons plus "internalError". It is
// passed explicitly by the job insertion, query, and query-polling call
// sites, which are expected to retry structured internal job errors.
jobRetryReasons = []string{"backendError", "rateLimitExceeded", "internalError"}
)

// This is the correct definition of retryable according to the BigQuery team. It
// also considers 502 ("Bad Gateway") and 503 ("Service Unavailable") errors
// retryable; these are returned by systems between the client and the BigQuery
// service.
func retryableError(err error) bool {
func retryableError(err error, allowedReasons []string) bool {
if err == nil {
return false
}
Expand All @@ -215,8 +225,13 @@ func retryableError(err error) bool {
var reason string
if len(e.Errors) > 0 {
reason = e.Errors[0].Reason
for _, r := range allowedReasons {
if reason == r {
return true
}
}
}
if e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway || reason == "backendError" || reason == "rateLimitExceeded" {
if e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway {
return true
}
case *url.Error:
Expand All @@ -233,7 +248,7 @@ func retryableError(err error) bool {
}
// Unwrap is only supported in go1.13.x+
if e, ok := err.(interface{ Unwrap() error }); ok {
return retryableError(e.Unwrap())
return retryableError(e.Unwrap(), allowedReasons)
}
return false
}
2 changes: 1 addition & 1 deletion bigquery/bigquery_test.go
Expand Up @@ -119,7 +119,7 @@ func TestRetryableErrors(t *testing.T) {
true,
},
} {
got := retryableError(tc.in)
got := retryableError(tc.in, defaultRetryReasons)
if got != tc.want {
t.Errorf("case (%s) mismatch: got %t want %t", tc.description, got, tc.want)
}
Expand Down
2 changes: 1 addition & 1 deletion bigquery/job.go
Expand Up @@ -345,7 +345,7 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
res, err = call.Do()
if err != nil {
return !retryableError(err), err
return !retryableError(err, jobRetryReasons), err
}
if !res.JobComplete { // GetQueryResults may return early without error; retry.
return false, nil
Expand Down

0 comments on commit f9608d4

Please sign in to comment.