Skip to content

Commit

Permalink
chore(storage): implement Read with gRPC (#4401)
Browse files Browse the repository at this point in the history
Adds (unexported) first pass implementation of gRPC-based Reader with integration tests based on the Storage v2 API.

This includes:

Stream reopening/retry
CRC32 checksum verification
Integration tests
Local caching of leftover response data
Note: This would normally be a feat but we don't want to trigger a semver change.

Co-authored-by: Chris Cotter <cjcotter@google.com>
  • Loading branch information
noahdietz and tritone committed Aug 20, 2021
1 parent 799e6dc commit 68b8eb8
Show file tree
Hide file tree
Showing 5 changed files with 622 additions and 5 deletions.
1 change: 1 addition & 0 deletions storage/go.mod
Expand Up @@ -12,4 +12,5 @@ require (
google.golang.org/api v0.54.0
google.golang.org/genproto v0.0.0-20210820002220-43fce44e7af1
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
)
284 changes: 281 additions & 3 deletions storage/integration_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"io"
"io/ioutil"
"log"
"math"
"math/rand"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -61,13 +62,16 @@ const (
// TODO(jba): move to testutil, factor out from firestore/integration_test.go.
envFirestoreProjID = "GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID"
envFirestorePrivateKey = "GCLOUD_TESTS_GOLANG_FIRESTORE_KEY"
grpcTestPrefix = "golang-grpc-test-"
)

var (
record = flag.Bool("record", false, "record RPCs")

uidSpace *uid.Space
bucketName string
uidSpace *uid.Space
uidSpaceGRPC *uid.Space
bucketName string
grpcBucketName string
// Use our own random number generator to isolate the sequence of random numbers from
// other packages. This makes it possible to use HTTP replay and draw the same sequence
// of numbers as during recording.
Expand Down Expand Up @@ -175,13 +179,18 @@ func initIntegrationTest() func() error {
if err := client.Bucket(bucketName).Create(ctx, testutil.ProjID(), nil); err != nil {
log.Fatalf("creating bucket %q: %v", bucketName, err)
}
if err := client.Bucket(grpcBucketName).Create(ctx, testutil.ProjID(), nil); err != nil {
log.Fatalf("creating bucket %q: %v", grpcBucketName, err)
}
return cleanup
}
}

func initUIDsAndRand(t time.Time) {
uidSpace = uid.NewSpace(testPrefix, &uid.Options{Time: t})
bucketName = uidSpace.New()
uidSpaceGRPC = uid.NewSpace(grpcTestPrefix, &uid.Options{Time: t})
grpcBucketName = uidSpaceGRPC.New()
// Use our own random source, to avoid other parts of the program taking
// random numbers from the global source and putting record and replay
// out of sync.
Expand All @@ -203,6 +212,21 @@ func testConfig(ctx context.Context, t *testing.T) *Client {
return client
}

// testConfigGRPC returns a gRPC-based client to access GCS. It skips the
// current test when run in Short mode, and fails the test if the client
// cannot be constructed.
func testConfigGRPC(ctx context.Context, t *testing.T) (gc *Client) {
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}

	var err error
	if gc, err = newHybridClient(ctx, nil); err != nil {
		t.Fatalf("newHybridClient: %v", err)
	}
	return gc
}

// config is like testConfig, but it doesn't need a *testing.T.
func config(ctx context.Context) *Client {
ts := testutil.TokenSource(ctx, ScopeFullControl)
Expand Down Expand Up @@ -744,6 +768,250 @@ func TestIntegration_ObjectsRangeReader(t *testing.T) {
}
}

// TestIntegration_ObjectReadGRPC writes a small object with the HTTP client
// and checks that reading the whole object back through the gRPC client
// yields the same bytes.
func TestIntegration_ObjectReadGRPC(t *testing.T) {
	ctx := context.Background()

	// The HTTP client uploads the fixture; the gRPC client is the one under test.
	httpClient := testConfig(ctx, t)
	defer httpClient.Close()
	grpcClient := testConfigGRPC(ctx, t)
	defer grpcClient.Close()

	payload := []byte("Hello, world this is a grpc request")

	// Upload the fixture and make sure it is removed when the test ends.
	objName := uidSpace.New()
	uploaded := httpClient.Bucket(grpcBucketName).Object(objName)
	if err := writeObject(ctx, uploaded, "text/plain", payload); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := uploaded.Delete(ctx); err != nil {
			log.Printf("failed to delete test object: %v", err)
		}
	}()

	// Offset 0 with a negative length requests the object through to its end.
	reader, err := grpcClient.Bucket(grpcBucketName).Object(objName).NewRangeReader(ctx, 0, -1)
	if err != nil {
		t.Fatal(err)
	}
	defer reader.Close()

	var sink bytes.Buffer
	sink.Grow(len(payload))

	copied, err := io.Copy(&sink, reader)
	if err != nil {
		t.Fatal(err)
	}
	if copied == 0 {
		t.Fatal("Expected to have read more than 0 bytes")
	}

	if diff := cmp.Diff(sink.String(), string(payload)); diff != "" {
		t.Errorf("got(-),want(+):\n%s", diff)
	}
}

