From a0568b2501004cb462a703a37175f8a698655ab2 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 15 Dec 2020 11:46:51 +1100 Subject: [PATCH 01/13] test(spanner): configurable host for it tests (#3456) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * test(spanner): configurable host for it tests Makes it possible to set a different spanner api host to be used for the integration tests * fix: reformats code Co-authored-by: Knut Olav Løite Co-authored-by: Knut Olav Løite --- spanner/integration_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 47b266d7e75..2881813c533 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -52,6 +52,10 @@ var ( // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. testProjectID = testutil.ProjID() + // spannerHost specifies the spanner API host used for testing. It can be changed + // by setting the environment variable GCLOUD_TESTS_GOLANG_SPANNER_HOST + spannerHost = getSpannerHost() + dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) @@ -165,6 +169,10 @@ func parseInstanceName(inst string) (project, instance string, err error) { return matches[1], matches[2], nil } +func getSpannerHost() string { + return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_HOST") +} + const ( str1 = "alice" str2 = "a@example.com" @@ -195,6 +203,9 @@ func initIntegrationTests() (cleanup func()) { } opts := grpcHeaderChecker.CallOptions() + if spannerHost != "" { + opts = append(opts, option.WithEndpoint(spannerHost)) + } var err error // Create InstanceAdmin and DatabaseAdmin clients. instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) @@ -3090,7 +3101,11 @@ func isNaN(x interface{}) bool { // createClient creates Cloud Spanner data client. func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { - client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}) + opts := grpcHeaderChecker.CallOptions() + if spannerHost != "" { + opts = append(opts, option.WithEndpoint(spannerHost)) + } + client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...) if err != nil { return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) } From 10620e7ed3a2b9e50589fb5a3b4451a199b13076 Mon Sep 17 00:00:00 2001 From: Cody Oss <6331106+codyoss@users.noreply.github.com> Date: Tue, 15 Dec 2020 11:00:02 -0700 Subject: [PATCH 02/13] feat(asset)!: remove v1beta1 (#3462) Protos are no longer present and the api has been turned down. --- asset/apiv1beta1/asset_client.go | 292 ------------------ asset/apiv1beta1/asset_client_example_test.go | 79 ----- asset/apiv1beta1/doc.go | 116 ------- asset/apiv1beta1/mock_test.go | 265 ---------------- internal/gapicgen/generator/config.go | 8 - 5 files changed, 760 deletions(-) delete mode 100644 asset/apiv1beta1/asset_client.go delete mode 100644 asset/apiv1beta1/asset_client_example_test.go delete mode 100644 asset/apiv1beta1/doc.go delete mode 100644 asset/apiv1beta1/mock_test.go diff --git a/asset/apiv1beta1/asset_client.go b/asset/apiv1beta1/asset_client.go deleted file mode 100644 index 2d325803415..00000000000 --- a/asset/apiv1beta1/asset_client.go +++ /dev/null @@ -1,292 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by protoc-gen-go_gapic. DO NOT EDIT. - -package asset - -import ( - "context" - "fmt" - "math" - "net/url" - "time" - - "cloud.google.com/go/longrunning" - lroauto "cloud.google.com/go/longrunning/autogen" - gax "github.com/googleapis/gax-go/v2" - "google.golang.org/api/option" - "google.golang.org/api/option/internaloption" - gtransport "google.golang.org/api/transport/grpc" - assetpb "google.golang.org/genproto/googleapis/cloud/asset/v1beta1" - longrunningpb "google.golang.org/genproto/googleapis/longrunning" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" -) - -var newClientHook clientHook - -// CallOptions contains the retry settings for each method of Client. -type CallOptions struct { - ExportAssets []gax.CallOption - BatchGetAssetsHistory []gax.CallOption -} - -func defaultClientOptions() []option.ClientOption { - return []option.ClientOption{ - internaloption.WithDefaultEndpoint("cloudasset.googleapis.com:443"), - internaloption.WithDefaultMTLSEndpoint("cloudasset.mtls.googleapis.com:443"), - internaloption.WithDefaultAudience("https://cloudasset.googleapis.com/"), - internaloption.WithDefaultScopes(DefaultAuthScopes()...), - option.WithGRPCDialOption(grpc.WithDisableServiceConfig()), - option.WithGRPCDialOption(grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt32))), - } -} - -func defaultCallOptions() *CallOptions { - return &CallOptions{ - ExportAssets: []gax.CallOption{}, - BatchGetAssetsHistory: []gax.CallOption{ - gax.WithRetry(func() gax.Retryer { - return gax.OnCodes([]codes.Code{ - codes.DeadlineExceeded, - codes.Unavailable, - }, gax.Backoff{ - Initial: 100 * time.Millisecond, - Max: 60000 * time.Millisecond, - Multiplier: 1.30, - }) - }), - }, - } -} - -// Client is a client for interacting with Cloud Asset API. -// -// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls. -type Client struct { - // Connection pool of gRPC connections to the service. - connPool gtransport.ConnPool - - // flag to opt out of default deadlines via GOOGLE_API_GO_EXPERIMENTAL_DISABLE_DEFAULT_DEADLINE - disableDeadlines bool - - // The gRPC API client. - client assetpb.AssetServiceClient - - // LROClient is used internally to handle longrunning operations. - // It is exposed so that its CallOptions can be modified if required. - // Users should not Close this client. - LROClient *lroauto.OperationsClient - - // The call options for this service. - CallOptions *CallOptions - - // The x-goog-* metadata to be sent with each request. - xGoogMetadata metadata.MD -} - -// NewClient creates a new asset service client. -// -// Asset service definition. -func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error) { - clientOpts := defaultClientOptions() - - if newClientHook != nil { - hookOpts, err := newClientHook(ctx, clientHookParams{}) - if err != nil { - return nil, err - } - clientOpts = append(clientOpts, hookOpts...) - } - - disableDeadlines, err := checkDisableDeadlines() - if err != nil { - return nil, err - } - - connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...) - if err != nil { - return nil, err - } - c := &Client{ - connPool: connPool, - disableDeadlines: disableDeadlines, - CallOptions: defaultCallOptions(), - - client: assetpb.NewAssetServiceClient(connPool), - } - c.setGoogleClientInfo() - - c.LROClient, err = lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool)) - if err != nil { - // This error "should not happen", since we are just reusing old connection pool - // and never actually need to dial. - // If this does happen, we could leak connp. However, we cannot close conn: - // If the user invoked the constructor with option.WithGRPCConn, - // we would close a connection that's still in use. - // TODO: investigate error conditions. - return nil, err - } - return c, nil -} - -// Connection returns a connection to the API service. -// -// Deprecated. -func (c *Client) Connection() *grpc.ClientConn { - return c.connPool.Conn() -} - -// Close closes the connection to the API service. The user should invoke this when -// the client is no longer required. -func (c *Client) Close() error { - return c.connPool.Close() -} - -// setGoogleClientInfo sets the name and version of the application in -// the `x-goog-api-client` header passed on each request. Intended for -// use by Google-written clients. -func (c *Client) setGoogleClientInfo(keyval ...string) { - kv := append([]string{"gl-go", versionGo()}, keyval...) - kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version) - c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) -} - -// ExportAssets exports assets with time and resource types to a given Cloud Storage -// location. The output format is newline-delimited JSON. -// This API implements the -// google.longrunning.Operation API allowing -// you to keep track of the export. -func (c *Client) ExportAssets(ctx context.Context, req *assetpb.ExportAssetsRequest, opts ...gax.CallOption) (*ExportAssetsOperation, error) { - if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines { - cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond) - defer cancel() - ctx = cctx - } - md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))) - ctx = insertMetadata(ctx, c.xGoogMetadata, md) - opts = append(c.CallOptions.ExportAssets[0:len(c.CallOptions.ExportAssets):len(c.CallOptions.ExportAssets)], opts...) - var resp *longrunningpb.Operation - err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { - var err error - resp, err = c.client.ExportAssets(ctx, req, settings.GRPC...) - return err - }, opts...) - if err != nil { - return nil, err - } - return &ExportAssetsOperation{ - lro: longrunning.InternalNewOperation(c.LROClient, resp), - }, nil -} - -// BatchGetAssetsHistory batch gets the update history of assets that overlap a time window. -// For RESOURCE content, this API outputs history with asset in both -// non-delete or deleted status. -// For IAM_POLICY content, this API outputs history when the asset and its -// attached IAM POLICY both exist. This can create gaps in the output history. -// If a specified asset does not exist, this API returns an INVALID_ARGUMENT -// error. -func (c *Client) BatchGetAssetsHistory(ctx context.Context, req *assetpb.BatchGetAssetsHistoryRequest, opts ...gax.CallOption) (*assetpb.BatchGetAssetsHistoryResponse, error) { - if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines { - cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond) - defer cancel() - ctx = cctx - } - md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "parent", url.QueryEscape(req.GetParent()))) - ctx = insertMetadata(ctx, c.xGoogMetadata, md) - opts = append(c.CallOptions.BatchGetAssetsHistory[0:len(c.CallOptions.BatchGetAssetsHistory):len(c.CallOptions.BatchGetAssetsHistory)], opts...) - var resp *assetpb.BatchGetAssetsHistoryResponse - err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { - var err error - resp, err = c.client.BatchGetAssetsHistory(ctx, req, settings.GRPC...) - return err - }, opts...) - if err != nil { - return nil, err - } - return resp, nil -} - -// ExportAssetsOperation manages a long-running operation from ExportAssets. -type ExportAssetsOperation struct { - lro *longrunning.Operation -} - -// ExportAssetsOperation returns a new ExportAssetsOperation from a given name. -// The name must be that of a previously created ExportAssetsOperation, possibly from a different process. -func (c *Client) ExportAssetsOperation(name string) *ExportAssetsOperation { - return &ExportAssetsOperation{ - lro: longrunning.InternalNewOperation(c.LROClient, &longrunningpb.Operation{Name: name}), - } -} - -// Wait blocks until the long-running operation is completed, returning the response and any errors encountered. -// -// See documentation of Poll for error-handling information. -func (op *ExportAssetsOperation) Wait(ctx context.Context, opts ...gax.CallOption) (*assetpb.ExportAssetsResponse, error) { - var resp assetpb.ExportAssetsResponse - if err := op.lro.WaitWithInterval(ctx, &resp, time.Minute, opts...); err != nil { - return nil, err - } - return &resp, nil -} - -// Poll fetches the latest state of the long-running operation. -// -// Poll also fetches the latest metadata, which can be retrieved by Metadata. -// -// If Poll fails, the error is returned and op is unmodified. If Poll succeeds and -// the operation has completed with failure, the error is returned and op.Done will return true. -// If Poll succeeds and the operation has completed successfully, -// op.Done will return true, and the response of the operation is returned. -// If Poll succeeds and the operation has not completed, the returned response and error are both nil. -func (op *ExportAssetsOperation) Poll(ctx context.Context, opts ...gax.CallOption) (*assetpb.ExportAssetsResponse, error) { - var resp assetpb.ExportAssetsResponse - if err := op.lro.Poll(ctx, &resp, opts...); err != nil { - return nil, err - } - if !op.Done() { - return nil, nil - } - return &resp, nil -} - -// Metadata returns metadata associated with the long-running operation. -// Metadata itself does not contact the server, but Poll does. -// To get the latest metadata, call this method after a successful call to Poll. -// If the metadata is not available, the returned metadata and error are both nil. -func (op *ExportAssetsOperation) Metadata() (*assetpb.ExportAssetsRequest, error) { - var meta assetpb.ExportAssetsRequest - if err := op.lro.Metadata(&meta); err == longrunning.ErrNoMetadata { - return nil, nil - } else if err != nil { - return nil, err - } - return &meta, nil -} - -// Done reports whether the long-running operation has completed. -func (op *ExportAssetsOperation) Done() bool { - return op.lro.Done() -} - -// Name returns the name of the long-running operation. -// The name is assigned by the server and is unique within the service from which the operation is created. -func (op *ExportAssetsOperation) Name() string { - return op.lro.Name() -} diff --git a/asset/apiv1beta1/asset_client_example_test.go b/asset/apiv1beta1/asset_client_example_test.go deleted file mode 100644 index 45daff4c5b8..00000000000 --- a/asset/apiv1beta1/asset_client_example_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by protoc-gen-go_gapic. DO NOT EDIT. - -package asset_test - -import ( - "context" - - asset "cloud.google.com/go/asset/apiv1beta1" - assetpb "google.golang.org/genproto/googleapis/cloud/asset/v1beta1" -) - -func ExampleNewClient() { - ctx := context.Background() - c, err := asset.NewClient(ctx) - if err != nil { - // TODO: Handle error. - } - // TODO: Use client. - _ = c -} - -func ExampleClient_ExportAssets() { - // import assetpb "google.golang.org/genproto/googleapis/cloud/asset/v1beta1" - - ctx := context.Background() - c, err := asset.NewClient(ctx) - if err != nil { - // TODO: Handle error. - } - - req := &assetpb.ExportAssetsRequest{ - // TODO: Fill request struct fields. - } - op, err := c.ExportAssets(ctx, req) - if err != nil { - // TODO: Handle error. - } - - resp, err := op.Wait(ctx) - if err != nil { - // TODO: Handle error. - } - // TODO: Use resp. - _ = resp -} - -func ExampleClient_BatchGetAssetsHistory() { - // import assetpb "google.golang.org/genproto/googleapis/cloud/asset/v1beta1" - - ctx := context.Background() - c, err := asset.NewClient(ctx) - if err != nil { - // TODO: Handle error. - } - - req := &assetpb.BatchGetAssetsHistoryRequest{ - // TODO: Fill request struct fields. - } - resp, err := c.BatchGetAssetsHistory(ctx, req) - if err != nil { - // TODO: Handle error. - } - // TODO: Use resp. - _ = resp -} diff --git a/asset/apiv1beta1/doc.go b/asset/apiv1beta1/doc.go deleted file mode 100644 index e117d1eed36..00000000000 --- a/asset/apiv1beta1/doc.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by protoc-gen-go_gapic. DO NOT EDIT. - -// Package asset is an auto-generated package for the -// Cloud Asset API. -// -// The cloud asset API manages the history and inventory of cloud resources. -// -// Use of Context -// -// The ctx passed to NewClient is used for authentication requests and -// for creating the underlying connection, but is not used for subsequent calls. -// Individual methods on the client use the ctx given to them. -// -// To close the open connection, use the Close() method. -// -// For information about setting deadlines, reusing contexts, and more -// please visit pkg.go.dev/cloud.google.com/go. -package asset // import "cloud.google.com/go/asset/apiv1beta1" - -import ( - "context" - "os" - "runtime" - "strconv" - "strings" - "unicode" - - "google.golang.org/api/option" - "google.golang.org/grpc/metadata" -) - -// For more information on implementing a client constructor hook, see -// https://github.com/googleapis/google-cloud-go/wiki/Customizing-constructors. -type clientHookParams struct{} -type clientHook func(context.Context, clientHookParams) ([]option.ClientOption, error) - -const versionClient = "20201211" - -func insertMetadata(ctx context.Context, mds ...metadata.MD) context.Context { - out, _ := metadata.FromOutgoingContext(ctx) - out = out.Copy() - for _, md := range mds { - for k, v := range md { - out[k] = append(out[k], v...) - } - } - return metadata.NewOutgoingContext(ctx, out) -} - -func checkDisableDeadlines() (bool, error) { - raw, ok := os.LookupEnv("GOOGLE_API_GO_EXPERIMENTAL_DISABLE_DEFAULT_DEADLINE") - if !ok { - return false, nil - } - - b, err := strconv.ParseBool(raw) - return b, err -} - -// DefaultAuthScopes reports the default set of authentication scopes to use with this package. -func DefaultAuthScopes() []string { - return []string{ - "https://www.googleapis.com/auth/cloud-platform", - } -} - -// versionGo returns the Go runtime version. The returned string -// has no whitespace, suitable for reporting in header. -func versionGo() string { - const develPrefix = "devel +" - - s := runtime.Version() - if strings.HasPrefix(s, develPrefix) { - s = s[len(develPrefix):] - if p := strings.IndexFunc(s, unicode.IsSpace); p >= 0 { - s = s[:p] - } - return s - } - - notSemverRune := func(r rune) bool { - return !strings.ContainsRune("0123456789.", r) - } - - if strings.HasPrefix(s, "go1") { - s = s[2:] - var prerelease string - if p := strings.IndexFunc(s, notSemverRune); p >= 0 { - s, prerelease = s[:p], s[p:] - } - if strings.HasSuffix(s, ".") { - s += "0" - } else if strings.Count(s, ".") < 2 { - s += ".0" - } - if prerelease != "" { - s += "-" + prerelease - } - return s - } - return "UNKNOWN" -} diff --git a/asset/apiv1beta1/mock_test.go b/asset/apiv1beta1/mock_test.go deleted file mode 100644 index 4e3d4796d56..00000000000 --- a/asset/apiv1beta1/mock_test.go +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by gapic-generator. DO NOT EDIT. - -package asset - -import ( - "context" - "flag" - "fmt" - "io" - "log" - "net" - "os" - "strings" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "google.golang.org/api/option" - assetpb "google.golang.org/genproto/googleapis/cloud/asset/v1beta1" - longrunningpb "google.golang.org/genproto/googleapis/longrunning" - - status "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - - gstatus "google.golang.org/grpc/status" -) - -var _ = io.EOF -var _ = ptypes.MarshalAny -var _ status.Status - -type mockAssetServer struct { - // Embed for forward compatibility. - // Tests will keep working if more methods are added - // in the future. - assetpb.AssetServiceServer - - reqs []proto.Message - - // If set, all calls return this error. - err error - - // responses to return if err == nil - resps []proto.Message -} - -func (s *mockAssetServer) ExportAssets(ctx context.Context, req *assetpb.ExportAssetsRequest) (*longrunningpb.Operation, error) { - md, _ := metadata.FromIncomingContext(ctx) - if xg := md["x-goog-api-client"]; len(xg) == 0 || !strings.Contains(xg[0], "gl-go/") { - return nil, fmt.Errorf("x-goog-api-client = %v, expected gl-go key", xg) - } - s.reqs = append(s.reqs, req) - if s.err != nil { - return nil, s.err - } - return s.resps[0].(*longrunningpb.Operation), nil -} - -func (s *mockAssetServer) BatchGetAssetsHistory(ctx context.Context, req *assetpb.BatchGetAssetsHistoryRequest) (*assetpb.BatchGetAssetsHistoryResponse, error) { - md, _ := metadata.FromIncomingContext(ctx) - if xg := md["x-goog-api-client"]; len(xg) == 0 || !strings.Contains(xg[0], "gl-go/") { - return nil, fmt.Errorf("x-goog-api-client = %v, expected gl-go key", xg) - } - s.reqs = append(s.reqs, req) - if s.err != nil { - return nil, s.err - } - return s.resps[0].(*assetpb.BatchGetAssetsHistoryResponse), nil -} - -// clientOpt is the option tests should use to connect to the test server. -// It is initialized by TestMain. -var clientOpt option.ClientOption - -var ( - mockAsset mockAssetServer -) - -func TestMain(m *testing.M) { - flag.Parse() - - serv := grpc.NewServer() - assetpb.RegisterAssetServiceServer(serv, &mockAsset) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - log.Fatal(err) - } - go serv.Serve(lis) - - conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) - if err != nil { - log.Fatal(err) - } - clientOpt = option.WithGRPCConn(conn) - - os.Exit(m.Run()) -} - -func TestAssetServiceExportAssets(t *testing.T) { - var expectedResponse *assetpb.ExportAssetsResponse = &assetpb.ExportAssetsResponse{} - - mockAsset.err = nil - mockAsset.reqs = nil - - any, err := ptypes.MarshalAny(expectedResponse) - if err != nil { - t.Fatal(err) - } - mockAsset.resps = append(mockAsset.resps[:0], &longrunningpb.Operation{ - Name: "longrunning-test", - Done: true, - Result: &longrunningpb.Operation_Response{Response: any}, - }) - - var formattedParent string = fmt.Sprintf("projects/%s", "[PROJECT]") - var outputConfig *assetpb.OutputConfig = &assetpb.OutputConfig{} - var request = &assetpb.ExportAssetsRequest{ - Parent: formattedParent, - OutputConfig: outputConfig, - } - - c, err := NewClient(context.Background(), clientOpt) - if err != nil { - t.Fatal(err) - } - - respLRO, err := c.ExportAssets(context.Background(), request) - if err != nil { - t.Fatal(err) - } - resp, err := respLRO.Wait(context.Background()) - - if err != nil { - t.Fatal(err) - } - - if want, got := request, mockAsset.reqs[0]; !proto.Equal(want, got) { - t.Errorf("wrong request %q, want %q", got, want) - } - - if want, got := expectedResponse, resp; !proto.Equal(want, got) { - t.Errorf("wrong response %q, want %q)", got, want) - } -} - -func TestAssetServiceExportAssetsError(t *testing.T) { - errCode := codes.PermissionDenied - mockAsset.err = nil - mockAsset.resps = append(mockAsset.resps[:0], &longrunningpb.Operation{ - Name: "longrunning-test", - Done: true, - Result: &longrunningpb.Operation_Error{ - Error: &status.Status{ - Code: int32(errCode), - Message: "test error", - }, - }, - }) - - var formattedParent string = fmt.Sprintf("projects/%s", "[PROJECT]") - var outputConfig *assetpb.OutputConfig = &assetpb.OutputConfig{} - var request = &assetpb.ExportAssetsRequest{ - Parent: formattedParent, - OutputConfig: outputConfig, - } - - c, err := NewClient(context.Background(), clientOpt) - if err != nil { - t.Fatal(err) - } - - respLRO, err := c.ExportAssets(context.Background(), request) - if err != nil { - t.Fatal(err) - } - resp, err := respLRO.Wait(context.Background()) - - if st, ok := gstatus.FromError(err); !ok { - t.Errorf("got error %v, expected grpc error", err) - } else if c := st.Code(); c != errCode { - t.Errorf("got error code %q, want %q", c, errCode) - } - _ = resp -} -func TestAssetServiceBatchGetAssetsHistory(t *testing.T) { - var expectedResponse *assetpb.BatchGetAssetsHistoryResponse = &assetpb.BatchGetAssetsHistoryResponse{} - - mockAsset.err = nil - mockAsset.reqs = nil - - mockAsset.resps = append(mockAsset.resps[:0], expectedResponse) - - var formattedParent string = fmt.Sprintf("projects/%s", "[PROJECT]") - var contentType assetpb.ContentType = assetpb.ContentType_CONTENT_TYPE_UNSPECIFIED - var readTimeWindow *assetpb.TimeWindow = &assetpb.TimeWindow{} - var request = &assetpb.BatchGetAssetsHistoryRequest{ - Parent: formattedParent, - ContentType: contentType, - ReadTimeWindow: readTimeWindow, - } - - c, err := NewClient(context.Background(), clientOpt) - if err != nil { - t.Fatal(err) - } - - resp, err := c.BatchGetAssetsHistory(context.Background(), request) - - if err != nil { - t.Fatal(err) - } - - if want, got := request, mockAsset.reqs[0]; !proto.Equal(want, got) { - t.Errorf("wrong request %q, want %q", got, want) - } - - if want, got := expectedResponse, resp; !proto.Equal(want, got) { - t.Errorf("wrong response %q, want %q)", got, want) - } -} - -func TestAssetServiceBatchGetAssetsHistoryError(t *testing.T) { - errCode := codes.PermissionDenied - mockAsset.err = gstatus.Error(errCode, "test error") - - var formattedParent string = fmt.Sprintf("projects/%s", "[PROJECT]") - var contentType assetpb.ContentType = assetpb.ContentType_CONTENT_TYPE_UNSPECIFIED - var readTimeWindow *assetpb.TimeWindow = &assetpb.TimeWindow{} - var request = &assetpb.BatchGetAssetsHistoryRequest{ - Parent: formattedParent, - ContentType: contentType, - ReadTimeWindow: readTimeWindow, - } - - c, err := NewClient(context.Background(), clientOpt) - if err != nil { - t.Fatal(err) - } - - resp, err := c.BatchGetAssetsHistory(context.Background(), request) - - if st, ok := gstatus.FromError(err); !ok { - t.Errorf("got error %v, expected grpc error", err) - } else if c := st.Code(); c != errCode { - t.Errorf("got error code %q, want %q", c, errCode) - } - _ = resp -} diff --git a/internal/gapicgen/generator/config.go b/internal/gapicgen/generator/config.go index 62c43c2c927..fdc58fa82a6 100644 --- a/internal/gapicgen/generator/config.go +++ b/internal/gapicgen/generator/config.go @@ -616,14 +616,6 @@ var microgenGapicConfigs = []*microgenConfig{ apiServiceConfigPath: "google/cloud/videointelligence/videointelligence_v1beta2.yaml", releaseLevel: "beta", }, - { - inputDirectoryPath: "google/cloud/asset/v1beta1", - pkg: "asset", - importPath: "cloud.google.com/go/asset/apiv1beta1", - gRPCServiceConfigPath: "google/cloud/asset/v1beta1/cloudasset_grpc_service_config.json", - apiServiceConfigPath: "google/cloud/asset/v1beta1/cloudasset_v1beta1.yaml", - releaseLevel: "beta", - }, { inputDirectoryPath: "google/cloud/asset/v1p2beta1", pkg: "asset", From 519bfb7423956acb1b6a60e878023d5d3ef3af66 Mon Sep 17 00:00:00 2001 From: Tyler Bui-Palsulich <26876514+tbpg@users.noreply.github.com> Date: Wed, 16 Dec 2020 10:21:58 -0500 Subject: [PATCH 03/13] chore: update GoDoc badge to pkg.go.dev (#3469) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1ffe6700b5c..5bd4d49651f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Google Cloud Client Libraries for Go -[![GoDoc](https://godoc.org/cloud.google.com/go?status.svg)](https://pkg.go.dev/cloud.google.com/go) +[![Go Reference](https://pkg.go.dev/badge/cloud.google.com/go.svg)](https://pkg.go.dev/cloud.google.com/go) Go packages for [Google Cloud Platform](https://cloud.google.com) services. From 11fb91711db5b941995737980cef7b48b611fefd Mon Sep 17 00:00:00 2001 From: 839 <8398a7@gmail.com> Date: Thu, 17 Dec 2020 01:44:03 +0900 Subject: [PATCH 04/13] fix(spanner): fix session leak (#3461) Fixed a problem where the session was not reused when a panic occurred. Fixes #3460 --- spanner/client.go | 8 +++++--- spanner/client_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index eb5ff9798d3..8f14968e04b 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -451,6 +451,11 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea var ( sh *sessionHandle ) + defer func() { + if sh != nil { + sh.recycle() + } + }() err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error { var ( err error @@ -480,9 +485,6 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea resp, err = t.runInTransaction(ctx, f) return err }) - if sh != nil { - sh.recycle() - } return resp, err } diff --git a/spanner/client_test.go b/spanner/client_test.go index 92cac5564bb..4fc6219a0f4 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -828,6 +828,34 @@ func TestClient_ReadWriteTransaction_Update_QueryOptions(t *testing.T) { } } +func TestClient_ReadWriteTransaction_DoNotLeakSessionOnPanic(t *testing.T) { + // Make sure that there is always only one session in the pool. + sc := SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + } + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: sc}) + defer teardown() + ctx := context.Background() + + // If a panic occurs during a transaction, the session will not leak. + func() { + defer func() { recover() }() + + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + panic("cause panic") + return nil + }) + if err != nil { + t.Fatalf("Unexpected error during transaction: %v", err) + } + }() + + if g, w := client.idleSessions.idleList.Len(), 1; g != w { + t.Fatalf("idle session count mismatch.\nGot: %v\nWant: %v", g, w) + } +} + func TestClient_SessionNotFound(t *testing.T) { // Ensure we always have at least one session in the pool. sc := SessionPoolConfig{ From 98331bd26e32c7b3295baf2db463d806dc8cad7e Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Wed, 16 Dec 2020 13:51:21 -0500 Subject: [PATCH 05/13] test(datastore): remove flaky timezone test (#3470) This test causes a race condition and there's no simple way to get it to work. I'll file another issue to figure out a way to test this behavior. Fixes #3402 --- datastore/load_test.go | 37 ------------------------------------- 1 file changed, 37 deletions(-) diff --git a/datastore/load_test.go b/datastore/load_test.go index b201f08ce2f..518fa61c10b 100644 --- a/datastore/load_test.go +++ b/datastore/load_test.go @@ -439,43 +439,6 @@ type withUntypedInterface struct { Field interface{} } -func TestLoadCivilTimeInNonUTCZone(t *testing.T) { - t.Skip("https://github.com/googleapis/google-cloud-go/issues/3402") - src := &pb.Entity{ - Key: keyToProto(testKey0), - Properties: map[string]*pb.Value{ - "Time": {ValueType: &pb.Value_TimestampValue{TimestampValue: ×tamppb.Timestamp{Seconds: 1605504600}}}, - }, - } - dst := &struct{ Time civil.Time }{ - Time: civil.Time{}, - } - want := &struct{ Time civil.Time }{ - Time: civil.Time{ - Hour: 5, - Minute: 30, - }, - } - loc, err := time.LoadLocation("Africa/Cairo") - if err != nil { - t.Fatalf("LoadLocation: %v", err) - } - time.Local = loc - - err = loadEntityProto(dst, src) - if err != nil { - t.Fatalf("loadEntityProto: %v", err) - } - if diff := testutil.Diff(dst, want); diff != "" { - t.Fatalf("Mismatch: got - want +\n%s", diff) - } - loc, err = time.LoadLocation("UTC") - if err != nil { - t.Fatalf("LoadLocation: %v", err) - } - time.Local = loc -} - func TestLoadToInterface(t *testing.T) { testCases := []struct { name string From 299de04ec22e3cc31d76ded257a86008960ea4bb Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 17 Dec 2020 06:36:04 +1100 Subject: [PATCH 06/13] refactor(pubsublite): rename common.PublishMetadata to publish.Metadata (#3457) Packages named `common` are generally discouraged. --- pubsublite/internal/wire/publish_batcher.go | 6 ++--- .../internal/wire/publish_batcher_test.go | 6 ++--- .../publish_data.go => publish/metadata.go} | 22 +++++++++---------- .../metadata_test.go} | 14 ++++++------ 4 files changed, 24 insertions(+), 24 deletions(-) rename pubsublite/{common/publish_data.go => publish/metadata.go} (57%) rename pubsublite/{common/publish_data_test.go => publish/metadata_test.go} (76%) diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index 8a32accf248..a6d150565f2 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -18,7 +18,7 @@ import ( "errors" "fmt" - "cloud.google.com/go/pubsublite/common" + "cloud.google.com/go/pubsublite/publish" "google.golang.org/api/support/bundler" "google.golang.org/protobuf/proto" @@ -28,7 +28,7 @@ import ( var errPublishQueueEmpty = errors.New("pubsublite: received publish response from server with no batches in flight") // PublishResultFunc receives the result of a publish. -type PublishResultFunc func(*common.PublishMetadata, error) +type PublishResultFunc func(*publish.Metadata, error) // messageHolder stores a message to be published, with associated metadata. type messageHolder struct { @@ -141,7 +141,7 @@ func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error { batch, _ := frontElem.Value.(*publishBatch) for i, msgHolder := range batch.msgHolders { // Messages are ordered, so the offset of each message is firstOffset + i. - pm := &common.PublishMetadata{Partition: b.partition, Offset: firstOffset + int64(i)} + pm := &publish.Metadata{Partition: b.partition, Offset: firstOffset + int64(i)} msgHolder.onResult(pm, nil) b.availableBufferBytes += msgHolder.size } diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index 3e12fe2bef1..b36fed6fe75 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -19,8 +19,8 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "cloud.google.com/go/pubsublite/common" "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/publish" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -35,7 +35,7 @@ type testPublishResultReceiver struct { done chan struct{} msg string t *testing.T - got *common.PublishMetadata + got *publish.Metadata gotErr error } @@ -47,7 +47,7 @@ func newTestPublishResultReceiver(t *testing.T, msg *pb.PubSubMessage) *testPubl } } -func (r *testPublishResultReceiver) set(pm *common.PublishMetadata, err error) { +func (r *testPublishResultReceiver) set(pm *publish.Metadata, err error) { r.got = pm r.gotErr = err close(r.done) diff --git a/pubsublite/common/publish_data.go b/pubsublite/publish/metadata.go similarity index 57% rename from pubsublite/common/publish_data.go rename to pubsublite/publish/metadata.go index a4658160d18..5a3c4a4b4d0 100644 --- a/pubsublite/common/publish_data.go +++ b/pubsublite/publish/metadata.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package common +package publish import ( "fmt" @@ -19,28 +19,28 @@ import ( "strings" ) -// PublishMetadata holds the results of a published message. -type PublishMetadata struct { +// Metadata holds the results of a published message. +type Metadata struct { Partition int Offset int64 } -func (pm *PublishMetadata) String() string { - return fmt.Sprintf("%d:%d", pm.Partition, pm.Offset) +func (m *Metadata) String() string { + return fmt.Sprintf("%d:%d", m.Partition, m.Offset) } -// ParsePublishMetadata converts a string obtained from PublishMetadata.String() -// back to PublishMetadata. -func ParsePublishMetadata(input string) (*PublishMetadata, error) { +// ParseMetadata converts a string obtained from Metadata.String() +// back to Metadata. +func ParseMetadata(input string) (*Metadata, error) { parts := strings.Split(input, ":") if len(parts) != 2 { - return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input) + return nil, fmt.Errorf("pubsublite: invalid encoded publish metadata %q", input) } partition, pErr := strconv.ParseInt(parts[0], 10, 64) offset, oErr := strconv.ParseInt(parts[1], 10, 64) if pErr != nil || oErr != nil { - return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input) + return nil, fmt.Errorf("pubsublite: invalid encoded publish metadata %q", input) } - return &PublishMetadata{Partition: int(partition), Offset: offset}, nil + return &Metadata{Partition: int(partition), Offset: offset}, nil } diff --git a/pubsublite/common/publish_data_test.go b/pubsublite/publish/metadata_test.go similarity index 76% rename from pubsublite/common/publish_data_test.go rename to pubsublite/publish/metadata_test.go index 89151374584..e9529934b73 100644 --- a/pubsublite/common/publish_data_test.go +++ b/pubsublite/publish/metadata_test.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package common +package publish import ( "testing" @@ -23,18 +23,18 @@ func TestPublishMetadataStringEncoding(t *testing.T) { for _, tc := range []struct { desc string input string - want *PublishMetadata + want *Metadata wantErr bool }{ { desc: "valid: zero", input: "0:0", - want: &PublishMetadata{Partition: 0, Offset: 0}, + want: &Metadata{Partition: 0, Offset: 0}, }, { desc: "valid: non-zero", input: "3:1234", - want: &PublishMetadata{Partition: 3, Offset: 1234}, + want: &Metadata{Partition: 3, Offset: 1234}, }, { desc: "invalid: number", @@ -53,14 +53,14 @@ func TestPublishMetadataStringEncoding(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - got, gotErr := ParsePublishMetadata(tc.input) + got, gotErr := ParseMetadata(tc.input) if !testutil.Equal(got, tc.want) || (gotErr != nil) != tc.wantErr { - t.Errorf("ParsePublishMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr) + t.Errorf("ParseMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr) } if tc.want != nil { if got := tc.want.String(); got != tc.input { - t.Errorf("PublishMetadata(%v).String(): got %q, want: %q", tc.want, got, tc.input) + t.Errorf("Metadata(%v).String(): got %q, want: %q", tc.want, got, tc.input) } } }) From 81fc2d1f2934b7590da3eb2d35f70632409af80a Mon Sep 17 00:00:00 2001 From: Mohan Li <67390330+mohanli-ml@users.noreply.github.com> Date: Thu, 17 Dec 2020 14:04:02 -0800 Subject: [PATCH 07/13] bigtable: Update user-agent to the latest version (#3429) Currently the user agent of Bigtable in Go is standalone, as in https://github.com/googleapis/google-cloud-go/blob/master/bigtable/doc.go#L121m. Update it to the latest version https://pkg.go.dev/cloud.google.com/go/bigtable. Related issue: https://github.com/googleapis/google-cloud-go/issues/3330. Background: DirectPath will use user agent to deny a specific library version, so we want to update the user agent with the library release date. --- bigtable/doc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigtable/doc.go b/bigtable/doc.go index 37d06c035cc..f21ac5bb229 100644 --- a/bigtable/doc.go +++ b/bigtable/doc.go @@ -117,8 +117,8 @@ const ( ) // clientUserAgent identifies the version of this package. -// It should be bumped upon significant changes only. -const clientUserAgent = "cbt-go/20180601" +// It should be the same as https://pkg.go.dev/cloud.google.com/go/bigtable. +const clientUserAgent = "cbt-go/v1.6.0" // resourcePrefixHeader is the name of the metadata header used to indicate // the resource being operated on. From 87b972c7c2dfe28413924ff43281c79f3af3e297 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Dec 2020 08:09:02 +1100 Subject: [PATCH 08/13] refactor(pubsublite): retryableStream cleanups (#3392) Added initialResponseRequired for clarity. Use unsafeTerminate when mutex is already held. --- pubsublite/internal/wire/assigner.go | 6 +++--- pubsublite/internal/wire/committer.go | 6 ++---- pubsublite/internal/wire/publisher.go | 4 ++-- pubsublite/internal/wire/streams.go | 21 ++++++++++++--------- pubsublite/internal/wire/streams_test.go | 4 ++-- pubsublite/internal/wire/subscriber.go | 4 ++-- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index eeecfe9bdca..e6e722bb1d7 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -112,12 +112,12 @@ func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { return a.assignmentClient.AssignPartitions(ctx) } -func (a *assigner) initialRequest() (interface{}, bool) { - return a.initialReq, false // No initial response expected +func (a *assigner) initialRequest() (interface{}, initialResponseRequired) { + return a.initialReq, initialResponseRequired(false) } func (a *assigner) validateInitialResponse(_ interface{}) error { - // Should not be called. + // Should not be called as initialResponseRequired=false above. return errors.New("pubsublite: unexpected initial response") } diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index 89eb2d9f997..e6cf4be0079 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -102,10 +102,8 @@ func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) { return c.cursorClient.StreamingCommitCursor(ctx) } -func (c *committer) initialRequest() (req interface{}, needsResp bool) { - req = c.initialReq - needsResp = true - return +func (c *committer) initialRequest() (interface{}, initialResponseRequired) { + return c.initialReq, initialResponseRequired(true) } func (c *committer) validateInitialResponse(response interface{}) error { diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index e677b31c942..0c876b70c1e 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -142,8 +142,8 @@ func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientS return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic)) } -func (pp *singlePartitionPublisher) initialRequest() (interface{}, bool) { - return pp.initialReq, true +func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) { + return pp.initialReq, initialResponseRequired(true) } func (pp *singlePartitionPublisher) validateInitialResponse(response interface{}) error { diff --git a/pubsublite/internal/wire/streams.go b/pubsublite/internal/wire/streams.go index 2c9004d1bf7..e97d3dc2e92 100644 --- a/pubsublite/internal/wire/streams.go +++ b/pubsublite/internal/wire/streams.go @@ -38,6 +38,8 @@ const ( streamTerminated streamStatus = 3 ) +type initialResponseRequired bool + // streamHandler provides hooks for different Pub/Sub Lite streaming APIs // (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream. // All Pub/Sub Lite streaming APIs implement a similar handshaking protocol, @@ -56,7 +58,7 @@ type streamHandler interface { newStream(context.Context) (grpc.ClientStream, error) // initialRequest should return the initial request and whether an initial // response is expected. - initialRequest() (interface{}, bool) + initialRequest() (interface{}, initialResponseRequired) validateInitialResponse(interface{}) error // onStreamStatusChange is used to notify stream handlers when the stream has @@ -120,10 +122,9 @@ func (rs *retryableStream) Start() { rs.mu.Lock() defer rs.mu.Unlock() - if rs.status != streamUninitialized { - return + if rs.status == streamUninitialized { + go rs.connectStream() } - go rs.connectStream() } // Stop gracefully closes the stream without error. @@ -136,6 +137,7 @@ func (rs *retryableStream) Stop() { // in progress. func (rs *retryableStream) Send(request interface{}) (sent bool) { rs.mu.Lock() + defer rs.mu.Unlock() if rs.stream != nil { err := rs.stream.SendMsg(request) @@ -150,13 +152,9 @@ func (rs *retryableStream) Send(request interface{}) (sent bool) { case isRetryableSendError(err): go rs.connectStream() default: - rs.mu.Unlock() // terminate acquires the mutex. - rs.terminate(err) - return + rs.unsafeTerminate(err) } } - - rs.mu.Unlock() return } @@ -196,6 +194,8 @@ func (rs *retryableStream) unsafeClearStream() { func (rs *retryableStream) setCancel(cancel context.CancelFunc) { rs.mu.Lock() defer rs.mu.Unlock() + + rs.unsafeClearStream() rs.cancelStream = cancel } @@ -337,7 +337,10 @@ func (rs *retryableStream) listen(recvStream grpc.ClientStream) { func (rs *retryableStream) terminate(err error) { rs.mu.Lock() defer rs.mu.Unlock() + rs.unsafeTerminate(err) +} +func (rs *retryableStream) unsafeTerminate(err error) { if rs.status == streamTerminated { return } diff --git a/pubsublite/internal/wire/streams_test.go b/pubsublite/internal/wire/streams_test.go index 29b0bb522eb..2604e11429f 100644 --- a/pubsublite/internal/wire/streams_test.go +++ b/pubsublite/internal/wire/streams_test.go @@ -99,8 +99,8 @@ func (sh *testStreamHandler) validateInitialResponse(response interface{}) error return nil } -func (sh *testStreamHandler) initialRequest() (interface{}, bool) { - return sh.InitialReq, true +func (sh *testStreamHandler) initialRequest() (interface{}, initialResponseRequired) { + return sh.InitialReq, initialResponseRequired(true) } func (sh *testStreamHandler) onStreamStatusChange(status streamStatus) { diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 8b9c9894c28..5965257609b 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -197,8 +197,8 @@ func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, err return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription)) } -func (s *subscribeStream) initialRequest() (interface{}, bool) { - return s.initialReq, true +func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) { + return s.initialReq, initialResponseRequired(true) } func (s *subscribeStream) validateInitialResponse(response interface{}) error { From 404b6c6ab879a218a6cf48d2bb869a0c938dd4b1 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Dec 2020 09:05:40 +1100 Subject: [PATCH 09/13] refactor(pubsublite): simplify service (#3393) No-op refactoring and additional error checking. --- pubsublite/internal/wire/service.go | 63 ++++++++++++------------ pubsublite/internal/wire/service_test.go | 57 ++++++++++++++++----- 2 files changed, 78 insertions(+), 42 deletions(-) diff --git a/pubsublite/internal/wire/service.go b/pubsublite/internal/wire/service.go index 928d9a8df0d..565f39276d4 100644 --- a/pubsublite/internal/wire/service.go +++ b/pubsublite/internal/wire/service.go @@ -14,6 +14,7 @@ package wire import ( + "errors" "sync" ) @@ -56,6 +57,7 @@ type service interface { AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc) RemoveStatusChangeReceiver(serviceHandle) Handle() serviceHandle + Status() serviceStatus Error() error } @@ -153,10 +155,7 @@ func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err er return true } -type serviceHolder struct { - service service - lastStatus serviceStatus -} +var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started") // compositeService can be embedded into other structs to manage child services. // It implements the service interface and can itself be a dependency of another @@ -169,8 +168,8 @@ type compositeService struct { waitStarted chan struct{} waitTerminated chan struct{} - dependencies []*serviceHolder - removed []*serviceHolder + dependencies []service + removed []service abstractService } @@ -188,7 +187,7 @@ func (cs *compositeService) Start() { if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) { for _, s := range cs.dependencies { - s.service.Start() + s.Start() } } } @@ -218,8 +217,14 @@ func (cs *compositeService) unsafeAddServices(services ...service) error { } for _, s := range services { + // Adding dependent services which have already started not currently + // supported. Requires updating logic to handle the compositeService state. + if s.Status() > serviceUninitialized { + return errChildServiceStarted + } + s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange) - cs.dependencies = append(cs.dependencies, &serviceHolder{service: s}) + cs.dependencies = append(cs.dependencies, s) if cs.status > serviceUninitialized { s.Start() } @@ -227,15 +232,15 @@ func (cs *compositeService) unsafeAddServices(services ...service) error { return nil } -func (cs *compositeService) unsafeRemoveService(service service) { +func (cs *compositeService) unsafeRemoveService(remove service) { removeIdx := -1 for i, s := range cs.dependencies { - if s.service.Handle() == service.Handle() { + if s.Handle() == remove.Handle() { // Move from the `dependencies` to the `removed` list. cs.removed = append(cs.removed, s) removeIdx = i - if s.lastStatus < serviceTerminating { - s.service.Stop() + if s.Status() < serviceTerminating { + s.Stop() } break } @@ -244,12 +249,13 @@ func (cs *compositeService) unsafeRemoveService(service service) { } func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { - for _, s := range cs.dependencies { - if s.lastStatus < serviceTerminating { - s.service.Stop() + if cs.unsafeUpdateStatus(targetStatus, err) { + for _, s := range cs.dependencies { + if s.Status() < serviceTerminating { + s.Stop() + } } } - cs.unsafeUpdateStatus(targetStatus, err) } func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) { @@ -257,7 +263,7 @@ func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err e if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret { // Note: the waitStarted channel must be closed when the service fails to // start. - if previousStatus == serviceStarting { + if previousStatus < serviceActive && targetStatus >= serviceActive { close(cs.waitStarted) } if targetStatus == serviceTerminated { @@ -273,17 +279,15 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s removeIdx := -1 for i, s := range cs.removed { - if s.service.Handle() == handle { + if s.Handle() == handle { if status == serviceTerminated { - s.service.RemoveStatusChangeReceiver(cs.Handle()) + s.RemoveStatusChangeReceiver(cs.Handle()) removeIdx = i } break } } - if removeIdx >= 0 { - cs.removed = removeFromSlice(cs.removed, removeIdx) - } + cs.removed = removeFromSlice(cs.removed, removeIdx) // Note: we cannot rely on the service not being in the removed list above to // determine whether it is an active dependency. The notification may be for a @@ -291,10 +295,7 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s // changes are notified asynchronously and may be received out of order. isDependency := false for _, s := range cs.dependencies { - if s.service.Handle() == handle { - if status > s.lastStatus { - s.lastStatus = status - } + if s.Handle() == handle { isDependency = true break } @@ -307,13 +308,13 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s numTerminated := 0 for _, s := range cs.dependencies { - if shouldTerminate && s.lastStatus < serviceTerminating { - s.service.Stop() + if shouldTerminate && s.Status() < serviceTerminating { + s.Stop() } - if s.lastStatus >= serviceActive { + if s.Status() >= serviceActive { numStarted++ } - if s.lastStatus == serviceTerminated { + if s.Status() == serviceTerminated { numTerminated++ } } @@ -328,7 +329,7 @@ func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status s } } -func removeFromSlice(services []*serviceHolder, removeIdx int) []*serviceHolder { +func removeFromSlice(services []service, removeIdx int) []service { lastIdx := len(services) - 1 if removeIdx < 0 || removeIdx > lastIdx { return services diff --git a/pubsublite/internal/wire/service_test.go b/pubsublite/internal/wire/service_test.go index 9a5b22c8dbb..0dd9b9be4fd 100644 --- a/pubsublite/internal/wire/service_test.go +++ b/pubsublite/internal/wire/service_test.go @@ -202,10 +202,10 @@ func newTestCompositeService(name string) *testCompositeService { return ts } -func (ts *testCompositeService) AddServices(services ...service) { +func (ts *testCompositeService) AddServices(services ...service) error { ts.mu.Lock() defer ts.mu.Unlock() - ts.unsafeAddServices(services...) + return ts.unsafeAddServices(services...) } func (ts *testCompositeService) RemoveService(service service) { @@ -231,7 +231,9 @@ func TestCompositeServiceNormalStop(t *testing.T) { child2 := newTestService("child2") child3 := newTestService("child3") parent := newTestCompositeService("parent") - parent.AddServices(child1, child2) + if err := parent.AddServices(child1, child2); err != nil { + t.Errorf("AddServices() got err: %v", err) + } t.Run("Starting", func(t *testing.T) { wantState := serviceUninitialized @@ -252,7 +254,9 @@ func TestCompositeServiceNormalStop(t *testing.T) { if child3.Status() != wantState { t.Errorf("child3: current service status: got %d, want %d", child3.Status(), wantState) } - parent.AddServices(child3) + if err := parent.AddServices(child3); err != nil { + t.Errorf("AddServices() got err: %v", err) + } child3.receiver.VerifyStatus(t, serviceStarting) }) @@ -300,7 +304,9 @@ func TestCompositeServiceErrorDuringStartup(t *testing.T) { child1 := newTestService("child1") child2 := newTestService("child2") parent := newTestCompositeService("parent") - parent.AddServices(child1, child2) + if err := parent.AddServices(child1, child2); err != nil { + t.Errorf("AddServices() got err: %v", err) + } t.Run("Starting", func(t *testing.T) { parent.Start() @@ -334,7 +340,9 @@ func TestCompositeServiceErrorWhileActive(t *testing.T) { child1 := newTestService("child1") child2 := newTestService("child2") parent := newTestCompositeService("parent") - parent.AddServices(child1, child2) + if err := parent.AddServices(child1, child2); err != nil { + t.Errorf("AddServices() got err: %v", err) + } t.Run("Starting", func(t *testing.T) { parent.Start() @@ -382,7 +390,9 @@ func TestCompositeServiceRemoveService(t *testing.T) { child1 := newTestService("child1") child2 := newTestService("child2") parent := newTestCompositeService("parent") - parent.AddServices(child1, child2) + if err := parent.AddServices(child1, child2); err != nil { + t.Errorf("AddServices() got err: %v", err) + } t.Run("Starting", func(t *testing.T) { parent.Start() @@ -452,16 +462,21 @@ func TestCompositeServiceTree(t *testing.T) { leaf1 := newTestService("leaf1") leaf2 := newTestService("leaf2") intermediate1 := newTestCompositeService("intermediate1") - intermediate1.AddServices(leaf1, leaf2) + if err := intermediate1.AddServices(leaf1, leaf2); err != nil { + t.Errorf("intermediate1.AddServices() got err: %v", err) + } leaf3 := newTestService("leaf3") leaf4 := newTestService("leaf4") intermediate2 := newTestCompositeService("intermediate2") - intermediate2.AddServices(leaf3, leaf4) + if err := intermediate2.AddServices(leaf3, leaf4); err != nil { + t.Errorf("intermediate2.AddServices() got err: %v", err) + } root := newTestCompositeService("root") - root.AddServices(intermediate1, intermediate2) - + if err := root.AddServices(intermediate1, intermediate2); err != nil { + t.Errorf("root.AddServices() got err: %v", err) + } wantErr := errors.New("fail") t.Run("Starting", func(t *testing.T) { @@ -528,3 +543,23 @@ func TestCompositeServiceTree(t *testing.T) { } }) } + +func TestCompositeServiceAddServicesErrors(t *testing.T) { + child1 := newTestService("child1") + parent := newTestCompositeService("parent") + if err := parent.AddServices(child1); err != nil { + t.Errorf("AddServices(child1) got err: %v", err) + } + + child2 := newTestService("child2") + child2.Start() + if gotErr, wantErr := parent.AddServices(child2), errChildServiceStarted; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("AddServices(child2) got err: (%v), want err: (%v)", gotErr, wantErr) + } + + parent.Stop() + child3 := newTestService("child3") + if gotErr, wantErr := parent.AddServices(child3), ErrServiceStopped; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("AddServices(child3) got err: (%v), want err: (%v)", gotErr, wantErr) + } +} From 4e1ef1b3fb536ef950249cdee02cc0b6c2b56e86 Mon Sep 17 00:00:00 2001 From: Mohan Li <67390330+mohanli-ml@users.noreply.github.com> Date: Tue, 22 Dec 2020 15:14:56 -0800 Subject: [PATCH 10/13] feat(spanner): include User agent (#3465) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- spanner/client.go | 1 + spanner/doc.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/spanner/client.go b/spanner/client.go index 8f14968e04b..1a52fa5abfd 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -177,6 +177,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf ), ), option.WithGRPCConnectionPool(config.NumChannels), + option.WithUserAgent(clientUserAgent), } // opts will take precedence above allOpts, as the values in opts will be // applied after the values in allOpts. diff --git a/spanner/doc.go b/spanner/doc.go index eecaad06526..86e44b2c395 100644 --- a/spanner/doc.go +++ b/spanner/doc.go @@ -357,3 +357,8 @@ at https://godoc.org/go.opencensus.io/trace. OpenCensus tracing requires Go 1.8 or higher. */ package spanner // import "cloud.google.com/go/spanner" + +// clientUserAgent identifies the version of this package. +// It should be the same as https://pkg.go.dev/cloud.google.com/go/spanner. +// TODO: We will want to automate the version with a bash script. +const clientUserAgent = "spanner-go/v1.12.0" From 18e3a4fe2a0c59c6295db2d85c7893ac51688083 Mon Sep 17 00:00:00 2001 From: Mohan Li <67390330+mohanli-ml@users.noreply.github.com> Date: Tue, 22 Dec 2020 15:28:57 -0800 Subject: [PATCH 11/13] feat(spanner): run E2E test over DirectPath (#3466) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite Co-authored-by: Thiago Nunes --- spanner/integration_test.go | 77 +++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 2881813c533..a54d6d7d005 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -39,14 +39,21 @@ import ( instance "cloud.google.com/go/spanner/admin/instance/apiv1" "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) +const ( + directPathIPV6Prefix = "[2001:4860:8040" + directPathIPV4Prefix = "34.126" +) + var ( // testProjectID specifies the project used for testing. It can be changed // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. @@ -68,6 +75,9 @@ var ( databaseAdmin *database.DatabaseAdminClient instanceAdmin *instance.InstanceAdminClient + dpConfig directPathTestConfig + peerInfo *peer.Peer + singerDBStatements = []string{ `CREATE TABLE Singers ( SingerId INT64 NOT NULL, @@ -160,6 +170,17 @@ var ( validInstancePattern = regexp.MustCompile("^projects/(?P[^/]+)/instances/(?P[^/]+)$") ) +func init() { + flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag") + flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM") + peerInfo = &peer.Peer{} +} + +type directPathTestConfig struct { + attemptDirectPath bool + directPathIPv4Only bool +} + func parseInstanceName(inst string) (project, instance string, err error) { matches := validInstancePattern.FindStringSubmatch(inst) if len(matches) == 0 { @@ -206,6 +227,11 @@ func initIntegrationTests() (cleanup func()) { if spannerHost != "" { opts = append(opts, option.WithEndpoint(spannerHost)) } + if dpConfig.attemptDirectPath { + // TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta. + opts = append(opts, internaloption.EnableDirectPath(true)) + opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) + } var err error // Create InstanceAdmin and DatabaseAdmin clients. instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) @@ -353,6 +379,7 @@ func TestIntegration_SingleUse(t *testing.T) { if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) } // Calculate time difference between Cloud Spanner server and localhost to // use to determine the exact staleness value to use. @@ -436,6 +463,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) rts, err := su.Timestamp() if err != nil { t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) @@ -452,6 +480,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) rts, err = su.Timestamp() if err != nil { t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) @@ -470,6 +499,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { continue } + verifyDirectPathRemoteAddress(t) v, err := rowToValues(r) if err != nil { continue @@ -492,6 +522,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) // The results from ReadUsingIndex is sorted by the index rather than primary key. if len(got) != len(test.want) { t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) @@ -530,6 +561,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { continue } + verifyDirectPathRemoteAddress(t) v, err := rowToValues(r) if err != nil { continue @@ -931,10 +963,12 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) if e != nil { return e } + verifyDirectPathRemoteAddress(t) if bf <= 0 { return nil } @@ -948,6 +982,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if err != nil { t.Errorf("%d: failed to execute transaction: %v", iter, err) } + verifyDirectPathRemoteAddress(t) }(i) } // Because of context timeout, all goroutines will eventually return. @@ -958,6 +993,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) if ce := r.Column(0, &bf); ce != nil { return ce } @@ -965,6 +1001,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) if bf != 30 || bb != 21 { t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) } @@ -973,6 +1010,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if err != nil { t.Errorf("failed to check balances: %v", err) } + verifyDirectPathRemoteAddress(t) } func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) { @@ -1087,6 +1125,7 @@ func TestIntegration_Reads(t *testing.T) { if _, err := client.Apply(ctx, ms); err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) // Empty read. rows, err := readAllTestTable(client.Single().Read(ctx, testTable, @@ -1094,6 +1133,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) if got, want := len(rows), 0; got != want { t.Errorf("got %d, want %d", got, want) } @@ -1104,6 +1144,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) if got, want := len(rows), 0; got != want { t.Errorf("got %d, want %d", got, want) } @@ -1113,6 +1154,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) var got testTableRow if err := row.ToStruct(&got); err != nil { t.Fatal(err) @@ -1126,12 +1168,14 @@ func TestIntegration_Reads(t *testing.T) { if ErrCode(err) != codes.NotFound { t.Fatalf("got %v, want NotFound", err) } + verifyDirectPathRemoteAddress(t) // Index point read. rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) var gotIndex testTableRow if err := rowIndex.ToStruct(&gotIndex); err != nil { t.Fatal(err) @@ -1144,6 +1188,7 @@ func TestIntegration_Reads(t *testing.T) { if ErrCode(err) != codes.NotFound { t.Fatalf("got %v, want NotFound", err) } + verifyDirectPathRemoteAddress(t) rangeReads(ctx, t, client) indexRangeReads(ctx, t, client) } @@ -1293,6 +1338,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) { if _, err := iter.Next(); err == nil { t.Errorf("client sends query to removed database successfully, want it to fail") } + verifyDirectPathRemoteAddress(t) // Recreate database and table. dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] @@ -1322,6 +1368,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) { if err != nil && err != iterator.Done { t.Errorf("failed to send query to database %v: %v", dbPath, err) } + verifyDirectPathRemoteAddress(t) } // Test encoding/decoding non-struct Cloud Spanner types. @@ -1514,6 +1561,7 @@ func TestIntegration_BasicTypes(t *testing.T) { if err != nil { t.Fatalf("Unable to fetch row %v: %v", i, err) } + verifyDirectPathRemoteAddress(t) // Create new instance of type of test.want. want := test.want if want == nil { @@ -3105,6 +3153,11 @@ func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (cl if spannerHost != "" { opts = append(opts, option.WithEndpoint(spannerHost)) } + if dpConfig.attemptDirectPath { + // TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta. + opts = append(opts, internaloption.EnableDirectPath(true)) + opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) + } client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...) if err != nil { return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) @@ -3226,3 +3279,27 @@ func skipEmulatorTest(t *testing.T) { t.Skip("Skipping testing against the emulator.") } } + +func verifyDirectPathRemoteAddress(t *testing.T) { + t.Helper() + if !dpConfig.attemptDirectPath { + return + } + if remoteIP, res := isDirectPathRemoteAddress(); !res { + if dpConfig.directPathIPv4Only { + t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) + } else { + t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) + } + } +} + +func isDirectPathRemoteAddress() (_ string, _ bool) { + remoteIP := peerInfo.Addr.String() + // DirectPath ipv4-only can only use ipv4 traffic. + if dpConfig.directPathIPv4Only { + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) + } + // DirectPath ipv6 can use either ipv4 or ipv6 traffic. + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) +} From 74923c27efd7936b3e18cd8ccb72882a40c7ff42 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Dec 2020 11:09:04 +1100 Subject: [PATCH 12/13] feat(pubsublite): settings and message transforms for Cloud Pub/Sub shim (#3281) The shim implements various features in the Pub/Sub Lite Java client library for feature parity, e.g. nack handler, custom message transforms, key extractor, etc. It also implements some features from the Cloud Pub/Sub Go library, e.g. publish buffer byte limit. --- pubsublite/go.mod | 2 +- pubsublite/go.sum | 14 +- pubsublite/internal/wire/settings.go | 14 +- pubsublite/ps/doc.go | 43 ++++++ pubsublite/ps/message.go | 133 +++++++++++++++++ pubsublite/ps/message_test.go | 145 ++++++++++++++++++ pubsublite/ps/settings.go | 215 +++++++++++++++++++++++++++ pubsublite/ps/settings_test.go | 154 +++++++++++++++++++ 8 files changed, 716 insertions(+), 4 deletions(-) create mode 100644 pubsublite/ps/doc.go create mode 100644 pubsublite/ps/message.go create mode 100644 pubsublite/ps/message_test.go create mode 100644 pubsublite/ps/settings.go create mode 100644 pubsublite/ps/settings_test.go diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 89973e20cc7..22309bf0107 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -4,11 +4,11 @@ go 1.11 require ( cloud.google.com/go v0.74.0 + cloud.google.com/go/pubsub v1.9.1 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.4 github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.0.5 - golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 // indirect google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a google.golang.org/grpc v1.34.0 diff --git a/pubsublite/go.sum b/pubsublite/go.sum index fdd5e2ebd1a..ec15514746d 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -15,6 +15,7 @@ cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOY cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= +cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q= cloud.google.com/go v0.74.0 h1:kpgPA77kSSbjSs+fWHkPTxQ6J5Z2Qkruo5jfXEkHxNQ= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -29,6 +30,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.9.1 h1:hXEte3a/Brd+Tl9ecEkHH3ow9wpnOTZ28lSOszYj6Cg= +cloud.google.com/go/pubsub v1.9.1/go.mod h1:7QTUeCiy+P1dVPO8hHVbZSHDfibbgm1gbKyOVYnqb8g= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -105,6 +108,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= @@ -212,6 +216,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 h1:lwlPPsmjDKK0J6eG6xDWd5XPehI0R024zxjDnw3esPA= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -235,6 +240,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -279,6 +286,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -322,9 +330,9 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 h1:9j/upNXDRpADUw2RpUfJ7E7GHtfhDih62kX6JM8vs2c= -golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -391,6 +399,8 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a h1:GnJAhasbD8HiT8DZMvsEx3QLVy/X0icq/MGr0MqRJ2M= google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index e2cc5699746..f1dcd6cb0ea 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -31,9 +31,15 @@ const ( // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. - MaxPublishRequestBytes = 3500000 + MaxPublishRequestBytes int = 3.5 * 1024 * 1024 ) +// FrameworkType is the user-facing API for Cloud Pub/Sub Lite. +type FrameworkType string + +// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub. +const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM" + // PublishSettings control the batching of published messages. These settings // apply per partition. type PublishSettings struct { @@ -70,6 +76,9 @@ type PublishSettings struct { // The polling interval to watch for topic partition count updates. Set to 0 // to disable polling if the number of partitions will never update. ConfigPollPeriod time.Duration + + // The user-facing API type. + Framework FrameworkType } // DefaultPublishSettings holds the default values for PublishSettings. @@ -132,6 +141,9 @@ type ReceiveSettings struct { // specified, the client will use the partition assignment service to // determine which partitions it should connect to. Partitions []int + + // The user-facing API type. + Framework FrameworkType } // DefaultReceiveSettings holds the default values for ReceiveSettings. diff --git a/pubsublite/ps/doc.go b/pubsublite/ps/doc.go new file mode 100644 index 00000000000..43b80de96ea --- /dev/null +++ b/pubsublite/ps/doc.go @@ -0,0 +1,43 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +/* +Package ps contains clients for publishing and subscribing using the Google +Cloud Pub/Sub Lite service. + +If interfaces are defined, PublisherClient and SubscriberClient can be used as +substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), +respectively, from the pubsub package. + +As noted in comments, the two services have some differences: + - Pub/Sub Lite does not support nack for messages. By default, this will + terminate the SubscriberClient. A custom function can be provided for + ReceiveSettings.NackHandler to handle nacked messages. + - Pub/Sub Lite has no concept of ack expiration. Subscribers must ack or nack + every message received. + - Pub/Sub Lite PublisherClients can terminate when an unretryable error + occurs. + - Publishers and subscribers will be throttled if Pub/Sub Lite publish or + subscribe throughput limits are exceeded. Thus publishing can be more + sensitive to buffer overflow than Cloud Pub/Sub. + +More information about Google Cloud Pub/Sub Lite is available at +https://cloud.google.com/pubsub/lite. + +Information about choosing between Google Cloud Pub/Sub vs Pub/Sub Lite is +available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. + +See https://godoc.org/cloud.google.com/go for authentication, timeouts, +connection pooling and similar aspects of this package. +*/ +package ps // import "cloud.google.com/go/pubsublite/ps" diff --git a/pubsublite/ps/message.go b/pubsublite/ps/message.go new file mode 100644 index 00000000000..011b4b303b2 --- /dev/null +++ b/pubsublite/ps/message.go @@ -0,0 +1,133 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "encoding/base64" + "errors" + "fmt" + + "cloud.google.com/go/pubsub" + "github.com/golang/protobuf/ptypes" + "google.golang.org/protobuf/proto" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// Message transforms and event timestamp encoding mirrors the Java client +// library implementation: +// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java +const eventTimestampAttributeKey = "x-goog-pubsublite-event-time-timestamp-proto" + +var errInvalidMessage = errors.New("pubsublite: invalid received message") + +// Encodes a timestamp in a way that it will be interpreted as an event time if +// published on a message with an attribute named eventTimestampAttributeKey. +func encodeEventTimestamp(eventTime *tspb.Timestamp) (string, error) { + bytes, err := proto.Marshal(eventTime) + if err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(bytes), nil +} + +// Decodes a timestamp encoded with encodeEventTimestamp. +func decodeEventTimestamp(value string) (*tspb.Timestamp, error) { + bytes, err := base64.StdEncoding.DecodeString(value) + if err != nil { + return nil, err + } + eventTime := &tspb.Timestamp{} + if err := proto.Unmarshal(bytes, eventTime); err != nil { + return nil, err + } + return eventTime, nil +} + +// extractOrderingKey extracts the ordering key from the message for routing +// during publishing. It is the default KeyExtractorFunc implementation. +func extractOrderingKey(msg *pubsub.Message) []byte { + if len(msg.OrderingKey) == 0 { + return nil + } + return []byte(msg.OrderingKey) +} + +// transformPublishedMessage is the default PublishMessageTransformerFunc +// implementation. +func transformPublishedMessage(from *pubsub.Message, to *pb.PubSubMessage, extractKey KeyExtractorFunc) error { + to.Data = from.Data + to.Key = extractKey(from) + + if len(from.Attributes) > 0 { + to.Attributes = make(map[string]*pb.AttributeValues) + for key, value := range from.Attributes { + if key == eventTimestampAttributeKey { + eventpb, err := decodeEventTimestamp(value) + if err != nil { + return err + } + to.EventTime = eventpb + } else { + to.Attributes[key] = &pb.AttributeValues{Values: [][]byte{[]byte(value)}} + } + } + } + return nil +} + +// transformReceivedMessage is the default ReceiveMessageTransformerFunc +// implementation. +func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) error { + if from == nil || from.GetMessage() == nil { + // This should not occur, but guard against nil. + return errInvalidMessage + } + + var err error + msg := from.GetMessage() + + if from.GetPublishTime() != nil { + if to.PublishTime, err = ptypes.Timestamp(from.GetPublishTime()); err != nil { + return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err) + } + } + if from.GetCursor() != nil { + to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset()) + } + if len(msg.GetKey()) > 0 { + to.OrderingKey = string(msg.GetKey()) + } + to.Data = msg.GetData() + to.Attributes = make(map[string]string) + + if msg.EventTime != nil { + val, err := encodeEventTimestamp(msg.EventTime) + if err != nil { + return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err) + } + to.Attributes[eventTimestampAttributeKey] = val + } + for key, values := range msg.Attributes { + if key == eventTimestampAttributeKey { + return fmt.Errorf("%s: attribute with reserved key %q exists in API message", errInvalidMessage.Error(), eventTimestampAttributeKey) + } + if len(values.Values) > 1 { + return fmt.Errorf("%s: cannot transform API message with multiple values for attribute with key %q", errInvalidMessage.Error(), key) + } + to.Attributes[key] = string(values.Values[0]) + } + return nil +} diff --git a/pubsublite/ps/message_test.go b/pubsublite/ps/message_test.go new file mode 100644 index 00000000000..aec1838c7cb --- /dev/null +++ b/pubsublite/ps/message_test.go @@ -0,0 +1,145 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "log" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsub" + "github.com/google/go-cmp/cmp/cmpopts" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +func encodeTimestamp(seconds int64, nanos int32) string { + val, err := encodeEventTimestamp(&tspb.Timestamp{ + Seconds: seconds, + Nanos: nanos, + }) + if err != nil { + log.Fatal(err) + } + return val +} + +func TestMessageTransforms(t *testing.T) { + for _, tc := range []struct { + desc string + wireMsg *pb.SequencedMessage + wantMsg *pubsub.Message + wantErr bool + }{ + { + desc: "valid: full message", + wireMsg: &pb.SequencedMessage{ + Cursor: &pb.Cursor{Offset: 10}, + PublishTime: &tspb.Timestamp{ + Seconds: 1577836800, + Nanos: 900800700, + }, + Message: &pb.PubSubMessage{ + Data: []byte("foo"), + Key: []byte("bar"), + EventTime: &tspb.Timestamp{ + Seconds: 1577836800, + Nanos: 500400300, + }, + Attributes: map[string]*pb.AttributeValues{ + "attr1": {Values: [][]byte{[]byte("hello")}}, + "attr2": {Values: [][]byte{[]byte("world")}}, + }, + }, + }, + wantMsg: &pubsub.Message{ + ID: "10", + PublishTime: time.Unix(1577836800, 900800700), + Data: []byte("foo"), + OrderingKey: "bar", + Attributes: map[string]string{ + "attr1": "hello", + "attr2": "world", + "x-goog-pubsublite-event-time-timestamp-proto": encodeTimestamp(1577836800, 500400300), + }, + }, + }, + { + desc: "valid: minimum", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{}, + }, + wantMsg: &pubsub.Message{}, + }, + { + desc: "invalid: sequenced message nil", + wantErr: true, + }, + { + desc: "invalid: pubsubmessage nil", + wireMsg: &pb.SequencedMessage{}, + wantErr: true, + }, + { + desc: "invalid: multiple attribute values", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{ + Attributes: map[string]*pb.AttributeValues{ + "attr1": {Values: [][]byte{[]byte("hello"), []byte("bar")}}, + }, + }, + }, + wantErr: true, + }, + { + desc: "invalid: event time is attribute", + wireMsg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{ + Attributes: map[string]*pb.AttributeValues{ + "x-goog-pubsublite-event-time-timestamp-proto": { + Values: [][]byte{[]byte(encodeTimestamp(1577836800, 500400300))}, + }, + }, + }, + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotRecvMsg := new(pubsub.Message) + gotErr := transformReceivedMessage(tc.wireMsg, gotRecvMsg) + if (gotErr != nil) != tc.wantErr { + t.Errorf("transformReceivedMessage() err = (%v), want err=%v", gotErr, tc.wantErr) + } + + if tc.wantMsg != nil { + if diff := testutil.Diff(gotRecvMsg, tc.wantMsg, cmpopts.IgnoreUnexported(pubsub.Message{}), cmpopts.EquateEmpty()); diff != "" { + t.Errorf("transformReceivedMessage() got: -, want: +\n%s", diff) + } + + // Check reverse conversion equals input. + gotPubMsg := new(pb.PubSubMessage) + gotErr := transformPublishedMessage(tc.wantMsg, gotPubMsg, extractOrderingKey) + if gotErr != nil { + t.Errorf("transformPublishedMessage() err = (%v)", gotErr) + } + if diff := testutil.Diff(gotPubMsg, tc.wireMsg.Message, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("transformPublishedMessage() got: -, want: +\n%s", diff) + } + } + }) + } +} diff --git a/pubsublite/ps/settings.go b/pubsublite/ps/settings.go new file mode 100644 index 00000000000..6a8be9d1bb5 --- /dev/null +++ b/pubsublite/ps/settings.go @@ -0,0 +1,215 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "time" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/internal/wire" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +const ( + // MaxPublishRequestCount is the maximum number of messages that can be + // batched in a single publish request. + MaxPublishRequestCount = wire.MaxPublishRequestCount + + // MaxPublishRequestBytes is the maximum allowed serialized size of a single + // publish request (containing a batch of messages) in bytes. + MaxPublishRequestBytes = wire.MaxPublishRequestBytes +) + +// KeyExtractorFunc is a function that extracts an ordering key from a Message. +type KeyExtractorFunc func(*pubsub.Message) []byte + +// PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite +// PubSubMessage API proto. If this returns an error, the pubsub.PublishResult +// will be errored and the PublisherClient will consider this a fatal error and +// terminate. +type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error + +// PublishSettings configure the PublisherClient. These settings apply per +// partition. If BufferedByteLimit is being used to bound memory usage, keep in +// mind the number of partitions in the topic. +// +// A zero PublishSettings will result in values equivalent to +// DefaultPublishSettings. +type PublishSettings struct { + // Publish a non-empty batch after this delay has passed. If DelayThreshold is + // 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise + // must be > 0. + DelayThreshold time.Duration + + // Publish a batch when it has this many messages. The maximum is + // MaxPublishRequestCount. If CountThreshold is 0, it will be treated as + // DefaultPublishSettings.CountThreshold. Otherwise must be > 0. + CountThreshold int + + // Publish a batch when its size in bytes reaches this value. The maximum is + // MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as + // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0. + ByteThreshold int + + // The maximum time that the client will attempt to establish a publish stream + // connection to the server. If Timeout is 0, it will be treated as + // DefaultPublishSettings.Timeout. Otherwise must be > 0. + // + // The timeout is exceeded, the publisher will terminate with the last error + // that occurred while trying to reconnect. Note that if the timeout duration + // is long, ErrOverflow may occur first. + Timeout time.Duration + + // The maximum number of bytes that the publisher will keep in memory before + // returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as + // DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0. + // + // Note that Pub/Sub Lite topics are provisioned a publishing throughput + // capacity, per partition, shared by all publisher clients. Setting a large + // buffer size can mitigate transient publish spikes. However, consistently + // attempting to publish messages at a much higher rate than the publishing + // throughput capacity can cause the buffers to overflow. For more + // information, see https://cloud.google.com/pubsub/lite/docs/topics. + BufferedByteLimit int + + // Optional custom function that extracts an ordering key from a Message. The + // default implementation extracts the key from Message.OrderingKey. + KeyExtractor KeyExtractorFunc + + // Optional custom function that transforms a pubsub.Message to a + // PubSubMessage API proto. + MessageTransformer PublishMessageTransformerFunc +} + +// DefaultPublishSettings holds the default values for PublishSettings. +var DefaultPublishSettings = PublishSettings{ + DelayThreshold: 10 * time.Millisecond, + CountThreshold: 100, + ByteThreshold: 1e6, + Timeout: 60 * time.Second, + BufferedByteLimit: 1e8, +} + +func (s *PublishSettings) toWireSettings() wire.PublishSettings { + wireSettings := wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + } + // Negative values preserved, but will fail validation in wire package. + if s.DelayThreshold != 0 { + wireSettings.DelayThreshold = s.DelayThreshold + } + if s.CountThreshold != 0 { + wireSettings.CountThreshold = s.CountThreshold + } + if s.ByteThreshold != 0 { + wireSettings.ByteThreshold = s.ByteThreshold + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + if s.BufferedByteLimit != 0 { + wireSettings.BufferedByteLimit = s.BufferedByteLimit + } + return wireSettings +} + +// NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub +// Lite does not have a concept of 'nack'. If the nack handler implementation +// returns nil, the message is acknowledged. If an error is returned, the +// SubscriberClient will consider this a fatal error and terminate. +// +// In Cloud Pub/Sub Lite, only a single subscriber for a given subscription is +// connected to any partition at a time, and there is no other client that may +// be able to handle messages. +type NackHandler func(*pubsub.Message) error + +// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API +// proto to a pubsub.Message. If this returns an error, the SubscriberClient +// will consider this a fatal error and terminate. +type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error + +// ReceiveSettings configure the SubscriberClient. These settings apply per +// partition. If MaxOutstandingBytes is being used to bound memory usage, keep +// in mind the number of partitions in the associated topic. +// +// A zero ReceiveSettings will result in values equivalent to +// DefaultReceiveSettings. +type ReceiveSettings struct { + // MaxOutstandingMessages is the maximum number of unacknowledged messages. + // If MaxOutstandingMessages is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0. + MaxOutstandingMessages int + + // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged + // messages. If MaxOutstandingBytes is 0, it will be treated as + // DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0. + MaxOutstandingBytes int + + // The maximum time that the client will attempt to establish a subscribe + // stream connection to the server. If Timeout is 0, it will be treated as + // DefaultReceiveSettings.Timeout. Otherwise must be > 0. + // + // The timeout is exceeded, the SubscriberClient will terminate with the last + // error that occurred while trying to reconnect. + Timeout time.Duration + + // The topic partition numbers (zero-indexed) to receive messages from. + // Values must be less than the number of partitions for the topic. If not + // specified, the SubscriberClient will use the partition assignment service + // to determine which partitions it should connect to. + Partitions []int + + // Optional custom function to handle pubsub.Message.Nack() calls. If not set, + // the default behavior is to terminate the SubscriberClient. + NackHandler NackHandler + + // Optional custom function that transforms a SequencedMessage API proto to a + // pubsub.Message. + MessageTransformer ReceiveMessageTransformerFunc +} + +// DefaultReceiveSettings holds the default values for ReceiveSettings. +var DefaultReceiveSettings = ReceiveSettings{ + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + Timeout: 60 * time.Second, +} + +func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings { + wireSettings := wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, + Partitions: s.Partitions, + Framework: wire.FrameworkCloudPubSubShim, + } + // Negative values preserved, but will fail validation in wire package. + if s.MaxOutstandingMessages != 0 { + wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages + } + if s.MaxOutstandingBytes != 0 { + wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes + } + if s.Timeout != 0 { + wireSettings.Timeout = s.Timeout + } + return wireSettings +} diff --git a/pubsublite/ps/settings_test.go b/pubsublite/ps/settings_test.go new file mode 100644 index 00000000000..900af0efdcf --- /dev/null +++ b/pubsublite/ps/settings_test.go @@ -0,0 +1,154 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "testing" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/wire" +) + +func TestPublishSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings PublishSettings + wantSettings wire.PublishSettings + }{ + { + desc: "default settings", + settings: DefaultPublishSettings, + wantSettings: wire.PublishSettings{ + DelayThreshold: DefaultPublishSettings.DelayThreshold, + CountThreshold: DefaultPublishSettings.CountThreshold, + ByteThreshold: DefaultPublishSettings.ByteThreshold, + Timeout: DefaultPublishSettings.Timeout, + BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: PublishSettings{}, + wantSettings: DefaultPublishSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: 2, + CountThreshold: 3, + ByteThreshold: 4, + Timeout: 5, + BufferedByteLimit: 6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + }, + wantSettings: wire.PublishSettings{ + DelayThreshold: -2, + CountThreshold: -3, + ByteThreshold: -4, + Timeout: -5, + BufferedByteLimit: -6, + ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("PublishSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +} + +func TestReceiveSettingsToWireSettings(t *testing.T) { + for _, tc := range []struct { + desc string + settings ReceiveSettings + wantSettings wire.ReceiveSettings + }{ + { + desc: "default settings", + settings: DefaultReceiveSettings, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, + MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, + Timeout: DefaultReceiveSettings.Timeout, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "zero settings", + settings: ReceiveSettings{}, + wantSettings: DefaultReceiveSettings.toWireSettings(), + }, + { + desc: "positive values", + settings: ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 3, + Timeout: 4, + Partitions: []int{5, 6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + { + desc: "negative values", + settings: ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + }, + wantSettings: wire.ReceiveSettings{ + MaxOutstandingMessages: -2, + MaxOutstandingBytes: -3, + Timeout: -4, + Partitions: []int{-5, -6}, + Framework: wire.FrameworkCloudPubSubShim, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if diff := testutil.Diff(tc.settings.toWireSettings(), tc.wantSettings); diff != "" { + t.Errorf("ReceiveSettings.toWireSettings() got: -, want: +\n%s", diff) + } + }) + } +} From 1648ea06bbb08c3452f79551a9d45147379f13e4 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Dec 2020 12:43:10 +1100 Subject: [PATCH 13/13] feat(pubsublite): publisher client (#3303) PublisherClient wraps Pub/Sub Lite's wire.Publisher and emulates the pubsub.Topic.Publish API. --- pubsublite/go.mod | 1 + pubsublite/internal/wire/assigner_test.go | 23 +- pubsublite/internal/wire/errors.go | 10 +- pubsublite/internal/wire/publish_batcher.go | 5 +- .../internal/wire/publish_batcher_test.go | 6 +- pubsublite/internal/wire/settings.go | 4 - pubsublite/ps/example_test.go | 74 ++++++ pubsublite/ps/publisher.go | 168 +++++++++++++ pubsublite/ps/publisher_test.go | 238 ++++++++++++++++++ 9 files changed, 513 insertions(+), 16 deletions(-) create mode 100644 pubsublite/ps/example_test.go create mode 100644 pubsublite/ps/publisher.go create mode 100644 pubsublite/ps/publisher_test.go diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 22309bf0107..fbb7d6c67aa 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -9,6 +9,7 @@ require ( github.com/google/go-cmp v0.5.4 github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.0.5 + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a google.golang.org/grpc v1.34.0 diff --git a/pubsublite/internal/wire/assigner_test.go b/pubsublite/internal/wire/assigner_test.go index 4c7c3f02e36..64e761f1993 100644 --- a/pubsublite/internal/wire/assigner_test.go +++ b/pubsublite/internal/wire/assigner_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "sort" + "sync" "testing" "time" @@ -62,7 +63,8 @@ func fakeGenerateUUID() (uuid.UUID, error) { // testAssigner wraps an assigner for ease of testing. type testAssigner struct { // Fake error to simulate receiver unable to handle assignment. - RetError error + recvError error + mu sync.Mutex t *testing.T asn *assigner @@ -96,12 +98,20 @@ func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { sort.Ints(p) ta.partitions <- p - if ta.RetError != nil { - return ta.RetError + ta.mu.Lock() + defer ta.mu.Unlock() + if ta.recvError != nil { + return ta.recvError } return nil } +func (ta *testAssigner) SetReceiveError(err error) { + ta.mu.Lock() + defer ta.mu.Unlock() + ta.recvError = err +} + func (ta *testAssigner) NextPartitions() []int { select { case <-time.After(serviceTestWaitTimeout): @@ -186,7 +196,8 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { asn := newTestAssigner(t, subscription) // Simulates the assigningSubscriber discarding assignments. - asn.RetError = errors.New("subscriber shutting down") + wantErr := errors.New("subscriber shutting down") + asn.SetReceiveError(wantErr) if gotErr := asn.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) @@ -194,7 +205,7 @@ func TestAssignerHandlePartitionFailure(t *testing.T) { if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { t.Errorf("Partition assignments: got %v, want %v", got, want) } - if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) { - t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError) + if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } } diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index 5dc5bc10167..2953dca7011 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -13,13 +13,21 @@ package wire -import "errors" +import ( + "errors" + "fmt" +) +// Errors exported from this package. var ( // ErrOverflow indicates that the publish buffers have overflowed. See // comments for PublishSettings.BufferedByteLimit. ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed") + // ErrOversizedMessage indicates that the user published a message over the + // allowed serialized byte size limit. It is wrapped in another error. + ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) + // ErrServiceUninitialized indicates that a service (e.g. publisher or // subscriber) cannot perform an operation because it is uninitialized. ErrServiceUninitialized = errors.New("pubsublite: service must be started") diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index a6d150565f2..1fa8ed134fb 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -19,6 +19,7 @@ import ( "fmt" "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" "google.golang.org/api/support/bundler" "google.golang.org/protobuf/proto" @@ -109,8 +110,8 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error { msgSize := proto.Size(msg) switch { - case msgSize > MaxPublishMessageBytes: - return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes) + case msgSize > MaxPublishRequestBytes: + return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage) case msgSize > b.availableBufferBytes: return ErrOverflow } diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index b36fed6fe75..9f8c6a6bc42 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver } func TestPublishBatcherAddMessage(t *testing.T) { - const initAvailableBytes = MaxPublishMessageBytes + 1 + const initAvailableBytes = MaxPublishRequestBytes settings := DefaultPublishSettings settings.BufferedByteLimit = initAvailableBytes @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) { }) t.Run("oversized message", func(t *testing.T) { - msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)} - if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { + msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)} + if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg) } }) diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index f1dcd6cb0ea..d4f97bf63fb 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -24,10 +24,6 @@ const ( // batched in a single publish request. MaxPublishRequestCount = 1000 - // MaxPublishMessageBytes is the maximum allowed serialized size of a single - // Pub/Sub message in bytes. - MaxPublishMessageBytes = 1000000 - // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. diff --git a/pubsublite/ps/example_test.go b/pubsublite/ps/example_test.go new file mode 100644 index 00000000000..957ffdd79f6 --- /dev/null +++ b/pubsublite/ps/example_test.go @@ -0,0 +1,74 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps_test + +import ( + "context" + "fmt" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/ps" +) + +func ExamplePublisherClient_Publish() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} + +func ExamplePublisherClient_Error() { + ctx := context.Background() + topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"} + publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + if err == ps.ErrPublisherStopped { + fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error()) + } + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go new file mode 100644 index 00000000000..ead99913551 --- /dev/null +++ b/pubsublite/ps/publisher.go @@ -0,0 +1,168 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "sync" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + + ipubsub "cloud.google.com/go/internal/pubsub" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +var ( + // ErrOverflow is set for a PublishResult when publish buffers overflow. + ErrOverflow = bundler.ErrOverflow + + // ErrOversizedMessage is set for a PublishResult when a published message + // exceeds MaxPublishRequestBytes. + ErrOversizedMessage = bundler.ErrOversizedItem + + // ErrPublisherStopped is set for a PublishResult when a message cannot be + // published because the publisher client has stopped. PublisherClient.Error() + // returns the error that caused the publisher client to terminate (if any). + ErrPublisherStopped = wire.ErrServiceStopped +) + +// translateError transforms a subset of errors to what would be returned by the +// pubsub package. +func translateError(err error) error { + if xerrors.Is(err, wire.ErrOversizedMessage) { + return ErrOversizedMessage + } + if xerrors.Is(err, wire.ErrOverflow) { + return ErrOverflow + } + return err +} + +// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given +// topic. A PublisherClient is safe to use from multiple goroutines. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +type PublisherClient struct { + settings PublishSettings + wirePub wire.Publisher + + // Fields below must be guarded with mutex. + mu sync.Mutex + err error +} + +// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish +// messages to a given topic. +// +// See https://cloud.google.com/pubsub/lite/docs/publishing for more information +// about publishing. +func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) { + region, err := pubsublite.ZoneToRegion(topic.Zone) + if err != nil { + return nil, err + } + + // Note: ctx is not used to create the wire publisher, because if it is + // cancelled, the publisher will not be able to perform graceful shutdown + // (e.g. flush pending messages). + wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...) + if err != nil { + return nil, err + } + wirePub.Start() + if err := wirePub.WaitStarted(); err != nil { + return nil, err + } + return &PublisherClient{settings: settings, wirePub: wirePub}, nil +} + +// Publish publishes `msg` to the topic asynchronously. Messages are batched and +// sent according to the client's PublishSettings. Publish never blocks. +// +// Publish returns a non-nil PublishResult which will be ready when the +// message has been sent (or has failed to be sent) to the server. +// +// Once Stop() has been called or the publisher has failed permanently due to an +// error, future calls to Publish will immediately return a PublishResult with +// error ErrPublisherStopped. Error() returns the error that caused the +// publisher to terminate. +func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult { + result := ipubsub.NewPublishResult() + msgpb := new(pb.PubSubMessage) + if err := p.transformMessage(msg, msgpb); err != nil { + ipubsub.SetPublishResult(result, "", err) + p.setError(err) + p.wirePub.Stop() + return result + } + + p.wirePub.Publish(msgpb, func(pm *publish.Metadata, err error) { + err = translateError(err) + if pm != nil { + ipubsub.SetPublishResult(result, pm.String(), err) + } else { + ipubsub.SetPublishResult(result, "", err) + } + }) + return result +} + +// Stop sends all remaining published messages and closes publish streams. +// Returns once all outstanding messages have been sent or have failed to be +// sent. +func (p *PublisherClient) Stop() { + p.wirePub.Stop() + p.wirePub.WaitStopped() +} + +// Error returns the error that caused the publisher client to terminate. It +// may be nil if Stop() was called. +func (p *PublisherClient) Error() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.err != nil { + return p.err + } + return p.wirePub.Error() +} + +func (p *PublisherClient) setError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Don't clobber original error. + if p.err == nil { + p.err = err + } +} + +func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error { + if p.settings.MessageTransformer != nil { + return p.settings.MessageTransformer(from, to) + } + + keyExtractor := p.settings.KeyExtractor + if keyExtractor == nil { + keyExtractor = extractOrderingKey + } + return transformPublishedMessage(from, to, keyExtractor) +} diff --git a/pubsublite/ps/publisher_test.go b/pubsublite/ps/publisher_test.go new file mode 100644 index 00000000000..3629b636b69 --- /dev/null +++ b/pubsublite/ps/publisher_test.go @@ -0,0 +1,238 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package ps + +import ( + "context" + "errors" + "testing" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" + "golang.org/x/xerrors" + "google.golang.org/api/support/bundler" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// mockWirePublisher is a mock implementation of the wire.Publisher interface. +// It uses test.RPCVerifier to install fake PublishResults for each Publish +// call. +type mockWirePublisher struct { + Verifier *test.RPCVerifier + Stopped bool + err error +} + +func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) { + resp, err := mp.Verifier.Pop(msg) + if err != nil { + mp.err = err + onResult(nil, err) + return + } + result := resp.(*publish.Metadata) + onResult(result, nil) +} + +func (mp *mockWirePublisher) Start() {} +func (mp *mockWirePublisher) Stop() { mp.Stopped = true } +func (mp *mockWirePublisher) WaitStarted() error { return mp.err } +func (mp *mockWirePublisher) WaitStopped() error { return mp.err } +func (mp *mockWirePublisher) Error() error { return mp.err } + +func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient { + return &PublisherClient{ + settings: settings, + wirePub: &mockWirePublisher{Verifier: verifier}, + } +} + +func TestPublisherClientTransformMessage(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + Attributes: map[string]string{"attr": "value"}, + } + fakeResponse := &publish.Metadata{ + Partition: 2, + Offset: 42, + } + wantResultID := "2:42" + + for _, tc := range []struct { + desc string + // mutateSettings is passed a copy of DefaultPublishSettings to mutate. + mutateSettings func(settings *PublishSettings) + wantMsg *pb.PubSubMessage + }{ + { + desc: "default settings", + mutateSettings: func(settings *PublishSettings) {}, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom key extractor", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("data"), + Attributes: map[string]*pb.AttributeValues{ + "attr": {Values: [][]byte{[]byte("value")}}, + }, + }, + }, + { + desc: "custom message transformer", + mutateSettings: func(settings *PublishSettings) { + settings.KeyExtractor = func(msg *pubsub.Message) []byte { + return msg.Data + } + settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error { + // Swaps data and key. + to.Data = []byte(from.OrderingKey) + to.Key = from.Data + return nil + } + }, + wantMsg: &pb.PubSubMessage{ + Data: []byte("ordering_key"), + Key: []byte("data"), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := DefaultPublishSettings + tc.mutateSettings(&settings) + + verifier := test.NewRPCVerifier(t) + verifier.Push(tc.wantMsg, fakeResponse, nil) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + gotID, err := result.Get(ctx) + if err != nil { + t.Errorf("Publish() got err: %v", err) + } + if gotID != wantResultID { + t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID) + } + }) + } +} + +func TestPublisherClientTransformMessageError(t *testing.T) { + wantErr := errors.New("message could not be converted") + + settings := DefaultPublishSettings + settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error { + return wantErr + } + + // No publish calls expected. + verifier := test.NewRPCVerifier(t) + defer verifier.Flush() + + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + } + pubClient := newTestPublisherClient(verifier, settings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr) + } + if !test.ErrorEqual(pubClient.Error(), wantErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } +} + +func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { + ctx := context.Background() + input := &pubsub.Message{ + Data: []byte("data"), + OrderingKey: "ordering_key", + } + wantMsg := &pb.PubSubMessage{ + Data: []byte("data"), + Key: []byte("ordering_key"), + } + + for _, tc := range []struct { + desc string + wireErr error + wantErr error + }{ + { + desc: "oversized message", + wireErr: wire.ErrOversizedMessage, + wantErr: bundler.ErrOversizedItem, + }, + { + desc: "oversized message wrapped", + wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage), + wantErr: bundler.ErrOversizedItem, + }, + { + desc: "buffer overflow", + wireErr: wire.ErrOverflow, + wantErr: bundler.ErrOverflow, + }, + { + desc: "service stopped", + wireErr: wire.ErrServiceStopped, + wantErr: wire.ErrServiceStopped, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + verifier := test.NewRPCVerifier(t) + verifier.Push(wantMsg, nil, tc.wireErr) + defer verifier.Flush() + + pubClient := newTestPublisherClient(verifier, DefaultPublishSettings) + result := pubClient.Publish(ctx, input) + + _, gotErr := result.Get(ctx) + if !test.ErrorEqual(gotErr, tc.wantErr) { + t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr) + } + if !test.ErrorEqual(pubClient.Error(), tc.wireErr) { + t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr) + } + if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want { + t.Errorf("Publisher.Stopped: got %v, want %v", got, want) + } + }) + } +}