Skip to content

Commit

Permalink
feat: implement query options versioning support (#30)
Browse files Browse the repository at this point in the history
* feat: implement query options versioning support

* refactor _merge_query_options to use MergeFrom protobuf function

* address comments

* fix assignment

Co-authored-by: larkee <larkee@users.noreply.github.com>
  • Loading branch information
larkee and larkee committed Mar 12, 2020
1 parent 23916c5 commit 5147921
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 41 deletions.
39 changes: 39 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud.spanner_v1.proto import type_pb2
from google.cloud.spanner_v1.proto.spanner_pb2 import ExecuteSqlRequest


def _try_to_coerce_bytes(bytestring):
Expand All @@ -47,6 +48,44 @@ def _try_to_coerce_bytes(bytestring):
)


def _merge_query_options(base, merge):
"""Merge higher precedence QueryOptions with current QueryOptions.
:type base:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict` or None
:param base: The current QueryOptions that is intended for use.
:type merge:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict` or None
:param merge:
The QueryOptions that have a higher priority than base. These options
should overwrite the fields in base.
:rtype:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or None
:returns:
QueryOptions object formed by merging the two given QueryOptions.
If the resultant object only has empty fields, returns None.
"""
combined = base or ExecuteSqlRequest.QueryOptions()
if type(combined) == dict:
combined = ExecuteSqlRequest.QueryOptions(
optimizer_version=combined.get("optimizer_version", "")
)
merge = merge or ExecuteSqlRequest.QueryOptions()
if type(merge) == dict:
merge = ExecuteSqlRequest.QueryOptions(
optimizer_version=merge.get("optimizer_version", "")
)
combined.MergeFrom(merge)
if not combined.optimizer_version:
return None
return combined


# pylint: disable=too-many-return-statements,too-many-branches
def _make_value_pb(value):
"""Helper for :func:`_make_list_value_pbs`.
Expand Down
25 changes: 24 additions & 1 deletion google/cloud/spanner_v1/client.py
Expand Up @@ -50,9 +50,10 @@

from google.cloud.client import ClientWithProject
from google.cloud.spanner_v1 import __version__
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import _merge_query_options, _metadata_with_prefix
from google.cloud.spanner_v1.instance import DEFAULT_NODE_COUNT
from google.cloud.spanner_v1.instance import Instance
from google.cloud.spanner_v1.proto.spanner_pb2 import ExecuteSqlRequest

_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
Expand All @@ -62,6 +63,7 @@
"without a scheme: ex %s=localhost:8080."
) % ((EMULATOR_ENV_VAR,) * 3)
SPANNER_ADMIN_SCOPE = "https://www.googleapis.com/auth/spanner.admin"
OPTIMIZER_VERSION_ENV_VAR = "SPANNER_OPTIMIZER_VERSION"
_USER_AGENT_DEPRECATED = (
"The 'user_agent' argument to 'Client' is deprecated / unused. "
"Please pass an appropriate 'client_info' instead."
Expand All @@ -72,6 +74,10 @@ def _get_spanner_emulator_host():
return os.getenv(EMULATOR_ENV_VAR)


def _get_spanner_optimizer_version():
return os.getenv(OPTIMIZER_VERSION_ENV_VAR, "")


class InstanceConfig(object):
"""Named configurations for Spanner instances.
Expand Down Expand Up @@ -132,11 +138,20 @@ class Client(ClientWithProject):
:param user_agent:
(Deprecated) The user agent to be used with API request.
Not used.
:type client_options: :class:`~google.api_core.client_options.ClientOptions`
or :class:`dict`
:param client_options: (Optional) Client options used to set user options
on the client. API Endpoint should be set through client_options.
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -157,6 +172,7 @@ def __init__(
client_info=_CLIENT_INFO,
user_agent=None,
client_options=None,
query_options=None,
):
# NOTE: This API has no use for the _http argument, but sending it
# will have no impact since the _http() @property only lazily
Expand All @@ -172,6 +188,13 @@ def __init__(
else:
self._client_options = client_options

env_query_options = ExecuteSqlRequest.QueryOptions(
optimizer_version=_get_spanner_optimizer_version()
)

# Environment flag config has higher precedence than application config.
self._query_options = _merge_query_options(query_options, env_query_options)

if user_agent is not None:
warnings.warn(_USER_AGENT_DEPRECATED, DeprecationWarning, stacklevel=2)
self.user_agent = user_agent
Expand Down
39 changes: 36 additions & 3 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -30,8 +30,11 @@
import six

# pylint: disable=ungrouped-imports
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient
from google.cloud.spanner_v1.gapic.transports import spanner_grpc_transport
Expand Down Expand Up @@ -350,7 +353,9 @@ def drop(self):
metadata = _metadata_with_prefix(self.name)
api.drop_database(self.name, metadata=metadata)

def execute_partitioned_dml(self, dml, params=None, param_types=None):
def execute_partitioned_dml(
self, dml, params=None, param_types=None, query_options=None
):
"""Execute a partitionable DML statement.
:type dml: str
Expand All @@ -365,9 +370,20 @@ def execute_partitioned_dml(self, dml, params=None, param_types=None):
(Optional) maps explicit types for one or more param values;
required if parameters are passed.
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
:rtype: int
:returns: Count of rows affected by the DML statement.
"""
query_options = _merge_query_options(
self._instance._client._query_options, query_options
)
if params is not None:
if param_types is None:
raise ValueError("Specify 'param_types' when passing 'params'.")
Expand Down Expand Up @@ -398,6 +414,7 @@ def execute_partitioned_dml(self, dml, params=None, param_types=None):
transaction=txn_selector,
params=params_pb,
param_types=param_types,
query_options=query_options,
metadata=metadata,
)