// TestIntegration_ObjectReadChunksGRPC uploads a 5 MiB object with the HTTP
// client and reads it back through the gRPC client in small, oddly sized
// chunks, then checks that every byte arrived and matches what was uploaded.
func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
	ctx := context.Background()

	// Create an HTTP client to upload test data and a gRPC client to test with.
	hc := testConfig(ctx, t)
	defer hc.Close()
	gc := testConfigGRPC(ctx, t)
	defer gc.Close()

	// Use a larger blob to test chunking logic. This is exactly 5 MiB.
	content := bytes.Repeat([]byte("a"), 5<<20)

	// Upload test data.
	name := uidSpace.New()
	ho := hc.Bucket(grpcBucketName).Object(name)
	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := ho.Delete(ctx); err != nil {
			log.Printf("failed to delete test object: %v", err)
		}
	}()

	obj := gc.Bucket(grpcBucketName).Object(name)

	r, err := obj.NewReader(ctx)
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()

	bufSize := len(content)
	buf := make([]byte, bufSize)

	// Read in smaller chunks, offset to provoke reading across a Recv boundary.
	chunk := 4<<10 + 1234
	offset := 0
	// Stop once the buffer is full: a zero-length Read is allowed to return
	// (0, nil), which would otherwise spin this loop forever.
	for offset < bufSize {
		end := int(math.Min(float64(offset+chunk), float64(bufSize)))
		n, err := r.Read(buf[offset:end])
		// An io.Reader may hand back data together with io.EOF, so account
		// for n before looking at err.
		offset += n
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}

	if offset != bufSize {
		t.Fatalf("read %d bytes, want %d", offset, bufSize)
	}
	if !bytes.Equal(buf, content) {
		t.Error("downloaded content does not match uploaded content")
	}

	// TODO: Also verify content with the checksums.
}

// TestIntegration_ObjectReadRelativeToEndGRPC writes a small object with the
// HTTP client, then uses the gRPC client with a negative offset to read only
// the trailing bytes and compares them with the tail of the uploaded content.
func TestIntegration_ObjectReadRelativeToEndGRPC(t *testing.T) {
	ctx := context.Background()

	// The HTTP client uploads the fixture; the gRPC client is the one under test.
	httpClient := testConfig(ctx, t)
	defer httpClient.Close()
	grpcClient := testConfigGRPC(ctx, t)
	defer grpcClient.Close()

	payload := []byte("Hello, world this is a grpc request")

	// Upload the fixture and make sure it is removed when the test ends.
	objName := uidSpace.New()
	uploaded := httpClient.Bucket(grpcBucketName).Object(objName)
	if err := writeObject(ctx, uploaded, "text/plain", payload); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := uploaded.Delete(ctx); err != nil {
			log.Printf("failed to delete test object: %v", err)
		}
	}()

	// Number of trailing bytes to request.
	const tail = 7

	// A negative offset starts the read relative to the end of the object;
	// the negative length means "read through to the end".
	reader, err := grpcClient.Bucket(grpcBucketName).Object(objName).NewRangeReader(ctx, -int64(tail), -1)
	if err != nil {
		t.Fatal(err)
	}
	defer reader.Close()

	var sink bytes.Buffer
	sink.Grow(tail)

	copied, err := io.Copy(&sink, reader)
	if err != nil {
		t.Fatal(err)
	}
	if copied == 0 {
		t.Fatal("Expected to have read more than 0 bytes")
	}

	expected := string(payload[len(payload)-tail:])
	if diff := cmp.Diff(sink.String(), expected); diff != "" {
		t.Errorf("got(-),want(+):\n%s", diff)
	}
}

