Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for user defined Table View Functions #724

Merged
merged 7 commits into from Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -81,6 +81,8 @@
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineArgument
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.routine import RoutineType
from google.cloud.bigquery.routine import StandardSQLTableType
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
Expand Down Expand Up @@ -130,6 +132,7 @@
"Routine",
"RoutineArgument",
"RoutineReference",
"StandardSQLTableType",
# Shared helpers
"SchemaField",
"UDFResource",
Expand All @@ -152,6 +155,7 @@
"Encoding",
"KeyResultStatementKind",
"QueryPriority",
"RoutineType",
"SchemaUpdateOption",
"SourceFormat",
"SqlTypeNames",
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/bigquery/routine/__init__.py
Expand Up @@ -19,11 +19,15 @@
from google.cloud.bigquery.routine.routine import Routine
from google.cloud.bigquery.routine.routine import RoutineArgument
from google.cloud.bigquery.routine.routine import RoutineReference
from google.cloud.bigquery.routine.routine import RoutineType
from google.cloud.bigquery.routine.routine import StandardSQLTableType


# Public names exported by this package (what `from ... import *` exposes).
__all__ = (
"DeterminismLevel",
"Routine",
"RoutineArgument",
"RoutineReference",
"RoutineType",
"StandardSQLTableType",
)
69 changes: 69 additions & 0 deletions google/cloud/bigquery/routine/routine.py
Expand Up @@ -16,11 +16,50 @@

"""Define resources for the BigQuery Routines API."""

from typing import Any, Iterable, List, Optional

from google.protobuf import json_format

import google.cloud._helpers
from google.cloud.bigquery import _helpers
import google.cloud.bigquery_v2.types
from google.cloud.bigquery_v2.types import StandardSqlField


# Unlike related proto-plus classes from `bigquery_v2.types.standard_sql`, this class
plamut marked this conversation as resolved.
Show resolved Hide resolved
# is not auto-generated (tech debt), thus we need to define it manually.
class StandardSQLTableType:
    """Return type of a Table Valued Function (TVF), described by its columns."""

    def __init__(self, columns: Iterable[StandardSqlField]):
        # Go through the property setter so the iterable is materialized once.
        self.columns = columns

    @property
    def columns(self) -> List[StandardSqlField]:
        """The columns of the table type.

        Every access hands out a new list (shallow copy), so mutating the
        returned list does not affect this instance.
        """
        return self._columns.copy()

    @columns.setter
    def columns(self, value: Iterable[StandardSqlField]):
        self._columns = [*value]

    def __eq__(self, other: Any):
        # Only instances of this class are comparable; defer to ``other``
        # for everything else.
        if isinstance(other, StandardSQLTableType):
            return self.columns == other.columns
        return NotImplemented

    # Instances are mutable and define ``__eq__``, hence explicitly unhashable.
    __hash__ = None


class RoutineType:
    """The fine-grained type of the routine.

    Each attribute is the string value of the corresponding routine type,
    as compared against ``Routine.type_``.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#routinetype
    """

    ROUTINE_TYPE_UNSPECIFIED = "ROUTINE_TYPE_UNSPECIFIED"
    SCALAR_FUNCTION = "SCALAR_FUNCTION"
    PROCEDURE = "PROCEDURE"
    TABLE_VALUED_FUNCTION = "TABLE_VALUED_FUNCTION"


class Routine(object):
Expand Down Expand Up @@ -48,6 +87,7 @@ class Routine(object):
"modified": "lastModifiedTime",
"reference": "routineReference",
"return_type": "returnType",
"return_table_type": "returnTableType",
"type_": "routineType",
"description": "description",
"determinism_level": "determinismLevel",
Expand Down Expand Up @@ -204,6 +244,35 @@ def return_type(self, value):
resource = None
self._properties[self._PROPERTY_TO_API_FIELD["return_type"]] = resource

@property
def return_table_type(self) -> Optional[StandardSQLTableType]:
    """The return table type of a Table Valued Function (TVF) routine.

    Returns:
        A ``StandardSQLTableType`` built from the ``returnTableType`` entry
        of the routine's properties, or ``None`` when that entry is missing
        or empty.
    """
    resource = self._properties.get(
        self._PROPERTY_TO_API_FIELD["return_table_type"]
    )
    if not resource:
        # Normalize every "not set" representation (missing, None, {}) to
        # None so callers never receive a raw dict.
        return None

    # A resource without a "columns" key is treated as having no columns
    # instead of raising KeyError.
    table_columns = [
        StandardSqlField.wrap(
            json_format.ParseDict(
                column_json, StandardSqlField()._pb, ignore_unknown_fields=True
            )
        )
        for column_json in resource.get("columns", ())
    ]
    return StandardSQLTableType(columns=table_columns)

