Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add session and connection properties to QueryJobConfig #1024

Merged
merged 8 commits into from Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -77,6 +77,7 @@
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ArrayQueryParameterType
from google.cloud.bigquery.query import ConnectionProperty
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameterType
from google.cloud.bigquery.query import StructQueryParameter
Expand Down Expand Up @@ -104,6 +105,7 @@
"__version__",
"Client",
# Queries
"ConnectionProperty",
"QueryJob",
"QueryJobConfig",
"ArrayQueryParameter",
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/bigquery/job/base.py
Expand Up @@ -202,6 +202,16 @@ def script_statistics(self) -> Optional["ScriptStatistics"]:
return None
return ScriptStatistics(resource)

@property
def session_info(self) -> Optional["SessionInfo"]:
"""[Preview] Information of the session if this job is part of one."""
plamut marked this conversation as resolved.
Show resolved Hide resolved
resource = _helpers._get_sub_prop(
self._properties, ["statistics", "sessionInfo"]
)
if resource is None:
return None
return SessionInfo(resource)

@property
def num_child_jobs(self):
"""The number of child jobs executed.
Expand Down Expand Up @@ -990,6 +1000,22 @@ def evaluation_kind(self) -> Optional[str]:
return self._properties.get("evaluationKind")


class SessionInfo(object):
Copy link
Contributor

@plamut plamut Oct 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Explicitly inheriting from object is not needed.

Also, shouldn't we expose this class in the top level namespace? And both new classes in the reference docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to docs via job_base.rst and query.rst. I noticed in #1023 that we'd missed a few classes, so I migrated some docs back to automodule. Even though I don't think it's the best UX to have so many classes in one page, I'd rather we not forget in the future.

Screen Shot 2021-10-26 at 9 16 56 AM

Screen Shot 2021-10-26 at 9 20 28 AM

"""[Preview] Information of the session if this job is part of one.

Args:
resource (Map[str, Any]): JSON representation of object.
"""

def __init__(self, resource):
self._properties = resource

@property
def session_id(self) -> Optional[str]:
"""The ID of the session."""
return self._properties.get("sessionId")


class UnknownJob(_AsyncJob):
"""A job whose type cannot be determined."""

Expand Down
57 changes: 51 additions & 6 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -18,7 +18,7 @@
import copy
import re
import typing
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Sequence, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
Expand All @@ -31,11 +31,14 @@
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.query import (
_query_param_from_api_repr,
ArrayQueryParameter,
ConnectionProperty,
ScalarQueryParameter,
StructQueryParameter,
UDFResource,
)
plamut marked this conversation as resolved.
Show resolved Hide resolved
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -269,6 +272,18 @@ def allow_large_results(self):
def allow_large_results(self, value):
self._set_sub_prop("allowLargeResults", value)

@property
def connection_properties(self) -> Sequence[ConnectionProperty]:
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""Connection properties."""
resource = self._get_sub_prop("connectionProperties", [])
return [ConnectionProperty.from_api_repr(prop) for prop in resource]

@connection_properties.setter
def connection_properties(self, value: Sequence[ConnectionProperty]):
plamut marked this conversation as resolved.
Show resolved Hide resolved
self._set_sub_prop(
"connectionProperties", [prop.to_api_repr() for prop in value],
)

@property
def create_disposition(self):
"""google.cloud.bigquery.job.CreateDisposition: Specifies behavior
Expand All @@ -283,6 +298,22 @@ def create_disposition(self):
def create_disposition(self, value):
self._set_sub_prop("createDisposition", value)

@property
def create_session(self) -> Optional[bool]:
"""[Preview] If :data:`True`, creates a new session, where
:attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
random server generated session id.

If :data:`False`, runs query with an existing ``session_id`` passed in
:attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
otherwise runs query in non-session mode.
"""
return self._get_sub_prop("createSession")

@create_session.setter
def create_session(self, value: Optional[bool]):
self._set_sub_prop("createSession", value)

@property
def default_dataset(self):
"""google.cloud.bigquery.dataset.DatasetReference: the default dataset
Expand Down Expand Up @@ -694,13 +725,27 @@ def allow_large_results(self):
"""
return self._configuration.allow_large_results

@property
def connection_properties(self) -> Sequence[ConnectionProperty]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
"""
return self._configuration.connection_properties

