Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement query options versioning support #30

Merged
merged 4 commits into from Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud.spanner_v1.proto import type_pb2
from google.cloud.spanner_v1.proto.spanner_pb2 import ExecuteSqlRequest


def _try_to_coerce_bytes(bytestring):
Expand All @@ -47,6 +48,44 @@ def _try_to_coerce_bytes(bytestring):
)


def _merge_query_options(base, merge):
"""Merge higher precedence QueryOptions with current QueryOptions.

:type base:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict` or None
:param base: The current QueryOptions that is intended for use.

:type merge:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict` or None
:param merge:
The QueryOptions that have a higher priority than base. These options
should overwrite the fields in base.

:rtype:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or None
:returns:
QueryOptions object formed by merging the two given QueryOptions.
If the resultant object only has empty fields, returns None.
"""
combined = base or ExecuteSqlRequest.QueryOptions()
larkee marked this conversation as resolved.
Show resolved Hide resolved
if type(combined) == dict:
combined = ExecuteSqlRequest.QueryOptions(
optimizer_version=combined.get("optimizer_version", "")
)
merge = merge or ExecuteSqlRequest.QueryOptions()
if type(merge) == dict:
merge = ExecuteSqlRequest.QueryOptions(
optimizer_version=merge.get("optimizer_version", "")
)
combined.MergeFrom(merge)
if not combined.optimizer_version:
return None
return combined


# pylint: disable=too-many-return-statements,too-many-branches
def _make_value_pb(value):
"""Helper for :func:`_make_list_value_pbs`.
Expand Down
25 changes: 24 additions & 1 deletion google/cloud/spanner_v1/client.py
Expand Up @@ -50,9 +50,10 @@

from google.cloud.client import ClientWithProject
from google.cloud.spanner_v1 import __version__
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import _merge_query_options, _metadata_with_prefix
from google.cloud.spanner_v1.instance import DEFAULT_NODE_COUNT
from google.cloud.spanner_v1.instance import Instance
from google.cloud.spanner_v1.proto.spanner_pb2 import ExecuteSqlRequest

_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
Expand All @@ -62,6 +63,7 @@
"without a scheme: ex %s=localhost:8080."
) % ((EMULATOR_ENV_VAR,) * 3)
SPANNER_ADMIN_SCOPE = "https://www.googleapis.com/auth/spanner.admin"
OPTIMIZER_VERSION_ENV_VAR = "SPANNER_OPTIMIZER_VERSION"
_USER_AGENT_DEPRECATED = (
"The 'user_agent' argument to 'Client' is deprecated / unused. "
"Please pass an appropriate 'client_info' instead."
Expand All @@ -72,6 +74,10 @@ def _get_spanner_emulator_host():
return os.getenv(EMULATOR_ENV_VAR)


def _get_spanner_optimizer_version():
return os.getenv(OPTIMIZER_VERSION_ENV_VAR, "")


class InstanceConfig(object):
"""Named configurations for Spanner instances.

Expand Down Expand Up @@ -132,11 +138,20 @@ class Client(ClientWithProject):
:param user_agent:
(Deprecated) The user agent to be used with API request.
Not used.

:type client_options: :class:`~google.api_core.client_options.ClientOptions`
or :class:`dict`
:param client_options: (Optional) Client options used to set user options
on the client. API Endpoint should be set through client_options.

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`

:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -157,6 +172,7 @@ def __init__(
client_info=_CLIENT_INFO,
user_agent=None,
client_options=None,
query_options=None,
):
# NOTE: This API has no use for the _http argument, but sending it
# will have no impact since the _http() @property only lazily
Expand All @@ -172,6 +188,13 @@ def __init__(
else:
self._client_options = client_options

env_query_options = ExecuteSqlRequest.QueryOptions(
optimizer_version=_get_spanner_optimizer_version()
)

# Environment flag config has higher precedence than application config.
self._query_options = _merge_query_options(query_options, env_query_options)

if user_agent is not None:
warnings.warn(_USER_AGENT_DEPRECATED, DeprecationWarning, stacklevel=2)
self.user_agent = user_agent
Expand Down
39 changes: 36 additions & 3 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -30,8 +30,11 @@
import six

# pylint: disable=ungrouped-imports
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient
from google.cloud.spanner_v1.gapic.transports import spanner_grpc_transport
Expand Down Expand Up @@ -350,7 +353,9 @@ def drop(self):
metadata = _metadata_with_prefix(self.name)
api.drop_database(self.name, metadata=metadata)

def execute_partitioned_dml(self, dml, params=None, param_types=None):
def execute_partitioned_dml(
self, dml, params=None, param_types=None, query_options=None
):
"""Execute a partitionable DML statement.

:type dml: str
Expand All @@ -365,9 +370,20 @@ def execute_partitioned_dml(self, dml, params=None, param_types=None):
(Optional) maps explicit types for one or more param values;
required if parameters are passed.

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`