Expand Down Expand Up @@ -748,6 +765,7 @@ def generate_query_batches(
param_types=None,
partition_size_bytes=None,
max_partitions=None,
query_options=None,
):
"""Start a partitioned query operation.
Expand Down Expand Up @@ -783,6 +801,14 @@ def generate_query_batches(
service uses this as a hint, the actual number of partitions may
differ.
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
:rtype: iterable of dict
:returns:
mappings of information used peform actual partitioned reads via
Expand All @@ -801,6 +827,13 @@ def generate_query_batches(
query_info["params"] = params
query_info["param_types"] = param_types

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = self._database._instance._client._query_options
query_info["query_options"] = _merge_query_options(
default_query_options, query_options
)

for partition in partitions:
yield {"partition": partition, "query": query_info}

Expand Down
14 changes: 13 additions & 1 deletion google/cloud/spanner_v1/session.py
Expand Up @@ -202,6 +202,7 @@ def execute_sql(
params=None,
param_types=None,
query_mode=None,
query_options=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
):
Expand All @@ -225,11 +226,22 @@ def execute_sql(
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql, params, param_types, query_mode, retry=retry, timeout=timeout
sql,
params,
param_types,
query_mode,
query_options=query_options,
retry=retry,
timeout=timeout,
)

def batch(self):
Expand Down
16 changes: 16 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -23,6 +23,7 @@
from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
Expand Down Expand Up @@ -157,6 +158,7 @@ def execute_sql(
params=None,
param_types=None,
query_mode=None,
query_options=None,
partition=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
Expand All @@ -180,6 +182,14 @@ def execute_sql(
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
:type partition: bytes
:param partition: (Optional) one of the partition tokens returned
from :meth:`partition_query`.
Expand Down Expand Up @@ -211,6 +221,11 @@ def execute_sql(
transaction = self._make_txn_selector()
api = database.spanner_api

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)

restart = functools.partial(
api.execute_streaming_sql,
self._session.name,
Expand All @@ -221,6 +236,7 @@ def execute_sql(
query_mode=query_mode,
partition_token=partition,
seqno=self._execute_sql_count,
query_options=query_options,
metadata=metadata,
retry=retry,
timeout=timeout,
Expand Down
22 changes: 19 additions & 3 deletions google/cloud/spanner_v1/transaction.py
Expand Up @@ -17,8 +17,11 @@
from google.protobuf.struct_pb2 import Struct

from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
)
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
from google.cloud.spanner_v1.snapshot import _SnapshotBase
Expand Down Expand Up @@ -162,7 +165,9 @@ def _make_params_pb(params, param_types):

return None

def execute_update(self, dml, params=None, param_types=None, query_mode=None):
def execute_update(
self, dml, params=None, param_types=None, query_mode=None, query_options=None
):
"""Perform an ``ExecuteSql`` API request with DML.
:type dml: str
Expand All @@ -182,6 +187,11 @@ def execute_update(self, dml, params=None, param_types=None, query_mode=None):
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.
:rtype: int
:returns: Count of rows affected by the DML statement.
"""
Expand All @@ -191,13 +201,19 @@ def execute_update(self, dml, params=None, param_types=None, query_mode=None):
transaction = self._make_txn_selector()
api = database.spanner_api

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)

response = api.execute_sql(
self._session.name,
dml,
transaction=transaction,
params=params_pb,
param_types=param_types,
query_mode=query_mode,
query_options=query_options,
seqno=self._execute_sql_count,
metadata=metadata,
)
Expand Down

0 comments on commit 5147921

Please sign in to comment.