From 68b8eb85d90418d8de01341ee11dedfc55984917 Mon Sep 17 00:00:00 2001
From: Noah Dietz
Date: Fri, 20 Aug 2021 13:30:15 -0700
Subject: [PATCH] chore(storage): implement Read with gRPC (#4401)

Adds (unexported) first-pass implementation of a gRPC-based Reader with
integration tests based on the Storage v2 API. This includes:

* Stream reopening/retry
* CRC32 checksum verification
* Integration tests
* Local caching of leftover response data

Note: This would normally be a feat but we don't want to trigger a
semver change.

Co-authored-by: Chris Cotter
---
 storage/go.mod              |   1 +
 storage/integration_test.go | 284 ++++++++++++++++++++++++++++++++++-
 storage/invoke.go           |   9 ++
 storage/reader.go           | 289 +++++++++++++++++++++++++++++++++++-
 storage/storage.go          |  44 ++++++
 5 files changed, 622 insertions(+), 5 deletions(-)

diff --git a/storage/go.mod b/storage/go.mod
index 9fdd12160b6..02812b8a1c1 100644
--- a/storage/go.mod
+++ b/storage/go.mod
@@ -12,4 +12,5 @@ require (
 	google.golang.org/api v0.54.0
 	google.golang.org/genproto v0.0.0-20210820002220-43fce44e7af1
 	google.golang.org/grpc v1.40.0
+	google.golang.org/protobuf v1.27.1
 )
diff --git a/storage/integration_test.go b/storage/integration_test.go
index 82745f68252..9ce5d438fb2 100644
--- a/storage/integration_test.go
+++ b/storage/integration_test.go
@@ -28,6 +28,7 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"math"
 	"math/rand"
 	"mime/multipart"
 	"net/http"
@@ -61,13 +62,16 @@ const (
 	// TODO(jba): move to testutil, factor out from firestore/integration_test.go.
 	envFirestoreProjID     = "GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID"
 	envFirestorePrivateKey = "GCLOUD_TESTS_GOLANG_FIRESTORE_KEY"
+	grpcTestPrefix         = "golang-grpc-test-"
 )
 
 var (
 	record = flag.Bool("record", false, "record RPCs")
 
-	uidSpace   *uid.Space
-	bucketName string
+	uidSpace       *uid.Space
+	uidSpaceGRPC   *uid.Space
+	bucketName     string
+	grpcBucketName string
 	// Use our own random number generator to isolate the sequence of random numbers from
 	// other packages. This makes it possible to use HTTP replay and draw the same sequence
 	// of numbers as during recording.
@@ -175,6 +179,9 @@ func initIntegrationTest() func() error {
 		if err := client.Bucket(bucketName).Create(ctx, testutil.ProjID(), nil); err != nil {
 			log.Fatalf("creating bucket %q: %v", bucketName, err)
 		}
+		if err := client.Bucket(grpcBucketName).Create(ctx, testutil.ProjID(), nil); err != nil {
+			log.Fatalf("creating bucket %q: %v", grpcBucketName, err)
+		}
 		return cleanup
 	}
 }
@@ -182,6 +189,8 @@
 func initUIDsAndRand(t time.Time) {
 	uidSpace = uid.NewSpace(testPrefix, &uid.Options{Time: t})
 	bucketName = uidSpace.New()
+	uidSpaceGRPC = uid.NewSpace(grpcTestPrefix, &uid.Options{Time: t})
+	grpcBucketName = uidSpaceGRPC.New()
 	// Use our own random source, to avoid other parts of the program taking
 	// random numbers from the global source and putting record and replay
 	// out of sync.
@@ -203,6 +212,21 @@ func testConfig(ctx context.Context, t *testing.T) *Client {
 	return client
 }
 
+// testConfigGRPC returns a gRPC-based client to access GCS. testConfigGRPC
+// skips the current test when being run in Short mode.
+func testConfigGRPC(ctx context.Context, t *testing.T) (gc *Client) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+
+	gc, err := newHybridClient(ctx, nil)
+	if err != nil {
+		t.Fatalf("newHybridClient: %v", err)
+	}
+
+	return
+}
+
 // config is like testConfig, but it doesn't need a *testing.T.
 func config(ctx context.Context) *Client {
 	ts := testutil.TokenSource(ctx, ScopeFullControl)
@@ -744,6 +768,250 @@ func TestIntegration_ObjectsRangeReader(t *testing.T) {
 	}
 }
 
+func TestIntegration_ObjectReadGRPC(t *testing.T) {
+	ctx := context.Background()
+
+	// Create an HTTP client to upload test data and a gRPC client to test with.
+	hc := testConfig(ctx, t)
+	defer hc.Close()
+	gc := testConfigGRPC(ctx, t)
+	defer gc.Close()
+
+	content := []byte("Hello, world this is a grpc request")
+
+	// Upload test data.
+	name := uidSpace.New()
+	ho := hc.Bucket(grpcBucketName).Object(name)
+	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := ho.Delete(ctx); err != nil {
+			log.Printf("failed to delete test object: %v", err)
+		}
+	}()
+
+	obj := gc.Bucket(grpcBucketName).Object(name)
+
+	// Using a negative length to indicate reading to the end.
+	r, err := obj.NewRangeReader(ctx, 0, -1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	b := new(bytes.Buffer)
+	b.Grow(len(content))
+
+	n, err := io.Copy(b, r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n == 0 {
+		t.Fatal("Expected to have read more than 0 bytes")
+	}
+
+	got := b.String()
+	want := string(content)
+	if diff := cmp.Diff(got, want); diff != "" {
+		t.Errorf("got(-),want(+):\n%s", diff)
+	}
+}
+
+func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
+	ctx := context.Background()
+
+	// Create an HTTP client to upload test data and a gRPC client to test with.
+	hc := testConfig(ctx, t)
+	defer hc.Close()
+	gc := testConfigGRPC(ctx, t)
+	defer gc.Close()
+
+	// Use a larger blob to test chunking logic. This is a little over 5MB.
+	content := bytes.Repeat([]byte("a"), 5<<20)
+
+	// Upload test data.
+	name := uidSpace.New()
+	ho := hc.Bucket(grpcBucketName).Object(name)
+	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := ho.Delete(ctx); err != nil {
+			log.Printf("failed to delete test object: %v", err)
+		}
+	}()
+
+	obj := gc.Bucket(grpcBucketName).Object(name)
+
+	r, err := obj.NewReader(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	bufSize := len(content)
+	buf := make([]byte, bufSize)
+
+	// Read in smaller chunks, offset to provoke reading across a Recv boundary.
+	chunk := 4<<10 + 1234
+	offset := 0
+	for {
+		end := math.Min(float64(offset+chunk), float64(bufSize))
+		n, err := r.Read(buf[offset:int(end)])
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		offset += n
+	}
+
+	// TODO: Verify content with the checksums.
+}
+
+func TestIntegration_ObjectReadRelativeToEndGRPC(t *testing.T) {
+	ctx := context.Background()
+
+	// Create an HTTP client to upload test data and a gRPC client to test with.
+	hc := testConfig(ctx, t)
+	defer hc.Close()
+	gc := testConfigGRPC(ctx, t)
+	defer gc.Close()
+
+	content := []byte("Hello, world this is a grpc request")
+
+	// Upload test data.
+	name := uidSpace.New()
+	ho := hc.Bucket(grpcBucketName).Object(name)
+	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := ho.Delete(ctx); err != nil {
+			log.Printf("failed to delete test object: %v", err)
+		}
+	}()
+
+	obj := gc.Bucket(grpcBucketName).Object(name)
+
+	offset := 7
+	// Using a negative offset to start reading relative to the end of the
+	// object, and length to indicate reading to the end.
+	r, err := obj.NewRangeReader(ctx, int64(offset*-1), -1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	b := new(bytes.Buffer)
+	b.Grow(offset)
+
+	n, err := io.Copy(b, r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n == 0 {
+		t.Fatal("Expected to have read more than 0 bytes")
+	}
+
+	got := b.String()
+	want := string(content[len(content)-offset:])
+	if diff := cmp.Diff(got, want); diff != "" {
+		t.Errorf("got(-),want(+):\n%s", diff)
+	}
+}
+
+func TestIntegration_ObjectReadPartialContentGRPC(t *testing.T) {
+	ctx := context.Background()
+
+	// Create an HTTP client to upload test data and a gRPC client to test with.
+	hc := testConfig(ctx, t)
+	defer hc.Close()
+	gc := testConfigGRPC(ctx, t)
+	defer gc.Close()
+
+	content := []byte("Hello, world this is a grpc request")
+
+	// Upload test data.
+	name := uidSpace.New()
+	ho := hc.Bucket(grpcBucketName).Object(name)
+	if err := writeObject(ctx, ho, "text/plain", content); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := ho.Delete(ctx); err != nil {
+			log.Printf("failed to delete test object: %v", err)
+		}
+	}()
+
+	obj := gc.Bucket(grpcBucketName).Object(name)
+
+	offset := 5
+	length := 5
+	// Read only a partial range of the object: length bytes starting at offset.
+	r, err := obj.NewRangeReader(ctx, int64(offset), int64(length))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	b := new(bytes.Buffer)
+	b.Grow(length)
+
+	n, err := io.Copy(b, r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n == 0 {
+		t.Fatal("Expected to have read more than 0 bytes")
+	}
+
+	got := b.String()
+	want := string(content[offset : offset+length])
+	if diff := cmp.Diff(got, want); diff != "" {
+		t.Errorf("got(-),want(+):\n%s", diff)
+	}
+}
+
+func TestIntegration_ConditionalDownloadGRPC(t *testing.T) {
+	ctx := context.Background()
+
+	// Create an HTTP client to upload test data and a gRPC client to test with.
+	hc := testConfig(ctx, t)
+	defer hc.Close()
+	gc := testConfigGRPC(ctx, t)
+	defer gc.Close()
+	h := testHelper{t}
+
+	o := hc.Bucket(grpcBucketName).Object("condread")
+	defer o.Delete(ctx)
+
+	wc := o.NewWriter(ctx)
+	wc.ContentType = "text/plain"
+	h.mustWrite(wc, []byte("foo"))
+
+	gen := wc.Attrs().Generation
+	metaGen := wc.Attrs().Metageneration
+
+	obj := gc.Bucket(grpcBucketName).Object(o.ObjectName())
+
+	if _, err := obj.Generation(gen + 1).NewReader(ctx); err == nil {
+		t.Fatalf("Unexpected successful download with nonexistent Generation")
+	}
+	if _, err := obj.If(Conditions{MetagenerationMatch: metaGen + 1}).NewReader(ctx); err == nil {
+		t.Fatalf("Unexpected successful download with failed preconditions IfMetaGenerationMatch")
+	}
+	if _, err := obj.If(Conditions{GenerationMatch: gen + 1}).NewReader(ctx); err == nil {
+		t.Fatalf("Unexpected successful download with failed preconditions IfGenerationMatch")
+	}
+	if _, err := obj.If(Conditions{GenerationMatch: gen}).NewReader(ctx); err != nil {
+		t.Fatalf("Download failed: %v", err)
+	}
+}
+
 func TestIntegration_ConditionalDownload(t *testing.T) {
 	ctx := context.Background()
 	client := testConfig(ctx, t)
@@ -3823,14 +4091,24 @@ func cleanupBuckets() error {
 	if err := killBucket(ctx, client, bucketName); err != nil {
 		return err
 	}
+	if err := killBucket(ctx, client, grpcBucketName); err != nil {
+		return err
+	}
 	// Delete buckets whose name begins with our test prefix, and which were
 	// created a while ago. (Unfortunately GCS doesn't provide last-modified
 	// time, which would be a better way to check for staleness.)
+	if err := deleteExpiredBuckets(ctx, client, testPrefix); err != nil {
+		return err
+	}
+	return deleteExpiredBuckets(ctx, client, grpcTestPrefix)
+}
+
+func deleteExpiredBuckets(ctx context.Context, client *Client, prefix string) error {
 	const expireAge = 24 * time.Hour
 	projectID := testutil.ProjID()
 	it := client.Buckets(ctx, projectID)
-	it.Prefix = testPrefix
+	it.Prefix = prefix
 	for {
 		bktAttrs, err := it.Next()
 		if err == iterator.Done {
diff --git a/storage/invoke.go b/storage/invoke.go
index f5cf554a8c7..2ecf48fffd4 100644
--- a/storage/invoke.go
+++ b/storage/invoke.go
@@ -23,6 +23,8 @@ import (
 	"cloud.google.com/go/internal"
 	gax "github.com/googleapis/gax-go/v2"
 	"google.golang.org/api/googleapi"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // runWithRetry calls the function until it returns nil or a non-retryable error, or
@@ -64,6 +66,13 @@ func shouldRetry(err error) bool {
 			return true
 		}
 	}
+	// HTTP 429, 502, 503, and 504 all map to gRPC UNAVAILABLE per
+	// https://grpc.github.io/grpc/core/md_doc_http-grpc-status-mapping.html.
+	//
+	// This is only necessary for the experimental gRPC-based media operations.
+	if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
+		return true
+	}
 	// Unwrap is only supported in go1.13.x+
 	if e, ok := err.(interface{ Unwrap() error }); ok {
 		return shouldRetry(e.Unwrap())
diff --git a/storage/reader.go b/storage/reader.go
index a992249846b..d403a17bbea 100644
--- a/storage/reader.go
+++ b/storage/reader.go
@@ -29,6 +29,8 @@ import (
 
 	"cloud.google.com/go/internal/trace"
 	"google.golang.org/api/googleapi"
+	storagepb "google.golang.org/genproto/googleapis/storage/v2"
+	"google.golang.org/protobuf/proto"
 )
 
 var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
@@ -94,6 +96,10 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
 	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
 	defer func() { trace.EndSpan(ctx, err) }()
 
+	if o.c.gc != nil {
+		return o.newRangeReaderWithGRPC(ctx, offset, length)
+	}
+
 	if err := o.validate(); err != nil {
 		return nil, err
 	}
@@ -365,15 +371,36 @@ type Reader struct {
 	wantCRC  uint32 // the CRC32c value the server sent in the header
 	gotCRC   uint32 // running crc
 	reopen   func(seen int64) (*http.Response, error)
+
+	// The following fields are only for use in the gRPC hybrid client.
+	stream         storagepb.Storage_ReadObjectClient
+	reopenWithGRPC func(seen int64) (*readStreamResponse, context.CancelFunc, error)
+	leftovers      []byte
+	cancelStream   context.CancelFunc
+}
+
+type readStreamResponse struct {
+	stream   storagepb.Storage_ReadObjectClient
+	response *storagepb.ReadObjectResponse
 }
 
 // Close closes the Reader. It must be called when done reading.
 func (r *Reader) Close() error {
-	return r.body.Close()
+	if r.body != nil {
+		return r.body.Close()
+	}
+
+	r.closeStream()
+	return nil
 }
 
 func (r *Reader) Read(p []byte) (int, error) {
-	n, err := r.readWithRetry(p)
+	read := r.readWithRetry
+	if r.reopenWithGRPC != nil {
+		read = r.readWithGRPC
+	}
+
+	n, err := read(p)
 	if r.remain != -1 {
 		r.remain -= int64(n)
 	}
@@ -392,6 +419,132 @@ func (r *Reader) Read(p []byte) (int, error) {
 	return n, err
 }
 
+// newRangeReaderWithGRPC creates a new Reader with the given range that uses
+// gRPC to read Object content.
+//
+// This is an experimental API and not intended for public use.
+func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, length int64) (r *Reader, err error) {
+	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.newRangeReaderWithGRPC")
+	defer func() { trace.EndSpan(ctx, err) }()
+
+	if o.c.gc == nil {
+		err = fmt.Errorf("handle doesn't have a gRPC client initialized")
+		return
+	}
+	if err = o.validate(); err != nil {
+		return
+	}
+
+	// A negative length means "read to the end of the object", but the
+	// read_limit field it corresponds to uses zero to mean the same thing. Thus
+	// we coerce the length to 0 to read to the end of the object.
+	if length < 0 {
+		length = 0
+	}
+
+	// For now, there are only globally unique buckets, and "_" is the alias
+	// project ID for such buckets.
+	b := bucket("_", o.bucket)
+	req := &storagepb.ReadObjectRequest{
+		Bucket: b,
+		Object: o.object,
+	}
+	// The default is a negative value, which means latest.
+	if o.gen >= 0 {
+		req.Generation = o.gen
+	}
+
+	// Define a function that initiates a Read with offset and length, assuming
+	// we have already read seen bytes.
+	reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
+		// If the context has already expired, return immediately without making
+		// the call.
+		if err := ctx.Err(); err != nil {
+			return nil, nil, err
+		}
+
+		cc, cancel := context.WithCancel(ctx)
+
+		start := offset + seen
+		// Only set a ReadLimit if length is greater than zero, because zero
+		// means read it all.
+		if length > 0 {
+			req.ReadLimit = length - seen
+		}
+		req.ReadOffset = start
+
+		setRequestConditions(req, o.conds)
+
+		var stream storagepb.Storage_ReadObjectClient
+		var msg *storagepb.ReadObjectResponse
+		var err error
+
+		err = runWithRetry(cc, func() error {
+			stream, err = o.c.gc.ReadObject(cc, req)
+			if err != nil {
+				return err
+			}
+
+			msg, err = stream.Recv()
+
+			return err
+		})
+		if err != nil {
+			// Close the stream context we just created to ensure we don't leak
+			// resources.
+			cancel()
+			return nil, nil, err
+		}
+
+		return &readStreamResponse{stream, msg}, cancel, nil
+	}
+
+	res, cancel, err := reopen(0)
+	if err != nil {
+		return nil, err
+	}
+
+	r = &Reader{
+		stream:         res.stream,
+		reopenWithGRPC: reopen,
+		cancelStream:   cancel,
+	}
+
+	// The first message was Recv'd on stream open, use it to populate the
+	// object metadata.
+	msg := res.response
+	obj := msg.GetMetadata()
+	// This is the size of the content the Reader will convey. It can be the
+	// entire object, or just the size of the request range.
+	size := msg.GetContentRange().GetCompleteLength()
+
+	r.Attrs = ReaderObjectAttrs{
+		Size:            size,
+		ContentType:     obj.GetContentType(),
+		ContentEncoding: obj.GetContentEncoding(),
+		CacheControl:    obj.GetCacheControl(),
+		LastModified:    obj.GetUpdateTime().AsTime(),
+		Metageneration:  obj.GetMetageneration(),
+		Generation:      obj.GetGeneration(),
+	}
+	if cr := msg.GetContentRange(); cr != nil {
+		r.Attrs.StartOffset = cr.GetStart()
+	}
+	// Only support checksums when reading an entire object, not a range.
+	if msg.GetObjectChecksums().Crc32C != nil && offset == 0 && length == 0 {
+		r.wantCRC = msg.GetObjectChecksums().GetCrc32C()
+		r.checkCRC = true
+	}
+
+	// Store the content from the first Recv in the client buffer for reading
+	// later.
+	r.leftovers = msg.GetChecksummedData().GetContent()
+	r.remain = size
+	r.size = size
+
+	return r, nil
+}
+
 func (r *Reader) readWithRetry(p []byte) (int, error) {
 	n := 0
 	for len(p[n:]) > 0 {
@@ -415,6 +568,138 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
 	return n, nil
 }
 
+// closeStream cancels a stream's context in order for it to be closed and
+// collected.
+//
+// This is an experimental API and not intended for public use.
+func (r *Reader) closeStream() {
+	if r.cancelStream != nil {
+		r.cancelStream()
+	}
+	r.stream = nil
+}
+
+// readWithGRPC reads bytes into the user's buffer from an open gRPC stream.
+//
+// This is an experimental API and not intended for public use.
+func (r *Reader) readWithGRPC(p []byte) (int, error) {
+	// No stream to read from, either never initialized or Close was called.
+	// Note: There is a potential concurrency issue if multiple routines are
+	// using the same reader. One encounters an error and the stream is closed
+	// and then reopened while the other routine attempts to read from it.
+	if r.stream == nil {
+		return 0, fmt.Errorf("reader has been closed")
+	}
+
+	// The entire object has been read by this reader, return EOF.
+	if r.size != 0 && r.size == r.seen {
+		return 0, io.EOF
+	}
+
+	var n int
+	// Read leftovers and return what was available to conform to the Reader
+	// interface: https://pkg.go.dev/io#Reader.
+	if len(r.leftovers) > 0 {
+		n = copy(p, r.leftovers)
+		r.seen += int64(n)
+		r.leftovers = r.leftovers[n:]
+		return n, nil
+	}
+
+	// Attempt to Recv the next message on the stream.
+	msg, err := r.recv()
+	if err != nil {
+		return 0, err
+	}
+
+	// TODO: Determine if we need to capture incremental CRC32C for this
+	// chunk. The Object CRC32C checksum is captured when directed to read
+	// the entire Object. If directed to read a range, we may need to
+	// calculate the range's checksum for verification if the checksum is
+	// present in the response here.
+	// TODO: Figure out if we need to support decompressive transcoding
+	// https://cloud.google.com/storage/docs/transcoding.
+	content := msg.GetChecksummedData().GetContent()
+	n = copy(p[n:], content)
+	leftover := len(content) - n
+	if leftover > 0 {
+		// Wasn't able to copy all of the data in the message, store for
+		// future Read calls.
+		// TODO: Instead of acquiring a new block of memory, should we reuse
+		// the existing leftovers slice, expanding it if necessary?
+		r.leftovers = make([]byte, leftover)
+		copy(r.leftovers, content[n:])
+	}
+	r.seen += int64(n)
+
+	return n, nil
+}
+
+// recv attempts to Recv the next message on the stream. In the event
+// that a retryable error is encountered, the stream will be closed, reopened,
+// and Recv again. This will attempt to Recv until one of the following is true:
+//
+// * Recv is successful
+// * A non-retryable error is encountered
+// * The Reader's context is canceled
+//
+// The last error received is the one that is returned, which could be from
+// an attempt to reopen the stream.
+//
+// This is an experimental API and not intended for public use.
+func (r *Reader) recv() (*storagepb.ReadObjectResponse, error) {
+	msg, err := r.stream.Recv()
+	if err != nil && shouldRetry(err) {
+		// This will "close" the existing stream and immediately attempt to
+		// reopen the stream, but will backoff if further attempts are necessary.
+		// Reopening the stream Recvs the first message, so if retrying is
+		// successful, the next logical chunk will be returned.
+		msg, err = r.reopenStream(r.seen)
+	}
+
+	return msg, err
+}
+
+// reopenStream "closes" the existing stream and attempts to open a new
+// stream, setting the Reader's stream and cancelStream fields in the process.
+//
+// This is an experimental API and not intended for public use.
+func (r *Reader) reopenStream(seen int64) (*storagepb.ReadObjectResponse, error) {
+	// Close existing stream and initialize new stream with updated offset.
+	r.closeStream()
+
+	res, cancel, err := r.reopenWithGRPC(seen)
+	if err != nil {
+		return nil, err
+	}
+	r.stream = res.stream
+	r.cancelStream = cancel
+	return res.response, nil
+}
+
+// setRequestConditions is used to apply the given Conditions to a gRPC request
+// message.
+//
+// This is an experimental API and not intended for public use.
+func setRequestConditions(req *storagepb.ReadObjectRequest, conds *Conditions) {
+	if conds == nil {
+		return
+	}
+	if conds.MetagenerationMatch != 0 {
+		req.IfMetagenerationMatch = proto.Int64(conds.MetagenerationMatch)
+	} else if conds.MetagenerationNotMatch != 0 {
+		req.IfMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch)
+	}
+	switch {
+	case conds.GenerationNotMatch != 0:
+		req.IfGenerationNotMatch = proto.Int64(conds.GenerationNotMatch)
+	case conds.GenerationMatch != 0:
+		req.IfGenerationMatch = proto.Int64(conds.GenerationMatch)
+	case conds.DoesNotExist:
+		req.IfGenerationMatch = proto.Int64(0)
+	}
+}
+
 // Size returns the size of the object in bytes.
 // The returned value is always the same and is not affected by
 // calls to Read or Close.
diff --git a/storage/storage.go b/storage/storage.go
index fc4d67be702..1b2e25dadd1 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -40,6 +40,7 @@ import (
 	"cloud.google.com/go/internal/optional"
 	"cloud.google.com/go/internal/trace"
 	"cloud.google.com/go/internal/version"
+	gapic "cloud.google.com/go/storage/internal/apiv2"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/option"
 	"google.golang.org/api/option/internaloption"
@@ -92,6 +93,11 @@ type Client struct {
 	scheme string
 	// ReadHost is the default host used on the reader.
 	readHost string
+
+	// gc is an optional gRPC-based, GAPIC client.
+	//
+	// This is an experimental field and not intended for public use.
+	gc *gapic.Client
 }
 
 // NewClient creates a new Google Cloud Storage client.
@@ -164,6 +170,34 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
 	}, nil
 }
 
+// hybridClientOptions carries the set of client options for HTTP and gRPC clients.
+type hybridClientOptions struct {
+	HTTPOpts []option.ClientOption
+	GRPCOpts []option.ClientOption
+}
+
+// newHybridClient creates a new Storage client that initializes a gRPC-based client
+// for media upload and download operations.
+//
+// This is an experimental API and not intended for public use.
+func newHybridClient(ctx context.Context, opts *hybridClientOptions) (*Client, error) {
+	if opts == nil {
+		opts = &hybridClientOptions{}
+	}
+	c, err := NewClient(ctx, opts.HTTPOpts...)
+	if err != nil {
+		return nil, err
+	}
+
+	g, err := gapic.NewClient(ctx, opts.GRPCOpts...)
+	if err != nil {
+		return nil, err
+	}
+	c.gc = g
+
+	return c, nil
+}
+
 // Close closes the Client.
 //
 // Close need not be called at program exit.
@@ -171,6 +205,9 @@ func (c *Client) Close() error {
 	// Set fields to nil so that subsequent uses will panic.
 	c.hc = nil
 	c.raw = nil
+	if c.gc != nil {
+		return c.gc.Close()
+	}
 	return nil
 }
 
@@ -1649,3 +1686,10 @@ func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string,
 	}
 	return res.EmailAddress, nil
 }
+
+// bucket formats the given project ID and bucket ID into a Bucket resource
+// name. This is the format necessary for the gRPC API as it conforms to the
+// Resource-oriented design practices in https://google.aip.dev/121.
+func bucket(p, b string) string {
+	return fmt.Sprintf("projects/%s/buckets/%s", p, b)
+}
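
A minimal sketch of how the hybrid read path added above can be exercised. Since newHybridClient and the gRPC-backed Reader are unexported and experimental, a snippet like this has to live inside package storage (for example, in a test file); the function name and the bucket and object names are placeholders, not part of this change:

package storage

import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
)

// hybridReadSketch is a hypothetical helper illustrating the new read path.
func hybridReadSketch() {
	ctx := context.Background()

	// nil options select default HTTP and gRPC clients.
	c, err := newHybridClient(ctx, nil)
	if err != nil {
		log.Fatalf("newHybridClient: %v", err)
	}
	defer c.Close()

	// Because c.gc is non-nil, NewRangeReader routes to newRangeReaderWithGRPC.
	// Offset 0 with length -1 reads the entire object.
	r, err := c.Bucket("my-bucket").Object("my-object").NewRangeReader(ctx, 0, -1)
	if err != nil {
		log.Fatalf("NewRangeReader: %v", err)
	}
	defer r.Close()

	data, err := ioutil.ReadAll(r)
	if err != nil {
		log.Fatalf("ReadAll: %v", err)
	}
	fmt.Printf("read %d bytes\n", len(data))
}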
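The leftover caching in readWithGRPC boils down to one pattern: copy as much of the received chunk as fits into the caller's buffer, stash the remainder, and serve the stash before the next Recv. A self-contained sketch of that pattern under illustrative names (none of these identifiers are part of the package):

package main

import "fmt"

// copyWithLeftovers mirrors the buffering technique: copy what fits into p
// and keep a private copy of the remainder for the next Read call.
func copyWithLeftovers(p, chunk []byte, leftovers *[]byte) int {
	n := copy(p, chunk)
	if rem := chunk[n:]; len(rem) > 0 {
		*leftovers = append([]byte(nil), rem...)
	}
	return n
}

func main() {
	var leftovers []byte
	p := make([]byte, 4)

	// A 10-byte "Recv" only partially fits in the 4-byte read buffer.
	n := copyWithLeftovers(p, []byte("0123456789"), &leftovers)
	fmt.Printf("read %q, cached %q\n", p[:n], leftovers) // read "0123", cached "456789"

	// The next read is served from the cache before any further Recv.
	n = copy(p, leftovers)
	leftovers = leftovers[n:]
	fmt.Printf("read %q, cached %q\n", p[:n], leftovers) // read "4567", cached "89"
}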
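The CRC32C verification builds on the standard library's Castagnoli table, the same one bound to crc32cTable in reader.go: each chunk is folded into a running checksum and the result is compared with the server-sent value once the whole object has been consumed. A runnable sketch, with the chunk boundaries and the server-side checksum stood in by local data:

package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	table := crc32.MakeTable(crc32.Castagnoli)

	// Running checksum in the style of the Reader's gotCRC field; real chunk
	// boundaries come from each stream Recv.
	var gotCRC uint32
	for _, chunk := range [][]byte{[]byte("Hello, world "), []byte("this is a grpc request")} {
		gotCRC = crc32.Update(gotCRC, table, chunk)
	}

	// Stand-in for the ObjectChecksums.Crc32C value the server sends (wantCRC).
	wantCRC := crc32.Checksum([]byte("Hello, world this is a grpc request"), table)

	if gotCRC != wantCRC {
		fmt.Println("storage: bad CRC on read")
	} else {
		fmt.Printf("checksum verified: %d\n", gotCRC)
	}
}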