@property
def create_disposition(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
"""
return self._configuration.create_disposition

@property
def create_session(self) -> Optional[bool]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
"""
return self._configuration.create_session

@property
def default_dataset(self):
"""See
Expand Down
51 changes: 50 additions & 1 deletion google/cloud/bigquery/query.py
Expand Up @@ -18,7 +18,7 @@
import copy
import datetime
import decimal
from typing import Optional, Union
from typing import Any, Optional, Dict, Union

from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import _rows_from_json
Expand All @@ -31,6 +31,55 @@
]


class ConnectionProperty:
"""A connection-level property to customize query behavior."""

def __init__(self, key: str = "", value: str = ""):
self._properties = {
"key": key,
"value": value,
}

@property
def key(self) -> str:
"""Name of the property.

For example:

* ``time_zone``
* ``session_id``
"""
return self._properties["key"]

@property
def value(self) -> str:
"""Value of the property."""
return self._properties["value"]

@classmethod
def from_api_repr(cls, resource) -> "ConnectionProperty":
"""Construct :class:`~google.cloud.bigquery.query.ConnectionProperty`
from JSON resource.

Args:
resource: JSON representation.

Returns:
A connection property.
"""
value = cls()
value._properties = resource
return value

def to_api_repr(self) -> Dict[str, Any]:
"""Construct JSON API representation for the connection property.

Returns:
JSON mapping
"""
return self._properties


class UDFResource(object):
"""Describe a single user-defined function (UDF) resource.

Expand Down
26 changes: 26 additions & 0 deletions tests/system/test_query.py
Expand Up @@ -27,3 +27,29 @@ def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str):
assert query_job.dry_run is True
assert query_job.total_bytes_processed > 0
assert len(query_job.schema) > 0


def test_session(bigquery_client: bigquery.Client):
initial_config = bigquery.QueryJobConfig()
initial_config.create_session = True
initial_query = """
CREATE TEMPORARY TABLE numbers(id INT64)
AS
SELECT * FROM UNNEST([1, 2, 3, 4, 5]) AS id;
"""
initial_job = bigquery_client.query(initial_query, job_config=initial_config)
initial_job.result()
session_id = initial_job.session_info.session_id
assert session_id is not None

second_config = bigquery.QueryJobConfig()
second_config.connection_properties = [
bigquery.ConnectionProperty("session_id", session_id),
]
second_job = bigquery_client.query(
"SELECT COUNT(*) FROM numbers;", job_config=second_config
)
rows = list(second_job.result())

assert len(rows) == 1
assert rows[0][0] == 5
9 changes: 9 additions & 0 deletions tests/unit/job/test_base.py
Expand Up @@ -228,6 +228,15 @@ def test_script_statistics(self):
self.assertEqual(stack_frame.end_column, 14)
self.assertEqual(stack_frame.text, "QUERY TEXT")

def test_session_info(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)

self.assertIsNone(job.session_info)
job._properties["statistics"] = {"sessionInfo": {"sessionId": "abcdefg"}}
self.assertIsNotNone(job.session_info)
self.assertEqual(job.session_info.session_id, "abcdefg")

def test_transaction_info(self):
from google.cloud.bigquery.job.base import TransactionInfo

Expand Down
2 changes: 2 additions & 0 deletions tests/unit/job/test_query.py
Expand Up @@ -281,6 +281,8 @@ def test_from_api_repr_bare(self):
job = klass.from_api_repr(RESOURCE, client=client)
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)
self.assertEqual(len(job.connection_properties), 0)
self.assertIsNone(job.create_session)

def test_from_api_repr_with_encryption(self):
self._setUpConstants()
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/job/test_query_config.py
Expand Up @@ -152,6 +152,27 @@ def test_clustering_fields(self):
config.clustering_fields = None
self.assertIsNone(config.clustering_fields)

def test_connection_properties(self):
from google.cloud.bigquery.job.query import ConnectionProperty

config = self._get_target_class()()
self.assertEqual(len(config.connection_properties), 0)

session_id = ConnectionProperty("session_id", "abcd")
time_zone = ConnectionProperty("time_zone", "America/Chicago")
config.connection_properties = [session_id, time_zone]
self.assertEqual(len(config.connection_properties), 2)
self.assertEqual(config.connection_properties[0].key, "session_id")
self.assertEqual(config.connection_properties[0].value, "abcd")
self.assertEqual(config.connection_properties[1].key, "time_zone")
self.assertEqual(config.connection_properties[1].value, "America/Chicago")

def test_create_session(self):
config = self._get_target_class()()
self.assertIsNone(config.create_session)
config.create_session = True
self.assertTrue(config.create_session)

def test_from_api_repr_empty(self):
klass = self._get_target_class()
config = klass.from_api_repr({})
Expand Down