From e4c94f446c27eb474f30b033c1b62d11bd0acd98 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 02:12:20 -0500 Subject: [PATCH] feat: add session and connection properties to QueryJobConfig (#1024) * feat: add session and connection properties to QueryJobConfig * add unit tests * adjust types and add versionadded * add missing url * link to ConnectionProperty docs * add resource classes to root module --- google/cloud/bigquery/__init__.py | 4 ++ google/cloud/bigquery/job/base.py | 31 ++++++++++++ google/cloud/bigquery/job/query.py | 74 ++++++++++++++++++++++++++--- google/cloud/bigquery/query.py | 61 +++++++++++++++++++++++- tests/system/test_query.py | 26 ++++++++++ tests/unit/job/test_base.py | 9 ++++ tests/unit/job/test_query.py | 2 + tests/unit/job/test_query_config.py | 21 ++++++++ 8 files changed, 220 insertions(+), 8 deletions(-) diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py index d2b1dd26d..b3c492125 100644 --- a/google/cloud/bigquery/__init__.py +++ b/google/cloud/bigquery/__init__.py @@ -52,6 +52,7 @@ from google.cloud.bigquery.external_config import ExternalSourceFormat from google.cloud.bigquery.format_options import AvroOptions from google.cloud.bigquery.format_options import ParquetOptions +from google.cloud.bigquery.job.base import SessionInfo from google.cloud.bigquery.job import Compression from google.cloud.bigquery.job import CopyJob from google.cloud.bigquery.job import CopyJobConfig @@ -77,6 +78,7 @@ from google.cloud.bigquery.model import ModelReference from google.cloud.bigquery.query import ArrayQueryParameter from google.cloud.bigquery.query import ArrayQueryParameterType +from google.cloud.bigquery.query import ConnectionProperty from google.cloud.bigquery.query import ScalarQueryParameter from google.cloud.bigquery.query import ScalarQueryParameterType from google.cloud.bigquery.query import StructQueryParameter @@ -104,6 +106,7 @@ "__version__", "Client", # Queries + "ConnectionProperty", "QueryJob", "QueryJobConfig", "ArrayQueryParameter", @@ -132,6 +135,7 @@ "ExtractJobConfig", "LoadJob", "LoadJobConfig", + "SessionInfo", "UnknownJob", # Models "Model", diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 9e381ded6..88d6bec14 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -202,6 +202,19 @@ def script_statistics(self) -> Optional["ScriptStatistics"]: return None return ScriptStatistics(resource) + @property + def session_info(self) -> Optional["SessionInfo"]: + """[Preview] Information of the session if this job is part of one. + + .. versionadded:: 2.29.0 + """ + resource = _helpers._get_sub_prop( + self._properties, ["statistics", "sessionInfo"] + ) + if resource is None: + return None + return SessionInfo(resource) + @property def num_child_jobs(self): """The number of child jobs executed. @@ -990,6 +1003,24 @@ def evaluation_kind(self) -> Optional[str]: return self._properties.get("evaluationKind") +class SessionInfo: + """[Preview] Information of the session if this job is part of one. + + .. versionadded:: 2.29.0 + + Args: + resource (Map[str, Any]): JSON representation of object. + """ + + def __init__(self, resource): + self._properties = resource + + @property + def session_id(self) -> Optional[str]: + """The ID of the session.""" + return self._properties.get("sessionId") + + class UnknownJob(_AsyncJob): """A job whose type cannot be determined.""" diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index ab24af202..942c85fc3 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -18,7 +18,7 @@ import copy import re import typing -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Union from google.api_core import exceptions from google.api_core.future import polling as polling_future @@ -31,11 +31,14 @@ from google.cloud.bigquery.enums import KeyResultStatementKind from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery import _helpers -from google.cloud.bigquery.query import _query_param_from_api_repr -from google.cloud.bigquery.query import ArrayQueryParameter -from google.cloud.bigquery.query import ScalarQueryParameter -from google.cloud.bigquery.query import StructQueryParameter -from google.cloud.bigquery.query import UDFResource +from google.cloud.bigquery.query import ( + _query_param_from_api_repr, + ArrayQueryParameter, + ConnectionProperty, + ScalarQueryParameter, + StructQueryParameter, + UDFResource, +) from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY from google.cloud.bigquery.routine import RoutineReference from google.cloud.bigquery.schema import SchemaField @@ -269,6 +272,24 @@ def allow_large_results(self): def allow_large_results(self, value): self._set_sub_prop("allowLargeResults", value) + @property + def connection_properties(self) -> List[ConnectionProperty]: + """Connection properties. + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties + + .. versionadded:: 2.29.0 + """ + resource = self._get_sub_prop("connectionProperties", []) + return [ConnectionProperty.from_api_repr(prop) for prop in resource] + + @connection_properties.setter + def connection_properties(self, value: Iterable[ConnectionProperty]): + self._set_sub_prop( + "connectionProperties", [prop.to_api_repr() for prop in value], + ) + @property def create_disposition(self): """google.cloud.bigquery.job.CreateDisposition: Specifies behavior @@ -283,6 +304,27 @@ def create_disposition(self): def create_disposition(self, value): self._set_sub_prop("createDisposition", value) + @property + def create_session(self) -> Optional[bool]: + """[Preview] If :data:`True`, creates a new session, where + :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a + random server generated session id. + + If :data:`False`, runs query with an existing ``session_id`` passed in + :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`, + otherwise runs query in non-session mode. + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session + + .. versionadded:: 2.29.0 + """ + return self._get_sub_prop("createSession") + + @create_session.setter + def create_session(self, value: Optional[bool]): + self._set_sub_prop("createSession", value) + @property def default_dataset(self): """google.cloud.bigquery.dataset.DatasetReference: the default dataset @@ -613,7 +655,7 @@ def schema_update_options(self, values): @property def script_options(self) -> ScriptOptions: - """Connection properties which can modify the query behavior. + """Options controlling the execution of scripts. https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions """ @@ -694,6 +736,15 @@ def allow_large_results(self): """ return self._configuration.allow_large_results + @property + def connection_properties(self) -> List[ConnectionProperty]: + """See + :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`. + + .. versionadded:: 2.29.0 + """ + return self._configuration.connection_properties + @property def create_disposition(self): """See @@ -701,6 +752,15 @@ def create_disposition(self): """ return self._configuration.create_disposition + @property + def create_session(self) -> Optional[bool]: + """See + :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`. + + .. versionadded:: 2.29.0 + """ + return self._configuration.create_session + @property def default_dataset(self): """See diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py index 1f449f189..708f5f47b 100644 --- a/google/cloud/bigquery/query.py +++ b/google/cloud/bigquery/query.py @@ -18,7 +18,7 @@ import copy import datetime import decimal -from typing import Optional, Union +from typing import Any, Optional, Dict, Union from google.cloud.bigquery.table import _parse_schema_resource from google.cloud.bigquery._helpers import _rows_from_json @@ -31,6 +31,65 @@ ] +class ConnectionProperty: + """A connection-level property to customize query behavior. + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/ConnectionProperty + + Args: + key: + The key of the property to set, for example, ``'time_zone'`` or + ``'session_id'``. + value: The value of the property to set. + """ + + def __init__(self, key: str = "", value: str = ""): + self._properties = { + "key": key, + "value": value, + } + + @property + def key(self) -> str: + """Name of the property. + + For example: + + * ``time_zone`` + * ``session_id`` + """ + return self._properties["key"] + + @property + def value(self) -> str: + """Value of the property.""" + return self._properties["value"] + + @classmethod + def from_api_repr(cls, resource) -> "ConnectionProperty": + """Construct :class:`~google.cloud.bigquery.query.ConnectionProperty` + from JSON resource. + + Args: + resource: JSON representation. + + Returns: + A connection property. + """ + value = cls() + value._properties = resource + return value + + def to_api_repr(self) -> Dict[str, Any]: + """Construct JSON API representation for the connection property. + + Returns: + JSON mapping + """ + return self._properties + + class UDFResource(object): """Describe a single user-defined function (UDF) resource. diff --git a/tests/system/test_query.py b/tests/system/test_query.py index 24758595b..649120a7e 100644 --- a/tests/system/test_query.py +++ b/tests/system/test_query.py @@ -27,3 +27,29 @@ def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str): assert query_job.dry_run is True assert query_job.total_bytes_processed > 0 assert len(query_job.schema) > 0 + + +def test_session(bigquery_client: bigquery.Client): + initial_config = bigquery.QueryJobConfig() + initial_config.create_session = True + initial_query = """ + CREATE TEMPORARY TABLE numbers(id INT64) + AS + SELECT * FROM UNNEST([1, 2, 3, 4, 5]) AS id; + """ + initial_job = bigquery_client.query(initial_query, job_config=initial_config) + initial_job.result() + session_id = initial_job.session_info.session_id + assert session_id is not None + + second_config = bigquery.QueryJobConfig() + second_config.connection_properties = [ + bigquery.ConnectionProperty("session_id", session_id), + ] + second_job = bigquery_client.query( + "SELECT COUNT(*) FROM numbers;", job_config=second_config + ) + rows = list(second_job.result()) + + assert len(rows) == 1 + assert rows[0][0] == 5 diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index e320c72cb..250be83bb 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -228,6 +228,15 @@ def test_script_statistics(self): self.assertEqual(stack_frame.end_column, 14) self.assertEqual(stack_frame.text, "QUERY TEXT") + def test_session_info(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + + self.assertIsNone(job.session_info) + job._properties["statistics"] = {"sessionInfo": {"sessionId": "abcdefg"}} + self.assertIsNotNone(job.session_info) + self.assertEqual(job.session_info.session_id, "abcdefg") + def test_transaction_info(self): from google.cloud.bigquery.job.base import TransactionInfo diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 17baacf5b..4da035b78 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -281,6 +281,8 @@ def test_from_api_repr_bare(self): job = klass.from_api_repr(RESOURCE, client=client) self.assertIs(job._client, client) self._verifyResourceProperties(job, RESOURCE) + self.assertEqual(len(job.connection_properties), 0) + self.assertIsNone(job.create_session) def test_from_api_repr_with_encryption(self): self._setUpConstants() diff --git a/tests/unit/job/test_query_config.py b/tests/unit/job/test_query_config.py index 109cf7e44..7818236f4 100644 --- a/tests/unit/job/test_query_config.py +++ b/tests/unit/job/test_query_config.py @@ -152,6 +152,27 @@ def test_clustering_fields(self): config.clustering_fields = None self.assertIsNone(config.clustering_fields) + def test_connection_properties(self): + from google.cloud.bigquery.job.query import ConnectionProperty + + config = self._get_target_class()() + self.assertEqual(len(config.connection_properties), 0) + + session_id = ConnectionProperty("session_id", "abcd") + time_zone = ConnectionProperty("time_zone", "America/Chicago") + config.connection_properties = [session_id, time_zone] + self.assertEqual(len(config.connection_properties), 2) + self.assertEqual(config.connection_properties[0].key, "session_id") + self.assertEqual(config.connection_properties[0].value, "abcd") + self.assertEqual(config.connection_properties[1].key, "time_zone") + self.assertEqual(config.connection_properties[1].value, "America/Chicago") + + def test_create_session(self): + config = self._get_target_class()() + self.assertIsNone(config.create_session) + config.create_session = True + self.assertTrue(config.create_session) + def test_from_api_repr_empty(self): klass = self._get_target_class() config = klass.from_api_repr({})