diff --git a/storage/integration_test.go b/storage/integration_test.go index 01229e5b2c94..5107fc1c905d 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2821,6 +2821,144 @@ func TestIntegration_ReaderAttrs(t *testing.T) { } } +// Test that context cancellation correctly stops a download before completion. +func TestIntegration_ReaderCancel(t *testing.T) { + ctx := context.Background() + client := testConfig(ctx, t) + defer client.Close() + + bkt := client.Bucket(bucketName) + + // Upload a 1MB object. + obj := bkt.Object("reader-cancel-obj") + w := obj.NewWriter(ctx) + c := randomContents() + for i := 0; i < 62500; i++ { + if _, err := w.Write(c); err != nil { + t.Fatalf("writer.Write: %v", err) + } + + } + w.Close() + + // Create a reader (which makes a GET request to GCS and opens the body to + // read the object) and then cancel the context before reading. + readerCtx, cancel := context.WithCancel(ctx) + r, err := obj.NewReader(readerCtx) + if err != nil { + t.Fatalf("obj.NewReader: %v", err) + } + defer r.Close() + + cancel() + + // Read the object 1KB a time. We cannot guarantee that Reads will return a + // context canceled error immediately, but they should always do so before we + // reach EOF. + var readErr error + for i := 0; i < 1000; i++ { + buf := make([]byte, 1000) + _, readErr = r.Read(buf) + if readErr != nil { + if readErr == context.Canceled { + return + } + break + } + } + t.Fatalf("Reader.Read: got %v, want context.Canceled", readErr) +} + +// Ensures that a file stored with a: +// * Content-Encoding of "gzip" +// * Content-Type of "text/plain" +// will be properly served back. 
+// See: +// * https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip +// * https://github.com/googleapis/google-cloud-go/issues/1800 +func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) { + ctx := context.Background() + client := testConfig(ctx, t) + defer client.Close() + + h := testHelper{t} + + projectID := testutil.ProjID() + bkt := client.Bucket(uidSpace.New()) + h.mustCreate(bkt, projectID, nil) + defer h.mustDeleteBucket(bkt) + obj := bkt.Object("decompressive-transcoding") + original := bytes.Repeat([]byte("a"), 4<<10) + + // Wrap the file upload in a retry. + // TODO: Investigate removing retry after resolving + // https://github.com/googleapis/google-api-go-client/issues/392. + err := retry(ctx, func() error { + // Firstly upload the gzip compressed file. + w := obj.NewWriter(ctx) + // Compress and upload the content. + gzw := gzip.NewWriter(w) + if _, err := gzw.Write(original); err != nil { + return fmt.Errorf("Failed to compress content: %v", err) + } + if err := gzw.Close(); err != nil { + return fmt.Errorf("Failed to compress content: %v", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("Failed to finish uploading the file: %v", err) + } + return nil + }, + nil) + + defer h.mustDeleteObject(obj) + + // Now update the Content-Encoding and Content-Type to enable + // decompressive transcoding. 
+ updatedAttrs, err := obj.Update(ctx, ObjectAttrsToUpdate{ + ContentEncoding: "gzip", + ContentType: "text/plain", + }) + if err != nil { + t.Fatalf("Attribute update failure: %v", err) + } + if g, w := updatedAttrs.ContentEncoding, "gzip"; g != w { + t.Fatalf("ContentEncoding mismatch:\nGot: %q\nWant: %q", g, w) + } + if g, w := updatedAttrs.ContentType, "text/plain"; g != w { + t.Fatalf("ContentType mismatch:\nGot: %q\nWant: %q", g, w) + } + + rWhole, err := obj.NewReader(ctx) + if err != nil { + t.Fatalf("Failed to create wholesome reader: %v", err) + } + blobWhole, err := ioutil.ReadAll(rWhole) + rWhole.Close() + if err != nil { + t.Fatalf("Failed to read the whole body: %v", err) + } + if g, w := blobWhole, original; !bytes.Equal(g, w) { + t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w) + } + + // Now try a range read, which should return the whole body anyway since + // for decompressive transcoding, range requests ARE IGNORED by Cloud Storage. + r2kBTo3kB, err := obj.NewRangeReader(ctx, 2<<10, 3<<10) + if err != nil { + t.Fatalf("Failed to create range reader: %v", err) + } + blob2kBTo3kB, err := ioutil.ReadAll(r2kBTo3kB) + r2kBTo3kB.Close() + if err != nil { + t.Fatalf("Failed to read with the 2kB to 3kB range request: %v", err) + } + // The ENTIRE body MUST be served back regardless of the requested range. 
+ if g, w := blob2kBTo3kB, original; !bytes.Equal(g, w) { + t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w) + } +} + func TestIntegration_HMACKey(t *testing.T) { t.Skip("https://github.com/googleapis/google-cloud-go/issues/1526") diff --git a/storage/reader.go b/storage/reader.go index 5c83651bd9b7..1c4fb25808be 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "net/http" "net/url" - "reflect" "strconv" "strings" "time" @@ -130,6 +129,11 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) // Define a function that initiates a Read with offset and length, assuming we // have already read seen bytes. reopen := func(seen int64) (*http.Response, error) { + // If the context has already expired, return immediately without making a + // call. + if err := ctx.Err(); err != nil { + return nil, err + } start := offset + seen if length < 0 && start < 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) @@ -337,11 +341,12 @@ func (r *Reader) readWithRetry(p []byte) (int, error) { m, err := r.body.Read(p[n:]) n += m r.seen += int64(m) - if !shouldRetryRead(err) { + if err == nil || err == io.EOF { return n, err } - // Read failed, but we will try again. Send a ranged read request that takes - // into account the number of bytes we've already seen. + // Read failed (likely due to connection issues), but we will try to reopen + // the pipe and continue. Send a ranged read request that takes into account + // the number of bytes we've already seen. res, err := r.reopen(r.seen) if err != nil { // reopen already retries @@ -353,13 +358,6 @@ func (r *Reader) readWithRetry(p []byte) (int, error) { return n, nil } -func shouldRetryRead(err error) bool { - if err == nil { - return false - } - return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2") -} - // Size returns the size of the object in bytes. 
// The returned value is always the same and is not affected by // calls to Read or Close. diff --git a/storage/reader_test.go b/storage/reader_test.go index 4b35f3c94c0f..81c9a0c6d804 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -131,7 +131,8 @@ func (h http2Error) Error() string { } func TestRangeReaderRetry(t *testing.T) { - retryErr := http2Error("blah blah INTERNAL_ERROR") + internalErr := http2Error("blah blah INTERNAL_ERROR") + goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"") readBytes := []byte(readData) hc, close := newTestServer(handleRangeRead) defer close() @@ -159,7 +160,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: -1, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, {data: readBytes[3:], counts: []int{5, 2}, err: io.EOF}, }, want: readData, @@ -168,8 +169,8 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: -1, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{5}, err: retryErr}, - {data: readBytes[5:], counts: []int{1, 3}, err: retryErr}, + {data: readBytes, counts: []int{5}, err: internalErr}, + {data: readBytes[5:], counts: []int{1, 3}, err: goawayErr}, {data: readBytes[9:], counts: []int{1}, err: io.EOF}, }, want: readData, @@ -178,7 +179,16 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: 5, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, + {data: readBytes[3:], counts: []int{2}, err: io.EOF}, + }, + want: readData[:5], + }, + { + offset: 0, + length: 5, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{3}, err: goawayErr}, {data: readBytes[3:], counts: []int{2}, err: io.EOF}, }, want: readData[:5], @@ -187,7 +197,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 1, length: 5, bodies: 
[]fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, {data: readBytes[3:], counts: []int{2}, err: io.EOF}, }, want: readData[:5], @@ -196,7 +206,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 1, length: 3, bodies: []fakeReadCloser{ - {data: readBytes[1:], counts: []int{1}, err: retryErr}, + {data: readBytes[1:], counts: []int{1}, err: internalErr}, {data: readBytes[2:], counts: []int{2}, err: io.EOF}, }, want: readData[1:4], @@ -205,8 +215,8 @@ func TestRangeReaderRetry(t *testing.T) { offset: 4, length: -1, bodies: []fakeReadCloser{ - {data: readBytes[4:], counts: []int{1}, err: retryErr}, - {data: readBytes[5:], counts: []int{4}, err: retryErr}, + {data: readBytes[4:], counts: []int{1}, err: internalErr}, + {data: readBytes[5:], counts: []int{4}, err: internalErr}, {data: readBytes[9:], counts: []int{1}, err: io.EOF}, }, want: readData[4:], @@ -215,7 +225,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: -4, length: -1, bodies: []fakeReadCloser{ - {data: readBytes[6:], counts: []int{1}, err: retryErr}, + {data: readBytes[6:], counts: []int{1}, err: internalErr}, {data: readBytes[7:], counts: []int{3}, err: io.EOF}, }, want: readData[6:],