From 8c7b839a6ac1491c1c3b6b0e8755f4b70ed72ee3 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 19 Jul 2021 22:39:44 +0200 Subject: [PATCH] feat: add support for user defined Table View Functions (#724) * Add auxiliary classes for TVF routines * Add return_table_type property to Routine * Add system test for TVF routines * Use the generated StandardSqlTableType class * Update docs with new changes * Add missing space in misc. Sphinx directives --- docs/reference.rst | 1 + google/cloud/bigquery/__init__.py | 2 + google/cloud/bigquery/job/query.py | 14 +-- google/cloud/bigquery/routine/__init__.py | 2 + google/cloud/bigquery/routine/routine.py | 45 ++++++++ google/cloud/bigquery/table.py | 14 +-- tests/system/test_client.py | 79 ++++++++++++++ tests/unit/routine/test_routine.py | 127 ++++++++++++++++++++++ 8 files changed, 270 insertions(+), 14 deletions(-) diff --git a/docs/reference.rst b/docs/reference.rst index 8c38d0c44..8a5bff9a4 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -118,6 +118,7 @@ Routine routine.Routine routine.RoutineArgument routine.RoutineReference + routine.RoutineType Schema ====== diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py index ced8cefae..222aadcc9 100644 --- a/google/cloud/bigquery/__init__.py +++ b/google/cloud/bigquery/__init__.py @@ -85,6 +85,7 @@ from google.cloud.bigquery.routine import Routine from google.cloud.bigquery.routine import RoutineArgument from google.cloud.bigquery.routine import RoutineReference +from google.cloud.bigquery.routine import RoutineType from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import PartitionRange from google.cloud.bigquery.table import RangePartitioning @@ -162,6 +163,7 @@ "KeyResultStatementKind", "OperationType", "QueryPriority", + "RoutineType", "SchemaUpdateOption", "SourceFormat", "SqlTypeNames", diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index d588e9b5a..2cb7ee28e 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1386,12 +1386,12 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.24.0 + .. versionadded:: 1.24.0 max_results (Optional[int]): Maximum number of rows to include in the result. No limit by default. - ..versionadded:: 2.21.0 + .. versionadded:: 2.21.0 Returns: pyarrow.Table @@ -1403,7 +1403,7 @@ def to_arrow( ValueError: If the :mod:`pyarrow` library cannot be imported. - ..versionadded:: 1.17.0 + .. versionadded:: 1.17.0 """ query_result = wait_for_query(self, progress_bar_type, max_results=max_results) return query_result.to_arrow( @@ -1452,7 +1452,7 @@ def to_dataframe( :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe` for details. - ..versionadded:: 1.11.0 + .. versionadded:: 1.11.0 create_bqstorage_client (Optional[bool]): If ``True`` (default), create a BigQuery Storage API client using the default API settings. The BigQuery Storage API @@ -1461,18 +1461,18 @@ def to_dataframe( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.24.0 + .. versionadded:: 1.24.0 date_as_object (Optional[bool]): If ``True`` (default), cast dates to objects. If ``False``, convert to datetime64[ns] dtype. - ..versionadded:: 1.26.0 + .. versionadded:: 1.26.0 max_results (Optional[int]): Maximum number of rows to include in the result. No limit by default. - ..versionadded:: 2.21.0 + .. versionadded:: 2.21.0 Returns: A :class:`~pandas.DataFrame` populated with row data and column diff --git a/google/cloud/bigquery/routine/__init__.py b/google/cloud/bigquery/routine/__init__.py index d1c79b05e..7353073c8 100644 --- a/google/cloud/bigquery/routine/__init__.py +++ b/google/cloud/bigquery/routine/__init__.py @@ -19,6 +19,7 @@ from google.cloud.bigquery.routine.routine import Routine from google.cloud.bigquery.routine.routine import RoutineArgument from google.cloud.bigquery.routine.routine import RoutineReference +from google.cloud.bigquery.routine.routine import RoutineType __all__ = ( @@ -26,4 +27,5 @@ "Routine", "RoutineArgument", "RoutineReference", + "RoutineType", ) diff --git a/google/cloud/bigquery/routine/routine.py b/google/cloud/bigquery/routine/routine.py index bbc0a7693..a776212c3 100644 --- a/google/cloud/bigquery/routine/routine.py +++ b/google/cloud/bigquery/routine/routine.py @@ -21,6 +21,21 @@ import google.cloud._helpers from google.cloud.bigquery import _helpers import google.cloud.bigquery_v2.types +from google.cloud.bigquery_v2.types import StandardSqlTableType + + +class RoutineType: + """The fine-grained type of the routine. + + https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#routinetype + + .. versionadded:: 2.22.0 + """ + + ROUTINE_TYPE_UNSPECIFIED = "ROUTINE_TYPE_UNSPECIFIED" + SCALAR_FUNCTION = "SCALAR_FUNCTION" + PROCEDURE = "PROCEDURE" + TABLE_VALUED_FUNCTION = "TABLE_VALUED_FUNCTION" class Routine(object): @@ -48,6 +63,7 @@ class Routine(object): "modified": "lastModifiedTime", "reference": "routineReference", "return_type": "returnType", + "return_table_type": "returnTableType", "type_": "routineType", "description": "description", "determinism_level": "determinismLevel", @@ -204,6 +220,35 @@ def return_type(self, value): resource = None self._properties[self._PROPERTY_TO_API_FIELD["return_type"]] = resource + @property + def return_table_type(self) -> StandardSqlTableType: + """The return type of a Table Valued Function (TVF) routine. + + .. versionadded:: 2.22.0 + """ + resource = self._properties.get( + self._PROPERTY_TO_API_FIELD["return_table_type"] + ) + if not resource: + return resource + + output = google.cloud.bigquery_v2.types.StandardSqlTableType() + raw_protobuf = json_format.ParseDict( + resource, output._pb, ignore_unknown_fields=True + ) + return type(output).wrap(raw_protobuf) + + @return_table_type.setter + def return_table_type(self, value): + if not value: + resource = None + else: + resource = { + "columns": [json_format.MessageToDict(col._pb) for col in value.columns] + } + + self._properties[self._PROPERTY_TO_API_FIELD["return_table_type"]] = resource + @property def imported_libraries(self): """List[str]: The path of the imported JavaScript libraries. diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 2d9c15f50..18d969a3f 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1684,7 +1684,7 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.24.0 + .. versionadded:: 1.24.0 Returns: pyarrow.Table @@ -1695,7 +1695,7 @@ def to_arrow( Raises: ValueError: If the :mod:`pyarrow` library cannot be imported. - ..versionadded:: 1.17.0 + .. versionadded:: 1.17.0 """ if pyarrow is None: raise ValueError(_NO_PYARROW_ERROR) @@ -1775,7 +1775,7 @@ def to_dataframe_iterable( created by the server. If ``max_queue_size`` is :data:`None`, the queue size is infinite. - ..versionadded:: 2.14.0 + .. versionadded:: 2.14.0 Returns: pandas.DataFrame: @@ -1861,7 +1861,7 @@ def to_dataframe( Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. - ..versionadded:: 1.11.0 + .. versionadded:: 1.11.0 create_bqstorage_client (Optional[bool]): If ``True`` (default), create a BigQuery Storage API client using the default API settings. The BigQuery Storage API @@ -1870,13 +1870,13 @@ def to_dataframe( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.24.0 + .. versionadded:: 1.24.0 date_as_object (Optional[bool]): If ``True`` (default), cast dates to objects. If ``False``, convert to datetime64[ns] dtype. - ..versionadded:: 1.26.0 + .. versionadded:: 1.26.0 Returns: pandas.DataFrame: @@ -2010,7 +2010,7 @@ def to_dataframe_iterable( ) -> Iterator["pandas.DataFrame"]: """Create an iterable of pandas DataFrames, to process the table as a stream. - ..versionadded:: 2.21.0 + .. versionadded:: 2.21.0 Args: bqstorage_client: diff --git a/tests/system/test_client.py b/tests/system/test_client.py index cbca73619..ceb62b8cd 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -2228,6 +2228,85 @@ def test_create_routine(self): assert len(rows) == 1 assert rows[0].max_value == 100.0 + def test_create_tvf_routine(self): + from google.cloud.bigquery import Routine, RoutineArgument, RoutineType + + StandardSqlDataType = bigquery_v2.types.StandardSqlDataType + StandardSqlField = bigquery_v2.types.StandardSqlField + StandardSqlTableType = bigquery_v2.types.StandardSqlTableType + + INT64 = StandardSqlDataType.TypeKind.INT64 + STRING = StandardSqlDataType.TypeKind.STRING + + client = Config.CLIENT + + dataset = self.temp_dataset(_make_dataset_id("create_tvf_routine")) + routine_ref = dataset.routine("test_tvf_routine") + + routine_body = """ + SELECT int_col, str_col + FROM ( + UNNEST([1, 2, 3]) int_col + JOIN + (SELECT str_col FROM UNNEST(["one", "two", "three"]) str_col) + ON TRUE + ) + WHERE int_col > threshold + """ + + return_table_type = StandardSqlTableType( + columns=[ + StandardSqlField( + name="int_col", type=StandardSqlDataType(type_kind=INT64), + ), + StandardSqlField( + name="str_col", type=StandardSqlDataType(type_kind=STRING), + ), + ] + ) + + routine_args = [ + RoutineArgument( + name="threshold", data_type=StandardSqlDataType(type_kind=INT64), + ) + ] + + routine_def = Routine( + routine_ref, + type_=RoutineType.TABLE_VALUED_FUNCTION, + arguments=routine_args, + return_table_type=return_table_type, + body=routine_body, + ) + + # Create TVF routine. + client.delete_routine(routine_ref, not_found_ok=True) + routine = client.create_routine(routine_def) + + assert routine.body == routine_body + assert routine.return_table_type == return_table_type + assert routine.arguments == routine_args + + # Execute the routine to see if it's working as expected. + query_job = client.query( + f""" + SELECT int_col, str_col + FROM `{routine.reference}`(1) + ORDER BY int_col, str_col ASC + """ + ) + + result_rows = [tuple(row) for row in query_job.result()] + expected = [ + (2, "one"), + (2, "three"), + (2, "two"), + (3, "one"), + (3, "three"), + (3, "two"), + ] + assert result_rows == expected + def test_create_table_rows_fetch_nested_schema(self): table_name = "test_table" dataset = self.temp_dataset(_make_dataset_id("create_table_nested_schema")) diff --git a/tests/unit/routine/test_routine.py b/tests/unit/routine/test_routine.py index 0a59e7c5f..fdaf13324 100644 --- a/tests/unit/routine/test_routine.py +++ b/tests/unit/routine/test_routine.py @@ -156,12 +156,86 @@ def test_from_api_repr(target_class): assert actual_routine.return_type == bigquery_v2.types.StandardSqlDataType( type_kind=bigquery_v2.types.StandardSqlDataType.TypeKind.INT64 ) + assert actual_routine.return_table_type is None assert actual_routine.type_ == "SCALAR_FUNCTION" assert actual_routine._properties["someNewField"] == "someValue" assert actual_routine.description == "A routine description." assert actual_routine.determinism_level == "DETERMINISTIC" +def test_from_api_repr_tvf_function(target_class): + from google.cloud.bigquery.routine import RoutineArgument + from google.cloud.bigquery.routine import RoutineReference + from google.cloud.bigquery.routine import RoutineType + + StandardSqlDataType = bigquery_v2.types.StandardSqlDataType + StandardSqlField = bigquery_v2.types.StandardSqlField + StandardSqlTableType = bigquery_v2.types.StandardSqlTableType + + creation_time = datetime.datetime( + 2010, 5, 19, 16, 0, 0, tzinfo=google.cloud._helpers.UTC + ) + modified_time = datetime.datetime( + 2011, 10, 1, 16, 0, 0, tzinfo=google.cloud._helpers.UTC + ) + resource = { + "routineReference": { + "projectId": "my-project", + "datasetId": "my_dataset", + "routineId": "my_routine", + }, + "etag": "abcdefg", + "creationTime": str(google.cloud._helpers._millis(creation_time)), + "lastModifiedTime": str(google.cloud._helpers._millis(modified_time)), + "definitionBody": "SELECT x FROM UNNEST([1,2,3]) x WHERE x > a", + "arguments": [{"name": "a", "dataType": {"typeKind": "INT64"}}], + "language": "SQL", + "returnTableType": { + "columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}] + }, + "routineType": "TABLE_VALUED_FUNCTION", + "someNewField": "someValue", + "description": "A routine description.", + "determinismLevel": bigquery.DeterminismLevel.DETERMINISTIC, + } + actual_routine = target_class.from_api_repr(resource) + + assert actual_routine.project == "my-project" + assert actual_routine.dataset_id == "my_dataset" + assert actual_routine.routine_id == "my_routine" + assert ( + actual_routine.path + == "/projects/my-project/datasets/my_dataset/routines/my_routine" + ) + assert actual_routine.reference == RoutineReference.from_string( + "my-project.my_dataset.my_routine" + ) + assert actual_routine.etag == "abcdefg" + assert actual_routine.created == creation_time + assert actual_routine.modified == modified_time + assert actual_routine.arguments == [ + RoutineArgument( + name="a", + data_type=StandardSqlDataType(type_kind=StandardSqlDataType.TypeKind.INT64), + ) + ] + assert actual_routine.body == "SELECT x FROM UNNEST([1,2,3]) x WHERE x > a" + assert actual_routine.language == "SQL" + assert actual_routine.return_type is None + assert actual_routine.return_table_type == StandardSqlTableType( + columns=[ + StandardSqlField( + name="int_col", + type=StandardSqlDataType(type_kind=StandardSqlDataType.TypeKind.INT64), + ) + ] + ) + assert actual_routine.type_ == RoutineType.TABLE_VALUED_FUNCTION + assert actual_routine._properties["someNewField"] == "someValue" + assert actual_routine.description == "A routine description." + assert actual_routine.determinism_level == "DETERMINISTIC" + + def test_from_api_repr_w_minimal_resource(target_class): from google.cloud.bigquery.routine import RoutineReference @@ -261,6 +335,24 @@ def test_from_api_repr_w_unknown_fields(target_class): ["return_type"], {"returnType": {"typeKind": "INT64"}}, ), + ( + { + "definitionBody": "SELECT x FROM UNNEST([1,2,3]) x WHERE x > 1", + "language": "SQL", + "returnTableType": { + "columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}] + }, + "routineType": "TABLE_VALUED_FUNCTION", + "description": "A routine description.", + "determinismLevel": bigquery.DeterminismLevel.DETERMINISM_LEVEL_UNSPECIFIED, + }, + ["return_table_type"], + { + "returnTableType": { + "columns": [{"name": "int_col", "type": {"typeKind": "INT64"}}] + } + }, + ), ( { "arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}], @@ -361,6 +453,41 @@ def test_set_return_type_w_none(object_under_test): assert object_under_test._properties["returnType"] is None +def test_set_return_table_type_w_none(object_under_test): + object_under_test.return_table_type = None + assert object_under_test.return_table_type is None + assert object_under_test._properties["returnTableType"] is None + + +def test_set_return_table_type_w_not_none(object_under_test): + StandardSqlDataType = bigquery_v2.types.StandardSqlDataType + StandardSqlField = bigquery_v2.types.StandardSqlField + StandardSqlTableType = bigquery_v2.types.StandardSqlTableType + + table_type = StandardSqlTableType( + columns=[ + StandardSqlField( + name="int_col", + type=StandardSqlDataType(type_kind=StandardSqlDataType.TypeKind.INT64), + ), + StandardSqlField( + name="str_col", + type=StandardSqlDataType(type_kind=StandardSqlDataType.TypeKind.STRING), + ), + ] + ) + + object_under_test.return_table_type = table_type + + assert object_under_test.return_table_type == table_type + assert object_under_test._properties["returnTableType"] == { + "columns": [ + {"name": "int_col", "type": {"typeKind": "INT64"}}, + {"name": "str_col", "type": {"typeKind": "STRING"}}, + ] + } + + def test_set_description_w_none(object_under_test): object_under_test.description = None assert object_under_test.description is None