:rtype: int
:returns: Count of rows affected by the DML statement.
"""
query_options = _merge_query_options(
self._instance._client._query_options, query_options
)
if params is not None:
if param_types is None:
raise ValueError("Specify 'param_types' when passing 'params'.")
Expand Down Expand Up @@ -398,6 +414,7 @@ def execute_partitioned_dml(self, dml, params=None, param_types=None):
transaction=txn_selector,
params=params_pb,
param_types=param_types,
query_options=query_options,
metadata=metadata,
)

Expand Down Expand Up @@ -748,6 +765,7 @@ def generate_query_batches(
param_types=None,
partition_size_bytes=None,
max_partitions=None,
query_options=None,
):
"""Start a partitioned query operation.

Expand Down Expand Up @@ -783,6 +801,14 @@ def generate_query_batches(
service uses this as a hint, the actual number of partitions may
differ.

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`

:rtype: iterable of dict
:returns:
mappings of information used peform actual partitioned reads via
Expand All @@ -801,6 +827,13 @@ def generate_query_batches(
query_info["params"] = params
query_info["param_types"] = param_types

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = self._database._instance._client._query_options
query_info["query_options"] = _merge_query_options(
default_query_options, query_options
)

for partition in partitions:
yield {"partition": partition, "query": query_info}

Expand Down
14 changes: 13 additions & 1 deletion google/cloud/spanner_v1/session.py
Expand Up @@ -202,6 +202,7 @@ def execute_sql(
params=None,
param_types=None,
query_mode=None,
query_options=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
):
Expand All @@ -225,11 +226,22 @@ def execute_sql(
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
larkee marked this conversation as resolved.
Show resolved Hide resolved
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.

:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql, params, param_types, query_mode, retry=retry, timeout=timeout
sql,
params,
param_types,
query_mode,
query_options=query_options,
retry=retry,
timeout=timeout,
)

def batch(self):
Expand Down
16 changes: 16 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -23,6 +23,7 @@
from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
Expand Down Expand Up @@ -157,6 +158,7 @@ def execute_sql(
params=None,
param_types=None,
query_mode=None,
query_options=None,
partition=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
Expand All @@ -180,6 +182,14 @@ def execute_sql(
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options:
(Optional) Query optimizer configuration to use for the given query.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`

:type partition: bytes
:param partition: (Optional) one of the partition tokens returned
from :meth:`partition_query`.
Expand Down Expand Up @@ -211,6 +221,11 @@ def execute_sql(
transaction = self._make_txn_selector()
api = database.spanner_api

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)

restart = functools.partial(
api.execute_streaming_sql,
self._session.name,
Expand All @@ -221,6 +236,7 @@ def execute_sql(
query_mode=query_mode,
partition_token=partition,
seqno=self._execute_sql_count,
query_options=query_options,
metadata=metadata,
retry=retry,
timeout=timeout,
Expand Down
22 changes: 19 additions & 3 deletions google/cloud/spanner_v1/transaction.py
Expand Up @@ -17,8 +17,11 @@
from google.protobuf.struct_pb2 import Struct

from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
)
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
from google.cloud.spanner_v1.snapshot import _SnapshotBase
Expand Down Expand Up @@ -162,7 +165,9 @@ def _make_params_pb(params, param_types):

return None

def execute_update(self, dml, params=None, param_types=None, query_mode=None):
def execute_update(
self, dml, params=None, param_types=None, query_mode=None, query_options=None
):
"""Perform an ``ExecuteSql`` API request with DML.

:type dml: str
Expand All @@ -182,6 +187,11 @@ def execute_update(self, dml, params=None, param_types=None, query_mode=None):
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type query_options:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.

:rtype: int
:returns: Count of rows affected by the DML statement.
"""
Expand All @@ -191,13 +201,19 @@ def execute_update(self, dml, params=None, param_types=None, query_mode=None):
transaction = self._make_txn_selector()
api = database.spanner_api

# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)

response = api.execute_sql(
self._session.name,
dml,
transaction=transaction,
params=params_pb,
param_types=param_types,
query_mode=query_mode,
query_options=query_options,
seqno=self._execute_sql_count,
metadata=metadata,
)
Expand Down