From f2b20f493e2ed5a883ce42fa65695c03c574feb5 Mon Sep 17 00:00:00 2001 From: shollyman Date: Thu, 1 Jul 2021 10:14:23 -0700 Subject: [PATCH] feat(bigquery managedwriter): schema conversion support (#4357) This is the first of multiple PRs to build up the functionality of a new thick client over the new BigQuery Storage API's write mechanism. This PR exposes schema conversion between the main bigquery package and the bigquery storage API. Towards: https://github.com/googleapis/google-cloud-go/issues/4366 --- bigquery/storage/managedwriter/adapt/doc.go | 19 ++ .../managedwriter/adapt/schemaconversion.go | 140 ++++++++++++ .../adapt/schemaconversion_test.go | 203 ++++++++++++++++++ bigquery/storage/managedwriter/doc.go | 22 ++ 4 files changed, 384 insertions(+) create mode 100644 bigquery/storage/managedwriter/adapt/doc.go create mode 100644 bigquery/storage/managedwriter/adapt/schemaconversion.go create mode 100644 bigquery/storage/managedwriter/adapt/schemaconversion_test.go create mode 100644 bigquery/storage/managedwriter/doc.go diff --git a/bigquery/storage/managedwriter/adapt/doc.go b/bigquery/storage/managedwriter/adapt/doc.go new file mode 100644 index 00000000000..c06d3039200 --- /dev/null +++ b/bigquery/storage/managedwriter/adapt/doc.go @@ -0,0 +1,19 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Package adapt adds functionality for converting between BigQuery representations, +// such as schemas and data types, of the bigquery package and the BigQuery Storage API. +// +// It is EXPERIMENTAL and subject to change or removal without notice. +package adapt diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion.go b/bigquery/storage/managedwriter/adapt/schemaconversion.go new file mode 100644 index 00000000000..8de22575db6 --- /dev/null +++ b/bigquery/storage/managedwriter/adapt/schemaconversion.go @@ -0,0 +1,140 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package adapt + +import ( + "fmt" + + "cloud.google.com/go/bigquery" + storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" +) + +var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{ + bigquery.StringFieldType: storagepb.TableFieldSchema_STRING, + bigquery.BytesFieldType: storagepb.TableFieldSchema_BYTES, + bigquery.IntegerFieldType: storagepb.TableFieldSchema_INT64, + bigquery.FloatFieldType: storagepb.TableFieldSchema_DOUBLE, + bigquery.BooleanFieldType: storagepb.TableFieldSchema_BOOL, + bigquery.TimestampFieldType: storagepb.TableFieldSchema_TIMESTAMP, + bigquery.RecordFieldType: storagepb.TableFieldSchema_STRUCT, + bigquery.DateFieldType: storagepb.TableFieldSchema_DATE, + bigquery.TimeFieldType: storagepb.TableFieldSchema_TIME, + bigquery.DateTimeFieldType: storagepb.TableFieldSchema_DATETIME, + bigquery.NumericFieldType: storagepb.TableFieldSchema_NUMERIC, + bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC, + bigquery.GeographyFieldType: storagepb.TableFieldSchema_GEOGRAPHY, +} + +func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) { + if in == nil { + return nil, nil + } + out := &storagepb.TableFieldSchema{ + Name: in.Name, + Description: in.Description, + } + + // Type conversion. A field type with no entry in fieldTypeMap is rejected with an error. + typ, ok := fieldTypeMap[in.Type] + if !ok { + return nil, fmt.Errorf("could not convert field (%s) due to unknown type value: %s", in.Name, in.Type) + } + out.Type = typ + + // Mode conversion. The default is NULLABLE, and Repeated trumps Required when both are set. 
+ out.Mode = storagepb.TableFieldSchema_NULLABLE + if in.Repeated { + out.Mode = storagepb.TableFieldSchema_REPEATED + } + if !in.Repeated && in.Required { + out.Mode = storagepb.TableFieldSchema_REQUIRED + } + + for _, s := range in.Schema { + subField, err := bqFieldToProto(s) + if err != nil { + return nil, err + } + out.Fields = append(out.Fields, subField) + } + return out, nil +} + +func protoToBQField(in *storagepb.TableFieldSchema) (*bigquery.FieldSchema, error) { + if in == nil { + return nil, nil + } + out := &bigquery.FieldSchema{ + Name: in.GetName(), + Description: in.GetDescription(), + Repeated: in.GetMode() == storagepb.TableFieldSchema_REPEATED, + Required: in.GetMode() == storagepb.TableFieldSchema_REQUIRED, + } + + typeResolved := false + for k, v := range fieldTypeMap { + if v == in.GetType() { + out.Type = k + typeResolved = true + break + } + } + if !typeResolved { + return nil, fmt.Errorf("could not convert proto type to bigquery type: %v", in.GetType().String()) + } + + for _, s := range in.Fields { + subField, err := protoToBQField(s) + if err != nil { + return nil, err + } + out.Schema = append(out.Schema, subField) + } + return out, nil +} + +// BQSchemaToStorageTableSchema converts a bigquery Schema into the protobuf-based TableSchema used +// by the BigQuery Storage WriteClient. A nil Schema yields a nil TableSchema and a nil error. +func BQSchemaToStorageTableSchema(in bigquery.Schema) (*storagepb.TableSchema, error) { + if in == nil { + return nil, nil + } + out := &storagepb.TableSchema{} + for _, s := range in { + converted, err := bqFieldToProto(s) + if err != nil { + return nil, err + } + out.Fields = append(out.Fields, converted) + } + return out, nil +} + +// StorageTableSchemaToBQSchema converts a TableSchema from the BigQuery Storage WriteClient +// into the equivalent BigQuery Schema. A nil TableSchema yields a nil Schema and a nil error. 
+func StorageTableSchemaToBQSchema(in *storagepb.TableSchema) (bigquery.Schema, error) { + if in == nil { + return nil, nil + } + var out bigquery.Schema + for _, s := range in.Fields { + converted, err := protoToBQField(s) + if err != nil { + return nil, err + } + out = append(out, converted) + } + return out, nil +} diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion_test.go b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go new file mode 100644 index 00000000000..aab85fa9072 --- /dev/null +++ b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go @@ -0,0 +1,203 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package adapt + +import ( + "testing" + + "cloud.google.com/go/bigquery" + "cloud.google.com/go/internal/testutil" + "github.com/google/go-cmp/cmp" + storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestFieldConversions(t *testing.T) { + testCases := []struct { + desc string + bq *bigquery.FieldSchema + proto *storagepb.TableFieldSchema + }{ + { + desc: "nil", + bq: nil, + proto: nil, + }, + { + desc: "string field", + bq: &bigquery.FieldSchema{ + Name: "name", + Type: bigquery.StringFieldType, + Description: "description", + }, + proto: &storagepb.TableFieldSchema{ + Name: "name", + Type: storagepb.TableFieldSchema_STRING, + Description: "description", + Mode: storagepb.TableFieldSchema_NULLABLE, + }, + }, + { + desc: "required integer field", + bq: &bigquery.FieldSchema{ + Name: "name", + Type: bigquery.IntegerFieldType, + Description: "description", + Required: true, + }, + proto: &storagepb.TableFieldSchema{ + Name: "name", + Type: storagepb.TableFieldSchema_INT64, + Description: "description", + Mode: storagepb.TableFieldSchema_REQUIRED, + }, + }, + { + desc: "struct with repeated bytes subfield", + bq: &bigquery.FieldSchema{ + Name: "name", + Type: bigquery.RecordFieldType, + Description: "description", + Required: true, + Schema: bigquery.Schema{ + &bigquery.FieldSchema{ + Name: "inner1", + Repeated: true, + Description: "repeat", + Type: bigquery.BytesFieldType, + }, + }, + }, + proto: &storagepb.TableFieldSchema{ + Name: "name", + Type: storagepb.TableFieldSchema_STRUCT, + Description: "description", + Mode: storagepb.TableFieldSchema_REQUIRED, + Fields: []*storagepb.TableFieldSchema{ + { + Name: "inner1", + Mode: storagepb.TableFieldSchema_REPEATED, + Description: "repeat", + Type: storagepb.TableFieldSchema_BYTES, + }, + }, + }, + }, + } + + for _, tc := range testCases { + // First, convert bq to proto and compare the result with the expected proto message. + converted, err := bqFieldToProto(tc.bq) + if err != nil { + 
t.Errorf("case (%s) failed conversion from bq: %v", tc.desc, err) + } + if diff := cmp.Diff(converted, tc.proto, protocmp.Transform()); diff != "" { + t.Errorf("conversion to proto diff (%s):\n%v", tc.desc, diff) + } + // Then the reverse conversion, proto to bq, compared with the original bq field. + reverse, err := protoToBQField(tc.proto) + if err != nil { + t.Errorf("case (%s) failed conversion from proto: %v", tc.desc, err) + } + if diff := cmp.Diff(reverse, tc.bq); diff != "" { + t.Errorf("conversion to BQ diff (%s):\n%v", tc.desc, diff) + } + } +} + +func TestSchemaConversion(t *testing.T) { + + testCases := []struct { + description string + bqSchema bigquery.Schema + storageSchema *storagepb.TableSchema + }{ + { + description: "nil", + bqSchema: nil, + storageSchema: nil, + }, + { + description: "scalars", + bqSchema: bigquery.Schema{ + {Name: "f1", Type: bigquery.StringFieldType}, + {Name: "f2", Type: bigquery.IntegerFieldType}, + {Name: "f3", Type: bigquery.BooleanFieldType}, + }, + storageSchema: &storagepb.TableSchema{ + Fields: []*storagepb.TableFieldSchema{ + {Name: "f1", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE}, + {Name: "f2", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE}, + {Name: "f3", Type: storagepb.TableFieldSchema_BOOL, Mode: storagepb.TableFieldSchema_NULLABLE}, + }, + }, + }, + { + description: "array", + bqSchema: bigquery.Schema{ + {Name: "arr", Type: bigquery.NumericFieldType, Repeated: true}, + {Name: "big", Type: bigquery.BigNumericFieldType, Required: true}, + }, + storageSchema: &storagepb.TableSchema{ + Fields: []*storagepb.TableFieldSchema{ + {Name: "arr", Type: storagepb.TableFieldSchema_NUMERIC, Mode: storagepb.TableFieldSchema_REPEATED}, + {Name: "big", Type: storagepb.TableFieldSchema_BIGNUMERIC, Mode: storagepb.TableFieldSchema_REQUIRED}, + }, + }, + }, + { + description: "nested", + bqSchema: bigquery.Schema{ + {Name: "struct1", Type: bigquery.RecordFieldType, Schema: []*bigquery.FieldSchema{ + 
{Name: "leaf1", Type: bigquery.DateFieldType}, + {Name: "leaf2", Type: bigquery.DateTimeFieldType}, + }}, + {Name: "field2", Type: bigquery.StringFieldType}, + }, + storageSchema: &storagepb.TableSchema{ + Fields: []*storagepb.TableFieldSchema{ + {Name: "struct1", + Type: storagepb.TableFieldSchema_STRUCT, + Mode: storagepb.TableFieldSchema_NULLABLE, + Fields: []*storagepb.TableFieldSchema{ + {Name: "leaf1", Type: storagepb.TableFieldSchema_DATE, Mode: storagepb.TableFieldSchema_NULLABLE}, + {Name: "leaf2", Type: storagepb.TableFieldSchema_DATETIME, Mode: storagepb.TableFieldSchema_NULLABLE}, + }}, + {Name: "field2", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE}, + }, + }, + }, + } + for _, tc := range testCases { + + // BQ -> Storage: convert the bigquery schema and diff it against the expected storage schema. + storageS, err := BQSchemaToStorageTableSchema(tc.bqSchema) + if err != nil { + t.Errorf("BQSchemaToStorageTableSchema(%s): %v", tc.description, err) + } + if diff := testutil.Diff(storageS, tc.storageSchema); diff != "" { + t.Fatalf("BQSchemaToStorageTableSchema(%s): -got, +want:\n%s", tc.description, diff) + } + + // Storage -> BQ: convert the storage schema back and diff it against the bigquery schema. + bqS, err := StorageTableSchemaToBQSchema(tc.storageSchema) + if err != nil { + t.Errorf("StorageTableSchemaToBQSchema(%s): %v", tc.description, err) + } + if diff := testutil.Diff(bqS, tc.bqSchema); diff != "" { + t.Fatalf("StorageTableSchemaToBQSchema(%s): -got, +want:\n%s", tc.description, diff) + } + } +} diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go new file mode 100644 index 00000000000..a8e580bd90c --- /dev/null +++ b/bigquery/storage/managedwriter/doc.go @@ -0,0 +1,22 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient. +// +// It is EXPERIMENTAL and subject to change or removal without notice. +// +// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is +// a successor to the streaming interface. For that older interface, the tabledata.insertAll API method is the +// primary backend method, and the Inserter abstraction is its equivalent in the cloud.google.com/go/bigquery package. +package managedwriter