@return_table_type.setter
def return_table_type(self, value):
    """Store ``value`` in API representation; a falsy value is stored as ``None``."""
    api_field = self._PROPERTY_TO_API_FIELD["return_table_type"]

    if value:
        # Serialize each column's underlying protobuf message to a plain dict.
        serialized_columns = [
            json_format.MessageToDict(col._pb) for col in value.columns
        ]
        self._properties[api_field] = {"columns": serialized_columns}
    else:
        self._properties[api_field] = None

@property
def imported_libraries(self):
"""List[str]: The path of the imported JavaScript libraries.
Expand Down
79 changes: 79 additions & 0 deletions tests/system/test_client.py
Expand Up @@ -2118,6 +2118,85 @@ def test_create_routine(self):
assert len(rows) == 1
assert rows[0].max_value == 100.0

# System test: create a Table Valued Function (TVF) routine through the client,
# check the properties of the created routine, then query it.
def test_create_tvf_routine(self):
from google.cloud.bigquery import Routine, RoutineArgument, RoutineType
from google.cloud.bigquery import StandardSQLTableType

StandardSqlDataType = bigquery_v2.types.StandardSqlDataType
StandardSqlField = bigquery_v2.types.StandardSqlField

INT64 = StandardSqlDataType.TypeKind.INT64
STRING = StandardSqlDataType.TypeKind.STRING

client = Config.CLIENT

# NOTE(review): temp_dataset presumably registers the dataset for cleanup - confirm.
dataset = self.temp_dataset(_make_dataset_id("create_tvf_routine"))
routine_ref = dataset.routine("test_tvf_routine")

# Body of the TVF: joins the ints [1, 2, 3] with three strings (ON TRUE) and
# keeps only rows whose int_col exceeds the `threshold` argument.
routine_body = """
SELECT int_col, str_col
FROM (
UNNEST([1, 2, 3]) int_col
JOIN
(SELECT str_col FROM UNNEST(["one", "two", "three"]) str_col)
ON TRUE
)
WHERE int_col > threshold
"""

# Declared shape of the table returned by the routine.
return_table_type = StandardSQLTableType(
columns=[
StandardSqlField(
name="int_col", type=StandardSqlDataType(type_kind=INT64),
),
StandardSqlField(
name="str_col", type=StandardSqlDataType(type_kind=STRING),
),
]
)

# The single INT64 argument referenced in the WHERE clause of the body.
routine_args = [
RoutineArgument(
name="threshold", data_type=StandardSqlDataType(type_kind=INT64),
)
]

routine_def = Routine(
routine_ref,
type_=RoutineType.TABLE_VALUED_FUNCTION,
arguments=routine_args,
return_table_type=return_table_type,
body=routine_body,
)

# Create TVF routine.
# A routine with the same reference is deleted first; not_found_ok=True makes
# that a no-op when none exists.
client.delete_routine(routine_ref, not_found_ok=True)
routine = client.create_routine(routine_def)

# The created routine must echo back what was sent.
assert routine.body == routine_body
assert routine.return_table_type == return_table_type
assert routine.arguments == routine_args

# Execute the routine to see if it's working as expected.
# It is called with threshold 1, so only int_col values 2 and 3 are expected.
query_job = client.query(
f"""
SELECT int_col, str_col
FROM `{routine.reference}`(1)
ORDER BY int_col, str_col ASC
"""
)

# Rows are ordered by int_col, then str_col (alphabetical: one, three, two).
result_rows = [tuple(row) for row in query_job.result()]
expected = [
(2, "one"),
(2, "three"),
(2, "two"),
(3, "one"),
(3, "three"),
(3, "two"),
]
assert result_rows == expected

def test_create_table_rows_fetch_nested_schema(self):
table_name = "test_table"
dataset = self.temp_dataset(_make_dataset_id("create_table_nested_schema"))
Expand Down
128 changes: 128 additions & 0 deletions tests/unit/routine/test_routine.py
Expand Up @@ -156,12 +156,86 @@ def test_from_api_repr(target_class):
assert actual_routine.return_type == bigquery_v2.types.StandardSqlDataType(
type_kind=bigquery_v2.types.StandardSqlDataType.TypeKind.INT64
)
assert actual_routine.return_table_type is None
assert actual_routine.type_ == "SCALAR_FUNCTION"
assert actual_routine._properties["someNewField"] == "someValue"
assert actual_routine.description == "A routine description."
assert actual_routine.determinism_level == "DETERMINISTIC"


def test_from_api_repr_tvf_function(target_class):
    """``from_api_repr`` populates every property of a TVF routine resource."""
    from google.cloud.bigquery.routine import RoutineArgument
    from google.cloud.bigquery.routine import RoutineReference
    from google.cloud.bigquery.routine import RoutineType
    from google.cloud.bigquery.routine import StandardSQLTableType

    StandardSqlDataType = bigquery_v2.types.StandardSqlDataType
    StandardSqlField = bigquery_v2.types.StandardSqlField
    INT64 = StandardSqlDataType.TypeKind.INT64

    utc = google.cloud._helpers.UTC
    to_millis = google.cloud._helpers._millis
    created_at = datetime.datetime(2010, 5, 19, 16, 0, 0, tzinfo=utc)
    modified_at = datetime.datetime(2011, 10, 1, 16, 0, 0, tzinfo=utc)

    api_repr = {
        "routineReference": {
            "projectId": "my-project",
            "datasetId": "my_dataset",
            "routineId": "my_routine",
        },
        "etag": "abcdefg",
        "creationTime": str(to_millis(created_at)),
        "lastModifiedTime": str(to_millis(modified_at)),
        "definitionBody": "SELECT x FROM UNNEST([1,2,3]) x WHERE x > a",
        "arguments": [{"name": "a", "dataType": {"typeKind": "INT64"}}],
        "language": "SQL",
        "returnTableType": {
            "columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}]
        },
        "routineType": "TABLE_VALUED_FUNCTION",
        "someNewField": "someValue",
        "description": "A routine description.",
        "determinismLevel": bigquery.DeterminismLevel.DETERMINISTIC,
    }

    routine = target_class.from_api_repr(api_repr)

    # Identity of the routine.
    assert routine.project == "my-project"
    assert routine.dataset_id == "my_dataset"
    assert routine.routine_id == "my_routine"
    assert (
        routine.path == "/projects/my-project/datasets/my_dataset/routines/my_routine"
    )
    assert routine.reference == RoutineReference.from_string(
        "my-project.my_dataset.my_routine"
    )

    # Metadata.
    assert routine.etag == "abcdefg"
    assert routine.created == created_at
    assert routine.modified == modified_at

    # Definition: arguments, body and language.
    expected_arguments = [
        RoutineArgument(name="a", data_type=StandardSqlDataType(type_kind=INT64))
    ]
    assert routine.arguments == expected_arguments
    assert routine.body == "SELECT x FROM UNNEST([1,2,3]) x WHERE x > a"
    assert routine.language == "SQL"

    # A TVF carries a table return type rather than a scalar one.
    expected_table_type = StandardSQLTableType(
        columns=[
            StandardSqlField(
                name="int_col", type=StandardSqlDataType(type_kind=INT64)
            )
        ]
    )
    assert routine.return_type is None
    assert routine.return_table_type == expected_table_type
    assert routine.type_ == RoutineType.TABLE_VALUED_FUNCTION

    # Unknown fields are preserved; the remaining known fields are exposed.
    assert routine._properties["someNewField"] == "someValue"
    assert routine.description == "A routine description."
    assert routine.determinism_level == "DETERMINISTIC"


def test_from_api_repr_w_minimal_resource(target_class):
from google.cloud.bigquery.routine import RoutineReference

Expand Down Expand Up @@ -261,6 +335,24 @@ def test_from_api_repr_w_unknown_fields(target_class):
["return_type"],
{"returnType": {"typeKind": "INT64"}},
),
(
{
"definitionBody": "SELECT x FROM UNNEST([1,2,3]) x WHERE x > 1",
"language": "SQL",
"returnTableType": {
"columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}]
},
"routineType": "TABLE_VALUED_FUNCTION",
"description": "A routine description.",
"determinismLevel": bigquery.DeterminismLevel.DETERMINISM_LEVEL_UNSPECIFIED,
},
["return_table_type"],
{
"returnTableType": {
"columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}]
}
},
),
(
{
"arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}],
Expand Down Expand Up @@ -361,6 +453,42 @@ def test_set_return_type_w_none(object_under_test):
assert object_under_test._properties["returnType"] is None


def test_set_return_table_type_w_none(object_under_test):
    """Assigning ``None`` clears both the raw API field and the property."""
    routine = object_under_test

    routine.return_table_type = None

    assert routine._properties["returnTableType"] is None
    assert routine.return_table_type is None


def test_set_return_table_type_w_not_none(object_under_test):
    """Assigning a table type round-trips and is stored in API representation."""
    from google.cloud.bigquery.routine import StandardSQLTableType

    StandardSqlDataType = bigquery_v2.types.StandardSqlDataType
    StandardSqlField = bigquery_v2.types.StandardSqlField
    TypeKind = StandardSqlDataType.TypeKind

    int_column = StandardSqlField(
        name="int_col", type=StandardSqlDataType(type_kind=TypeKind.INT64)
    )
    str_column = StandardSqlField(
        name="str_col", type=StandardSqlDataType(type_kind=TypeKind.STRING)
    )
    expected_table_type = StandardSQLTableType(columns=[int_column, str_column])

    object_under_test.return_table_type = expected_table_type

    # The property returns an equal object, while the underlying properties
    # hold the plain-dict form.
    expected_resource = {
        "columns": [
            {"name": "int_col", "type": {"typeKind": "INT64"}},
            {"name": "str_col", "type": {"typeKind": "STRING"}},
        ]
    }
    assert object_under_test.return_table_type == expected_table_type
    assert object_under_test._properties["returnTableType"] == expected_resource


def test_set_description_w_none(object_under_test):
object_under_test.description = None
assert object_under_test.description is None
Expand Down