Skip to content

Commit

Permalink
feat(bigquery): add support for user defined TVF (#4043)
Browse files Browse the repository at this point in the history
* feat(bigquery): add support for user defined TVF

Allows user to define table-valued functions using the BigQuery API.
  • Loading branch information
shollyman committed Jun 22, 2021
1 parent 4c12b42 commit 37607b4
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 2 deletions.
38 changes: 38 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -1291,6 +1291,44 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
it, [][]Value{{int64(10)}})
}

func TestIntegration_RoutineUserTVF(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

routineID := routineIDs.New()
routine := dataset.Routine(routineID)
inMeta := &RoutineMetadata{
Type: "TABLE_VALUED_FUNCTION",
Language: "SQL",
Arguments: []*RoutineArgument{
{Name: "filter",
DataType: &StandardSQLDataType{TypeKind: "INT64"},
}},
ReturnTableType: &StandardSQLTableType{
Columns: []*StandardSQLField{
{Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}},
},
},
Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter",
}
if err := routine.Create(ctx, inMeta); err != nil {
t.Fatalf("routine create: %v", err)
}
defer routine.Delete(ctx)

meta, err := routine.Metadata(ctx)
if err != nil {
t.Fatal(err)
}

// Now, compare the input meta to the output meta
if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" {
t.Errorf("routine metadata differs, got=-, want=+\n%s", diff)
}
}

func TestIntegration_InsertErrors(t *testing.T) {
// This test serves to verify streaming behavior in the face of oversized data.
// BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
Expand Down
28 changes: 26 additions & 2 deletions bigquery/routine.go
Expand Up @@ -144,7 +144,8 @@ const (
// RoutineMetadata represents details of a given BigQuery Routine.
type RoutineMetadata struct {
ETag string
// Type indicates the type of routine, such as SCALAR_FUNCTION or PROCEDURE.
// Type indicates the type of routine, such as SCALAR_FUNCTION, PROCEDURE,
// or TABLE_VALUED_FUNCTION.
Type string
CreationTime time.Time
Description string
Expand All @@ -156,6 +157,9 @@ type RoutineMetadata struct {
// The list of arguments for the the routine.
Arguments []*RoutineArgument
ReturnType *StandardSQLDataType

// Set only if the routine type is TABLE_VALUED_FUNCTION.
ReturnTableType *StandardSQLTableType
// For javascript routines, this indicates the paths for imported libraries.
ImportedLibraries []string
// Body contains the routine's body.
Expand Down Expand Up @@ -184,7 +188,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
return nil, err
}
r.ReturnType = rt

if rm.ReturnTableType != nil {
tt, err := rm.ReturnTableType.toBQ()
if err != nil {
return nil, fmt.Errorf("couldn't convert return table type: %v", err)
}
r.ReturnTableType = tt
}
var args []*bq.Argument
for _, v := range rm.Arguments {
bqa, err := v.toBQ()
Expand Down Expand Up @@ -301,6 +311,7 @@ type RoutineMetadataToUpdate struct {
Body optional.String
ImportedLibraries []string
ReturnType *StandardSQLDataType
ReturnTableType *StandardSQLTableType
}

func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
Expand Down Expand Up @@ -370,6 +381,14 @@ func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
r.ReturnType = dt
forceSend("ReturnType")
}
if rm.ReturnTableType != nil {
tt, err := rm.ReturnTableType.toBQ()
if err != nil {
return nil, err
}
r.ReturnTableType = tt
forceSend("ReturnTableType")
}
return r, nil
}

Expand All @@ -395,5 +414,10 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
return nil, err
}
meta.ReturnType = ret
tt, err := bqToStandardSQLTableType(r.ReturnTableType)
if err != nil {
return nil, err
}
meta.ReturnTableType = tt
return meta, nil
}
10 changes: 10 additions & 0 deletions bigquery/routine_test.go
Expand Up @@ -84,6 +84,11 @@ func TestRoutineTypeConversions(t *testing.T) {
RoutineType: "type",
Language: "lang",
ReturnType: &bq.StandardSqlDataType{TypeKind: "INT64"},
ReturnTableType: &bq.StandardSqlTableType{
Columns: []*bq.StandardSqlField{
{Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}},
},
},
},
&RoutineMetadata{
CreationTime: aTime,
Expand All @@ -95,6 +100,11 @@ func TestRoutineTypeConversions(t *testing.T) {
Type: "type",
Language: "lang",
ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
ReturnTableType: &StandardSQLTableType{
Columns: []*StandardSQLField{
{Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}},
},
},
}},
{"body_and_libs", "FromRoutineMetadataToUpdate",
&RoutineMetadataToUpdate{
Expand Down
37 changes: 37 additions & 0 deletions bigquery/standardsql.go
Expand Up @@ -175,3 +175,40 @@ func standardSQLStructFieldsToBQ(fields []*StandardSQLField) ([]*bq.StandardSqlF
}
return bqFields, nil
}

// StandardSQLTableType models a table-like resource, which has a set of columns.
type StandardSQLTableType struct {

// The columns of the table.
Columns []*StandardSQLField
}

func (sstt *StandardSQLTableType) toBQ() (*bq.StandardSqlTableType, error) {
if sstt == nil {
return nil, nil
}
out := &bq.StandardSqlTableType{}
for k, v := range sstt.Columns {
bq, err := v.toBQ()
if err != nil {
return nil, fmt.Errorf("error converting column %d: %v", k, err)
}
out.Columns = append(out.Columns, bq)
}
return out, nil
}

func bqToStandardSQLTableType(in *bq.StandardSqlTableType) (*StandardSQLTableType, error) {
if in == nil {
return nil, nil
}
out := &StandardSQLTableType{}
for k, v := range in.Columns {
f, err := bqToStandardSQLField(v)
if err != nil {
return nil, fmt.Errorf("error converting column %d: %v", k, err)
}
out.Columns = append(out.Columns, f)
}
return out, nil
}

0 comments on commit 37607b4

Please sign in to comment.