// TestIntegration_ObjectReadPartialContentGRPC writes a small object with the
// HTTP client, then uses the gRPC client to read a bounded range from the
// middle of it and compares the result with the same slice of the uploaded
// content.
func TestIntegration_ObjectReadPartialContentGRPC(t *testing.T) {
	ctx := context.Background()

	// Create an HTTP client to upload test data and a gRPC client to test with.
	hc := testConfig(ctx, t)
	defer hc.Close()
	gc := testConfigGRPC(ctx, t)
	defer gc.Close()

	content := []byte("Hello, world this is a grpc request")

	// Upload test data.
	name := uidSpace.New()
	ho := hc.Bucket(grpcBucketName).Object(name)
	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := ho.Delete(ctx); err != nil {
			log.Printf("failed to delete test object: %v", err)
		}
	}()

	obj := gc.Bucket(grpcBucketName).Object(name)

	offset := 5
	length := 5
	// Using a positive offset and a positive length to read only the
	// length bytes that start offset bytes into the object.
	r, err := obj.NewRangeReader(ctx, int64(offset), int64(length))
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()

	// The range holds exactly length bytes, so size the buffer by length.
	b := new(bytes.Buffer)
	b.Grow(length)

	n, err := io.Copy(b, r)
	if err != nil {
		t.Fatal(err)
	}
	if n != int64(length) {
		t.Fatalf("read %d bytes, want %d", n, length)
	}

	got := b.String()
	want := string(content[offset : offset+length])
	if diff := cmp.Diff(got, want); diff != "" {
		t.Errorf("got(-),want(+):\n%s", diff)
	}
}

// TestIntegration_ConditionalDownloadGRPC writes an object with the HTTP
// client and checks that the gRPC client refuses to open a reader when the
// requested generation or a precondition does not match, and opens one when
// the generation precondition does match.
func TestIntegration_ConditionalDownloadGRPC(t *testing.T) {
	ctx := context.Background()

	// Create an HTTP client to upload test data and a gRPC client to test with.
	hc := testConfig(ctx, t)
	defer hc.Close()
	gc := testConfigGRPC(ctx, t)
	defer gc.Close()
	h := testHelper{t}

	o := hc.Bucket(grpcBucketName).Object("condread")
	defer func() {
		// Report cleanup failures the same way the other gRPC read tests do
		// instead of silently dropping them.
		if err := o.Delete(ctx); err != nil {
			log.Printf("failed to delete test object: %v", err)
		}
	}()

	wc := o.NewWriter(ctx)
	wc.ContentType = "text/plain"
	h.mustWrite(wc, []byte("foo"))

	gen := wc.Attrs().Generation
	metaGen := wc.Attrs().Metageneration

	obj := gc.Bucket(grpcBucketName).Object(o.ObjectName())

	if _, err := obj.Generation(gen + 1).NewReader(ctx); err == nil {
		t.Fatalf("Unexpected successful download with nonexistent Generation")
	}
	if _, err := obj.If(Conditions{MetagenerationMatch: metaGen + 1}).NewReader(ctx); err == nil {
		t.Fatalf("Unexpected successful download with failed preconditions IfMetaGenerationMatch")
	}
	if _, err := obj.If(Conditions{GenerationMatch: gen + 1}).NewReader(ctx); err == nil {
		t.Fatalf("Unexpected successful download with failed preconditions IfGenerationMatch")
	}

	// The matching precondition yields a live reader; close it so the
	// underlying stream is not leaked.
	r, err := obj.If(Conditions{GenerationMatch: gen}).NewReader(ctx)
	if err != nil {
		t.Fatalf("Download failed: %v", err)
	}
	defer r.Close()
}

func TestIntegration_ConditionalDownload(t *testing.T) {
ctx := context.Background()
client := testConfig(ctx, t)
Expand Down Expand Up @@ -3823,14 +4091,24 @@ func cleanupBuckets() error {
if err := killBucket(ctx, client, bucketName); err != nil {
return err
}
if err := killBucket(ctx, client, grpcBucketName); err != nil {
return err
}

// Delete buckets whose name begins with our test prefix, and which were
// created a while ago. (Unfortunately GCS doesn't provide last-modified
// time, which would be a better way to check for staleness.)
if err := deleteExpiredBuckets(ctx, client, testPrefix); err != nil {
return err
}
return deleteExpiredBuckets(ctx, client, grpcTestPrefix)
}

func deleteExpiredBuckets(ctx context.Context, client *Client, prefix string) error {
const expireAge = 24 * time.Hour
projectID := testutil.ProjID()
it := client.Buckets(ctx, projectID)
it.Prefix = testPrefix
it.Prefix = prefix
for {
bktAttrs, err := it.Next()
if err == iterator.Done {
Expand Down
9 changes: 9 additions & 0 deletions storage/invoke.go
Expand Up @@ -23,6 +23,8 @@ import (
"cloud.google.com/go/internal"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// runWithRetry calls the function until it returns nil or a non-retryable error, or
Expand Down Expand Up @@ -64,6 +66,13 @@ func shouldRetry(err error) bool {
return true
}
}
// HTTP 429, 502, 503, and 504 all map to gRPC UNAVAILABLE per
// https://grpc.github.io/grpc/core/md_doc_http-grpc-status-mapping.html.
//
// This is only necessary for the experimental gRPC-based media operations.
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
return true
}
// Unwrap is only supported in go1.13.x+
if e, ok := err.(interface{ Unwrap() error }); ok {
return shouldRetry(e.Unwrap())
Expand Down

0 comments on commit 68b8eb8

Please sign in to comment.