Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add support for user defined TVF #4043

Merged
merged 10 commits into from Jun 22, 2021
38 changes: 38 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -1171,6 +1171,44 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
it, [][]Value{{int64(10)}})
}

func TestIntegration_RoutineUserTVF(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

routineID := routineIDs.New()
routine := dataset.Routine(routineID)
inMeta := &RoutineMetadata{
Type: "TABLE_VALUED_FUNCTION",
Language: "SQL",
Arguments: []*RoutineArgument{
{Name: "filter",
DataType: &StandardSQLDataType{TypeKind: "INT64"},
}},
ReturnTableType: &StandardSQLTableType{
Columns: []*StandardSQLField{
{Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}},
},
},
Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter",
}
if err := routine.Create(ctx, inMeta); err != nil {
t.Fatalf("routine create: %v", err)
}
defer routine.Delete(ctx)

meta, err := routine.Metadata(ctx)
if err != nil {
t.Fatal(err)
}

// Now, compare the input meta to the output meta
if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" {
t.Errorf("got=-, want=+\n%s", diff)
}
}

func TestIntegration_InsertErrors(t *testing.T) {
// This test serves to verify streaming behavior in the face of oversized data.
// BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
Expand Down
28 changes: 26 additions & 2 deletions bigquery/routine.go
Expand Up @@ -144,7 +144,8 @@ const (
// RoutineMetadata represents details of a given BigQuery Routine.
type RoutineMetadata struct {
ETag string
// Type indicates the type of routine, such as SCALAR_FUNCTION or PROCEDURE.
// Type indicates the type of routine, such as SCALAR_FUNCTION, PROCEDURE,
// or TABLE_VALUED_FUNCTION.
Type string
CreationTime time.Time
Description string
Expand All @@ -156,6 +157,9 @@ type RoutineMetadata struct {
// The list of arguments for the the routine.
Arguments []*RoutineArgument
ReturnType *StandardSQLDataType

// Set only if the routine type is TABLE_VALUED_FUNCTION.
ReturnTableType *StandardSQLTableType
// For javascript routines, this indicates the paths for imported libraries.
ImportedLibraries []string
// Body contains the routine's body.
Expand Down Expand Up @@ -184,7 +188,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
return nil, err
}
r.ReturnType = rt

if rm.ReturnTableType != nil {
tt, err := rm.ReturnTableType.toBQ()
if err != nil {
return nil, fmt.Errorf("couldn't convert return table type: %v", err)
}
r.ReturnTableType = tt
}
var args []*bq.Argument
for _, v := range rm.Arguments {
bqa, err := v.toBQ()
Expand Down Expand Up @@ -301,6 +311,7 @@ type RoutineMetadataToUpdate struct {
Body optional.String
ImportedLibraries []string
ReturnType *StandardSQLDataType
ReturnTableType *StandardSQLTableType
}

func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
Expand Down Expand Up @@ -370,6 +381,14 @@ func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
r.ReturnType = dt
forceSend("ReturnType")
}
if rm.ReturnTableType != nil {
tt, err := rm.ReturnTableType.toBQ()
if err != nil {
return nil, err
}
r.ReturnTableType = tt
forceSend("ReturnTableType")
}
return r, nil
}

Expand All @@ -395,5 +414,10 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
return nil, err
}
meta.ReturnType = ret
tt, err := bqToStandardSQLTableType(r.ReturnTableType)
if err != nil {
return nil, err
}
meta.ReturnTableType = tt
return meta, nil
}
10 changes: 10 additions & 0 deletions bigquery/routine_test.go
Expand Up @@ -84,6 +84,11 @@ func TestRoutineTypeConversions(t *testing.T) {
RoutineType: "type",
Language: "lang",
ReturnType: &bq.StandardSqlDataType{TypeKind: "INT64"},
ReturnTableType: &bq.StandardSqlTableType{
Columns: []*bq.StandardSqlField{
{Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}},
},
},
},
&RoutineMetadata{
CreationTime: aTime,
Expand All @@ -95,6 +100,11 @@ func TestRoutineTypeConversions(t *testing.T) {
Type: "type",
Language: "lang",
ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
ReturnTableType: &StandardSQLTableType{
Columns: []*StandardSQLField{
{Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}},
},
},
}},
{"body_and_libs", "FromRoutineMetadataToUpdate",
&RoutineMetadataToUpdate{
Expand Down
37 changes: 37 additions & 0 deletions bigquery/standardsql.go
Expand Up @@ -175,3 +175,40 @@ func standardSQLStructFieldsToBQ(fields []*StandardSQLField) ([]*bq.StandardSqlF
}
return bqFields, nil
}

// StandardSQLTableType models a table-like resource, which has a set of columns.
type StandardSQLTableType struct {

// The columns of the table.
Columns []*StandardSQLField
}

func (sstt *StandardSQLTableType) toBQ() (*bq.StandardSqlTableType, error) {
if sstt == nil {
return nil, nil
}
out := &bq.StandardSqlTableType{}
for k, v := range sstt.Columns {
bq, err := v.toBQ()
if err != nil {
return nil, fmt.Errorf("error converting column %d: %v", k, err)
}
out.Columns = append(out.Columns, bq)
}
return out, nil
}

func bqToStandardSQLTableType(in *bq.StandardSqlTableType) (*StandardSQLTableType, error) {
if in == nil {
return nil, nil
}
out := &StandardSQLTableType{}
for k, v := range in.Columns {
f, err := bqToStandardSQLField(v)
if err != nil {
return nil, fmt.Errorf("error converting column %d: %v", k, err)
}
out.Columns = append(out.Columns, f)
}
return out, nil
}