Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): try to reopen for failed Reads #4226

Merged
merged 7 commits into from Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions storage/integration_test.go
Expand Up @@ -3178,6 +3178,54 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
}
}

// TestIntegration_ReaderCancel tests that context cancellation correctly stops
// a download before completion.
//
// It uploads an object, opens a reader with a cancelable context, cancels that
// context before reading anything, and then requires that some Read call
// returns context.Canceled before the loop below runs out of iterations.
func TestIntegration_ReaderCancel(t *testing.T) {
	ctx := context.Background()
	client := testConfig(ctx, t)
	defer client.Close()

	bkt := client.Bucket(bucketName)

	// Upload the object to download: 62500 writes of randomContents()
	// (intended to total roughly 1MB).
	obj := bkt.Object("reader-cancel-obj")
	w := obj.NewWriter(ctx)
	c := randomContents()
	for i := 0; i < 62500; i++ {
		if _, err := w.Write(c); err != nil {
			t.Fatalf("writer.Write: %v", err)
		}
	}
	// Close completes the upload, so its error must be checked: otherwise a
	// failed upload would only surface later as a confusing NewReader or Read
	// failure.
	if err := w.Close(); err != nil {
		t.Fatalf("writer.Close: %v", err)
	}

	// Create a reader (which makes a GET request to GCS and opens the body to
	// read the object) and then cancel the context before reading.
	readerCtx, cancel := context.WithCancel(ctx)
	r, err := obj.NewReader(readerCtx)
	if err != nil {
		t.Fatalf("obj.NewReader: %v", err)
	}
	defer r.Close()

	cancel()

	// Read the object 1KB at a time. We cannot guarantee that Reads will return
	// a context canceled error immediately, but they should always do so before
	// we reach EOF.
	var readErr error
	buf := make([]byte, 1000) // reused across iterations; contents are never inspected
	for i := 0; i < 1000; i++ {
		_, readErr = r.Read(buf)
		if readErr == nil {
			continue
		}
		if readErr == context.Canceled {
			// Expected outcome: the cancellation reached the reader.
			return
		}
		// Any other error (including io.EOF) is a failure, reported below.
		break
	}
	t.Fatalf("Reader.Read: got %v, want context.Canceled", readErr)
}

// Ensures that a file stored with a:
// * Content-Encoding of "gzip"
// * Content-Type of "text/plain"
Expand Down
20 changes: 9 additions & 11 deletions storage/reader.go
Expand Up @@ -23,7 +23,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -135,6 +134,11 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
// Define a function that initiates a Read with offset and length, assuming we
// have already read seen bytes.
reopen := func(seen int64) (*http.Response, error) {
// If the context has already expired, return immediately without making a
// call.
if err := ctx.Err(); err != nil {
return nil, err
}
start := offset + seen
if length < 0 && start < 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d", start))
Expand Down Expand Up @@ -369,11 +373,12 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
m, err := r.body.Read(p[n:])
n += m
r.seen += int64(m)
if !shouldRetryRead(err) {
if err == nil || err == io.EOF {
return n, err
}
// Read failed, but we will try again. Send a ranged read request that takes
// into account the number of bytes we've already seen.
// Read failed (likely due to connection issues), but we will try to reopen
// the pipe and continue. Send a ranged read request that takes into account
// the number of bytes we've already seen.
res, err := r.reopen(r.seen)
if err != nil {
// reopen already retries
Expand All @@ -385,13 +390,6 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
return n, nil
}

// shouldRetryRead reports whether a failed body read should be retried.
//
// It returns true only when err is non-nil, its message ends with
// "INTERNAL_ERROR", and the name of its dynamic type contains "http2".
// The type is matched by name via reflection rather than by a type assertion.
func shouldRetryRead(err error) bool {
	switch {
	case err == nil:
		return false
	case !strings.HasSuffix(err.Error(), "INTERNAL_ERROR"):
		// Message check comes first, matching the original evaluation order.
		return false
	default:
		return strings.Contains(reflect.TypeOf(err).String(), "http2")
	}
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
Expand Down
30 changes: 20 additions & 10 deletions storage/reader_test.go
Expand Up @@ -131,7 +131,8 @@ func (h http2Error) Error() string {
}

func TestRangeReaderRetry(t *testing.T) {
retryErr := http2Error("blah blah INTERNAL_ERROR")
internalErr := http2Error("blah blah INTERNAL_ERROR")
goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
readBytes := []byte(readData)
hc, close := newTestServer(handleRangeRead)
defer close()
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
},
want: readData,
Expand All @@ -168,8 +169,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{5}, err: retryErr},
{data: readBytes[5:], counts: []int{1, 3}, err: retryErr},
{data: readBytes, counts: []int{5}, err: internalErr},
{data: readBytes[5:], counts: []int{1, 3}, err: goawayErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData,
Expand All @@ -178,7 +179,16 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
},
{
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: goawayErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -187,7 +197,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -196,7 +206,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 3,
bodies: []fakeReadCloser{
{data: readBytes[1:], counts: []int{1}, err: retryErr},
{data: readBytes[1:], counts: []int{1}, err: internalErr},
{data: readBytes[2:], counts: []int{2}, err: io.EOF},
},
want: readData[1:4],
Expand All @@ -205,8 +215,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[4:], counts: []int{1}, err: retryErr},
{data: readBytes[5:], counts: []int{4}, err: retryErr},
{data: readBytes[4:], counts: []int{1}, err: internalErr},
{data: readBytes[5:], counts: []int{4}, err: internalErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData[4:],
Expand All @@ -215,7 +225,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: -4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[6:], counts: []int{1}, err: retryErr},
{data: readBytes[6:], counts: []int{1}, err: internalErr},
{data: readBytes[7:], counts: []int{3}, err: io.EOF},
},
want: readData[6:],
Expand Down