Skip to content

Commit

Permalink
fix(storage): try to reopen for failed Reads (googleapis#4226)
Browse files Browse the repository at this point in the history
Errors from reading the response body in Reader.Read will now always trigger a reopen() call (unless the context has been canceled). Previously, this was limited to only INTERNAL_ERROR from HTTP/2.

Fixes googleapis#3040
  • Loading branch information
tritone authored and adityamaru committed Jun 23, 2021
1 parent d1af076 commit b66c964
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 21 deletions.
138 changes: 138 additions & 0 deletions storage/integration_test.go
Expand Up @@ -2821,6 +2821,144 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
}
}

// TestIntegration_ReaderCancel verifies that context cancellation correctly
// stops a download before completion: once the reader's context is canceled,
// Read must surface context.Canceled before the object is fully consumed.
func TestIntegration_ReaderCancel(t *testing.T) {
	ctx := context.Background()
	client := testConfig(ctx, t)
	defer client.Close()

	bkt := client.Bucket(bucketName)

	// Upload a 1MB object (62500 writes of the 16-byte random chunk).
	obj := bkt.Object("reader-cancel-obj")
	w := obj.NewWriter(ctx)
	c := randomContents()
	for i := 0; i < 62500; i++ {
		if _, err := w.Write(c); err != nil {
			t.Fatalf("writer.Write: %v", err)
		}
	}
	// The upload is not committed until Close returns; ignoring this error
	// would let a failed upload surface later as a confusing NewReader error.
	if err := w.Close(); err != nil {
		t.Fatalf("writer.Close: %v", err)
	}

	// Create a reader (which makes a GET request to GCS and opens the body to
	// read the object) and then cancel the context before reading.
	readerCtx, cancel := context.WithCancel(ctx)
	r, err := obj.NewReader(readerCtx)
	if err != nil {
		t.Fatalf("obj.NewReader: %v", err)
	}
	defer r.Close()

	cancel()

	// Read the object 1KB a time. We cannot guarantee that Reads will return a
	// context canceled error immediately, but they should always do so before we
	// reach EOF (1000 reads x 1000 bytes covers the whole 1MB object).
	var readErr error
	for i := 0; i < 1000; i++ {
		buf := make([]byte, 1000)
		_, readErr = r.Read(buf)
		if readErr != nil {
			if readErr == context.Canceled {
				return
			}
			break
		}
	}
	t.Fatalf("Reader.Read: got %v, want context.Canceled", readErr)
}

// Ensures that a file stored with a:
// * Content-Encoding of "gzip"
// * Content-Type of "text/plain"
// will be properly served back (decompressed), for both whole-object and
// range reads.
// See:
// * https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip
// * https://github.com/googleapis/google-cloud-go/issues/1800
func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) {
	ctx := context.Background()
	client := testConfig(ctx, t)
	defer client.Close()

	h := testHelper{t}

	projectID := testutil.ProjID()
	bkt := client.Bucket(uidSpace.New())
	h.mustCreate(bkt, projectID, nil)
	defer h.mustDeleteBucket(bkt)
	obj := bkt.Object("decompressive-transcoding")
	original := bytes.Repeat([]byte("a"), 4<<10)

	// Wrap the file upload in a retry.
	// TODO: Investigate removing retry after resolving
	// https://github.com/googleapis/google-api-go-client/issues/392.
	err := retry(ctx, func() error {
		// Firstly upload the gzip compressed file.
		w := obj.NewWriter(ctx)
		// Compress and upload the content.
		gzw := gzip.NewWriter(w)
		if _, err := gzw.Write(original); err != nil {
			return fmt.Errorf("Failed to compress content: %v", err)
		}
		if err := gzw.Close(); err != nil {
			return fmt.Errorf("Failed to compress content: %v", err)
		}
		if err := w.Close(); err != nil {
			return fmt.Errorf("Failed to finish uploading the file: %v", err)
		}
		return nil
	},
		nil)
	// BUG FIX: this error was previously never checked, so a failed upload let
	// the test continue and fail later with unrelated, confusing messages.
	if err != nil {
		t.Fatalf("Failed to upload the compressed file: %v", err)
	}

	defer h.mustDeleteObject(obj)

	// Now update the Content-Encoding and Content-Type to enable
	// decompressive transcoding.
	updatedAttrs, err := obj.Update(ctx, ObjectAttrsToUpdate{
		ContentEncoding: "gzip",
		ContentType:     "text/plain",
	})
	if err != nil {
		t.Fatalf("Attribute update failure: %v", err)
	}
	if g, w := updatedAttrs.ContentEncoding, "gzip"; g != w {
		t.Fatalf("ContentEncoding mismatch:\nGot:  %q\nWant: %q", g, w)
	}
	if g, w := updatedAttrs.ContentType, "text/plain"; g != w {
		t.Fatalf("ContentType mismatch:\nGot:  %q\nWant: %q", g, w)
	}

	rWhole, err := obj.NewReader(ctx)
	if err != nil {
		t.Fatalf("Failed to create wholesome reader: %v", err)
	}
	blobWhole, err := ioutil.ReadAll(rWhole)
	rWhole.Close()
	if err != nil {
		t.Fatalf("Failed to read the whole body: %v", err)
	}
	if g, w := blobWhole, original; !bytes.Equal(g, w) {
		t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w)
	}

	// Now try a range read, which should return the whole body anyways since
	// for decompressive transcoding, range requests ARE IGNORED by Cloud Storage.
	r2kBTo3kB, err := obj.NewRangeReader(ctx, 2<<10, 3<<10)
	if err != nil {
		t.Fatalf("Failed to create range reader: %v", err)
	}
	blob2kBTo3kB, err := ioutil.ReadAll(r2kBTo3kB)
	r2kBTo3kB.Close()
	if err != nil {
		t.Fatalf("Failed to read with the 2kB to 3kB range request: %v", err)
	}
	// The ENTIRE body MUST be served back regardless of the requested range.
	if g, w := blob2kBTo3kB, original; !bytes.Equal(g, w) {
		t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w)
	}
}

