feat: add support for user-defined Table Valued Functions (#724)
* Add auxiliary classes for TVF routines

* Add return_table_type property to Routine

* Add system test for TVF routines

* Use the generated StandardSqlTableType class

* Update docs with new changes

* Add missing space in misc. Sphinx directives
plamut committed Jul 19, 2021
1 parent b8b5433 commit 8c7b839
Showing 8 changed files with 270 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -118,6 +118,7 @@ Routine
    routine.Routine
    routine.RoutineArgument
    routine.RoutineReference
+   routine.RoutineType

Schema
======
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -85,6 +85,7 @@
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineArgument
from google.cloud.bigquery.routine import RoutineReference
+from google.cloud.bigquery.routine import RoutineType
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
@@ -162,6 +163,7 @@
"KeyResultStatementKind",
"OperationType",
"QueryPriority",
"RoutineType",
"SchemaUpdateOption",
"SourceFormat",
"SqlTypeNames",
14 changes: 7 additions & 7 deletions google/cloud/bigquery/job/query.py
@@ -1386,12 +1386,12 @@ def to_arrow(
This argument does nothing if ``bqstorage_client`` is supplied.
- ..versionadded:: 1.24.0
+ .. versionadded:: 1.24.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
- ..versionadded:: 2.21.0
+ .. versionadded:: 2.21.0
Returns:
pyarrow.Table
@@ -1403,7 +1403,7 @@
ValueError:
If the :mod:`pyarrow` library cannot be imported.
- ..versionadded:: 1.17.0
+ .. versionadded:: 1.17.0
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_arrow(
@@ -1452,7 +1452,7 @@ def to_dataframe(
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
- ..versionadded:: 1.11.0
+ .. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
@@ -1461,18 +1461,18 @@
This argument does nothing if ``bqstorage_client`` is supplied.
- ..versionadded:: 1.24.0
+ .. versionadded:: 1.24.0
date_as_object (Optional[bool]):
If ``True`` (default), cast dates to objects. If ``False``, convert
to datetime64[ns] dtype.
- ..versionadded:: 1.26.0
+ .. versionadded:: 1.26.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
- ..versionadded:: 2.21.0
+ .. versionadded:: 2.21.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
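(A note on the fix above: reStructuredText only recognizes `.. versionadded::` as a directive when whitespace separates the two dots from the directive name. Without the space, the line is parsed as ordinary paragraph text and rendered literally in the built docs, so these version annotations were previously not shown as proper admonitions.)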
2 changes: 2 additions & 0 deletions google/cloud/bigquery/routine/__init__.py
@@ -19,11 +19,13 @@
from google.cloud.bigquery.routine.routine import Routine
from google.cloud.bigquery.routine.routine import RoutineArgument
from google.cloud.bigquery.routine.routine import RoutineReference
+from google.cloud.bigquery.routine.routine import RoutineType


__all__ = (
    "DeterminismLevel",
    "Routine",
    "RoutineArgument",
    "RoutineReference",
+   "RoutineType",
)
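
With these re-exports in place, `RoutineType` is importable from both the `routine` subpackage and the top-level package. A quick sanity check of the public surface (an illustrative sketch, not part of the commit):

from google.cloud.bigquery import RoutineType
from google.cloud.bigquery.routine import RoutineType as SubpackageRoutineType

# Both import paths resolve to the same class, and the constants are plain strings.
assert RoutineType is SubpackageRoutineType
assert RoutineType.TABLE_VALUED_FUNCTION == "TABLE_VALUED_FUNCTION"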
45 changes: 45 additions & 0 deletions google/cloud/bigquery/routine/routine.py
@@ -21,6 +21,21 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
import google.cloud.bigquery_v2.types
+from google.cloud.bigquery_v2.types import StandardSqlTableType


class RoutineType:
    """The fine-grained type of the routine.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#routinetype

    .. versionadded:: 2.22.0
    """

    ROUTINE_TYPE_UNSPECIFIED = "ROUTINE_TYPE_UNSPECIFIED"
    SCALAR_FUNCTION = "SCALAR_FUNCTION"
    PROCEDURE = "PROCEDURE"
    TABLE_VALUED_FUNCTION = "TABLE_VALUED_FUNCTION"


class Routine(object):
@@ -48,6 +63,7 @@ class Routine(object):
"modified": "lastModifiedTime",
"reference": "routineReference",
"return_type": "returnType",
"return_table_type": "returnTableType",
"type_": "routineType",
"description": "description",
"determinism_level": "determinismLevel",
@@ -204,6 +220,35 @@ def return_type(self, value):
            resource = None
        self._properties[self._PROPERTY_TO_API_FIELD["return_type"]] = resource

    @property
    def return_table_type(self) -> StandardSqlTableType:
        """The return type of a Table Valued Function (TVF) routine.

        .. versionadded:: 2.22.0
        """
        resource = self._properties.get(
            self._PROPERTY_TO_API_FIELD["return_table_type"]
        )
        if not resource:
            return resource

        output = google.cloud.bigquery_v2.types.StandardSqlTableType()
        raw_protobuf = json_format.ParseDict(
            resource, output._pb, ignore_unknown_fields=True
        )
        return type(output).wrap(raw_protobuf)

    @return_table_type.setter
    def return_table_type(self, value):
        if not value:
            resource = None
        else:
            resource = {
                "columns": [json_format.MessageToDict(col._pb) for col in value.columns]
            }

        self._properties[self._PROPERTY_TO_API_FIELD["return_table_type"]] = resource

    @property
    def imported_libraries(self):
        """List[str]: The path of the imported JavaScript libraries.
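Taken together, `RoutineType.TABLE_VALUED_FUNCTION` and the new `return_table_type` property are all that client code needs to define a TVF. A minimal sketch of the intended usage, assuming an authenticated client and an existing dataset (the project, dataset, and routine names below are illustrative placeholders):

from google.cloud import bigquery
from google.cloud import bigquery_v2

client = bigquery.Client()
# Hypothetical routine path; substitute your own project and dataset.
routine_ref = bigquery.RoutineReference.from_string("my-project.my_dataset.my_tvf")

# Declare the shape of the table the TVF returns: a single INT64 column.
int64 = bigquery_v2.types.StandardSqlDataType(
    type_kind=bigquery_v2.types.StandardSqlDataType.TypeKind.INT64
)
return_table_type = bigquery_v2.types.StandardSqlTableType(
    columns=[bigquery_v2.types.StandardSqlField(name="x", type=int64)]
)

routine = bigquery.Routine(
    routine_ref,
    type_=bigquery.RoutineType.TABLE_VALUED_FUNCTION,
    body="SELECT x FROM UNNEST([1, 2, 3]) AS x",
    return_table_type=return_table_type,
)
routine = client.create_routine(routine)

Note the round trip: the setter stores the columns in their REST/JSON representation, and the getter parses that JSON back into a `StandardSqlTableType`, so the schema read back after `create_routine` compares equal to the one passed in.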
14 changes: 7 additions & 7 deletions google/cloud/bigquery/table.py
@@ -1684,7 +1684,7 @@ def to_arrow(
This argument does nothing if ``bqstorage_client`` is supplied.
- ..versionadded:: 1.24.0
+ .. versionadded:: 1.24.0
Returns:
pyarrow.Table
@@ -1695,7 +1695,7 @@
Raises:
ValueError: If the :mod:`pyarrow` library cannot be imported.
- ..versionadded:: 1.17.0
+ .. versionadded:: 1.17.0
"""
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)
@@ -1775,7 +1775,7 @@ def to_dataframe_iterable(
created by the server. If ``max_queue_size`` is :data:`None`, the queue
size is infinite.
- ..versionadded:: 2.14.0
+ .. versionadded:: 2.14.0
Returns:
pandas.DataFrame:
@@ -1861,7 +1861,7 @@ def to_dataframe(
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.
- ..versionadded:: 1.11.0
+ .. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
@@ -1870,13 +1870,13 @@ def to_dataframe(
This argument does nothing if ``bqstorage_client`` is supplied.
- ..versionadded:: 1.24.0
+ .. versionadded:: 1.24.0
date_as_object (Optional[bool]):
If ``True`` (default), cast dates to objects. If ``False``, convert
to datetime64[ns] dtype.
- ..versionadded:: 1.26.0
+ .. versionadded:: 1.26.0
Returns:
pandas.DataFrame:
@@ -2010,7 +2010,7 @@ def to_dataframe_iterable(
) -> Iterator["pandas.DataFrame"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.
- ..versionadded:: 2.21.0
+ .. versionadded:: 2.21.0
Args:
bqstorage_client:
79 changes: 79 additions & 0 deletions tests/system/test_client.py
@@ -2228,6 +2228,85 @@ def test_create_routine(self):
assert len(rows) == 1
assert rows[0].max_value == 100.0

def test_create_tvf_routine(self):
    from google.cloud.bigquery import Routine, RoutineArgument, RoutineType

    StandardSqlDataType = bigquery_v2.types.StandardSqlDataType
    StandardSqlField = bigquery_v2.types.StandardSqlField
    StandardSqlTableType = bigquery_v2.types.StandardSqlTableType

    INT64 = StandardSqlDataType.TypeKind.INT64
    STRING = StandardSqlDataType.TypeKind.STRING

    client = Config.CLIENT

    dataset = self.temp_dataset(_make_dataset_id("create_tvf_routine"))
    routine_ref = dataset.routine("test_tvf_routine")

    routine_body = """
    SELECT int_col, str_col
    FROM (
        UNNEST([1, 2, 3]) int_col
        JOIN
        (SELECT str_col FROM UNNEST(["one", "two", "three"]) str_col)
        ON TRUE
    )
    WHERE int_col > threshold
    """

    return_table_type = StandardSqlTableType(
        columns=[
            StandardSqlField(
                name="int_col", type=StandardSqlDataType(type_kind=INT64),
            ),
            StandardSqlField(
                name="str_col", type=StandardSqlDataType(type_kind=STRING),
            ),
        ]
    )

    routine_args = [
        RoutineArgument(
            name="threshold", data_type=StandardSqlDataType(type_kind=INT64),
        )
    ]

    routine_def = Routine(
        routine_ref,
        type_=RoutineType.TABLE_VALUED_FUNCTION,
        arguments=routine_args,
        return_table_type=return_table_type,
        body=routine_body,
    )

    # Create TVF routine.
    client.delete_routine(routine_ref, not_found_ok=True)
    routine = client.create_routine(routine_def)

    assert routine.body == routine_body
    assert routine.return_table_type == return_table_type
    assert routine.arguments == routine_args

    # Execute the routine to see if it's working as expected.
    query_job = client.query(
        f"""
        SELECT int_col, str_col
        FROM `{routine.reference}`(1)
        ORDER BY int_col, str_col ASC
        """
    )

    result_rows = [tuple(row) for row in query_job.result()]
    expected = [
        (2, "one"),
        (2, "three"),
        (2, "two"),
        (3, "one"),
        (3, "three"),
        (3, "two"),
    ]
    assert result_rows == expected

def test_create_table_rows_fetch_nested_schema(self):
table_name = "test_table"
dataset = self.temp_dataset(_make_dataset_id("create_table_nested_schema"))
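Once created, a TVF is invoked like a table, with its arguments in parentheses. A usage sketch mirroring the query in the test above (the fully qualified routine name is a placeholder; `client` is assumed to be an authenticated `bigquery.Client`):

sql = """
SELECT int_col, str_col
FROM `my-project.my_dataset.test_tvf_routine`(1)
ORDER BY int_col, str_col
"""
for row in client.query(sql).result():
    # With threshold=1, only int_col values 2 and 3 pass the filter; each is
    # paired with all three strings, yielding the six expected rows.
    print(row.int_col, row.str_col)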