func TestIntegration_HMACKey(t *testing.T) {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/1526")

Expand Down
20 changes: 9 additions & 11 deletions storage/reader.go
Expand Up @@ -23,7 +23,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -130,6 +129,11 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
// Define a function that initiates a Read with offset and length, assuming we
// have already read seen bytes.
reopen := func(seen int64) (*http.Response, error) {
// If the context has already expired, return immediately without making a
// call.
if err := ctx.Err(); err != nil {
return nil, err
}
start := offset + seen
if length < 0 && start < 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d", start))
Expand Down Expand Up @@ -337,11 +341,12 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
m, err := r.body.Read(p[n:])
n += m
r.seen += int64(m)
if !shouldRetryRead(err) {
if err == nil || err == io.EOF {
return n, err
}
// Read failed, but we will try again. Send a ranged read request that takes
// into account the number of bytes we've already seen.
// Read failed (likely due to connection issues), but we will try to reopen
// the pipe and continue. Send a ranged read request that takes into account
// the number of bytes we've already seen.
res, err := r.reopen(r.seen)
if err != nil {
// reopen already retries
Expand All @@ -353,13 +358,6 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
return n, nil
}

// shouldRetryRead reports whether a failed body read is the specific class of
// failure we reopen the connection for: an HTTP/2 stream error whose message
// ends in "INTERNAL_ERROR". A nil error never triggers a retry.
func shouldRetryRead(err error) bool {
	if err == nil {
		return false
	}
	isInternal := strings.HasSuffix(err.Error(), "INTERNAL_ERROR")
	isHTTP2 := strings.Contains(reflect.TypeOf(err).String(), "http2")
	return isInternal && isHTTP2
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
Expand Down
30 changes: 20 additions & 10 deletions storage/reader_test.go
Expand Up @@ -131,7 +131,8 @@ func (h http2Error) Error() string {
}

func TestRangeReaderRetry(t *testing.T) {
retryErr := http2Error("blah blah INTERNAL_ERROR")
internalErr := http2Error("blah blah INTERNAL_ERROR")
goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
readBytes := []byte(readData)
hc, close := newTestServer(handleRangeRead)
defer close()
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
},
want: readData,
Expand All @@ -168,8 +169,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{5}, err: retryErr},
{data: readBytes[5:], counts: []int{1, 3}, err: retryErr},
{data: readBytes, counts: []int{5}, err: internalErr},
{data: readBytes[5:], counts: []int{1, 3}, err: goawayErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData,
Expand All @@ -178,7 +179,16 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
},
{
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: goawayErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -187,7 +197,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -196,7 +206,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 3,
bodies: []fakeReadCloser{
{data: readBytes[1:], counts: []int{1}, err: retryErr},
{data: readBytes[1:], counts: []int{1}, err: internalErr},
{data: readBytes[2:], counts: []int{2}, err: io.EOF},
},
want: readData[1:4],
Expand All @@ -205,8 +215,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[4:], counts: []int{1}, err: retryErr},
{data: readBytes[5:], counts: []int{4}, err: retryErr},
{data: readBytes[4:], counts: []int{1}, err: internalErr},
{data: readBytes[5:], counts: []int{4}, err: internalErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData[4:],
Expand All @@ -215,7 +225,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: -4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[6:], counts: []int{1}, err: retryErr},
{data: readBytes[6:], counts: []int{1}, err: internalErr},
{data: readBytes[7:], counts: []int{3}, err: io.EOF},
},
want: readData[6:],
Expand Down

0 comments on commit b66c964

Please sign in to comment.