Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added retry and timeout params to partition read in database and snapshot class #278

Merged
merged 7 commits into from Mar 24, 2021
50 changes: 46 additions & 4 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -26,6 +26,7 @@
from google.api_core.retry import if_exception_type
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import Aborted
from google.api_core import gapic_v1
import six

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -862,6 +863,9 @@ def generate_read_batches(
index="",
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Start a partitioned batch read operation.

Expand Down Expand Up @@ -893,6 +897,12 @@ def generate_read_batches(
service uses this as a hint, the actual number of partitions may
differ.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: iterable of dict
:returns:
mappings of information used to perform actual partitioned reads via
Expand All @@ -905,6 +915,8 @@ def generate_read_batches(
index=index,
partition_size_bytes=partition_size_bytes,
max_partitions=max_partitions,
retry=retry,
timeout=timeout,
)

read_info = {
Expand All @@ -916,21 +928,32 @@ def generate_read_batches(
for partition in partitions:
yield {"partition": partition, "read": read_info.copy()}

def process_read_batch(
    self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
):
    """Process a single, partitioned read.

    :type batch: mapping
    :param batch:
        one of the mappings returned from an earlier call to
        :meth:`generate_read_batches`.

    :type retry: :class:`~google.api_core.retry.Retry`
    :param retry: (Optional) The retry settings for this request.

    :type timeout: float
    :param timeout: (Optional) The timeout for this request.

    :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
    :returns: a result set instance which can be used to consume rows.
    """
    # Deep-copy so the caller's batch mapping is never mutated.
    read_kwargs = copy.deepcopy(batch["read"])
    # The keyset travels as a plain dict; rehydrate it before the API call.
    read_kwargs["keyset"] = KeySet._from_dict(read_kwargs.pop("keyset"))
    snapshot = self._get_snapshot()
    return snapshot.read(
        partition=batch["partition"], retry=retry, timeout=timeout, **read_kwargs
    )

def generate_query_batches(
self,
Expand All @@ -940,6 +963,9 @@ def generate_query_batches(
partition_size_bytes=None,
max_partitions=None,
query_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Start a partitioned query operation.

Expand Down Expand Up @@ -983,6 +1009,12 @@ def generate_query_batches(
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.QueryOptions`

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: iterable of dict
:returns:
mappings of information used to perform actual partitioned reads via
Expand All @@ -994,6 +1026,8 @@ def generate_query_batches(
param_types=param_types,
partition_size_bytes=partition_size_bytes,
max_partitions=max_partitions,
retry=retry,
timeout=timeout,
)

query_info = {"sql": sql}
Expand All @@ -1011,19 +1045,27 @@ def generate_query_batches(
for partition in partitions:
yield {"partition": partition, "query": query_info}

def process_query_batch(
    self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
):
    """Process a single, partitioned query.

    :type batch: mapping
    :param batch:
        one of the mappings returned from an earlier call to
        :meth:`generate_query_batches`.

    :type retry: :class:`~google.api_core.retry.Retry`
    :param retry: (Optional) The retry settings for this request.

    :type timeout: float
    :param timeout: (Optional) The timeout for this request.

    :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
    :returns: a result set instance which can be used to consume rows.
    """
    snapshot = self._get_snapshot()
    query_kwargs = batch["query"]
    return snapshot.execute_sql(
        partition=batch["partition"], retry=retry, timeout=timeout, **query_kwargs
    )

def process(self, batch):
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/session.py
Expand Up @@ -258,6 +258,12 @@ def execute_sql(
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
Expand Down
63 changes: 56 additions & 7 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -27,7 +27,7 @@

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.api_core import gapic_v1
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
Expand Down Expand Up @@ -109,7 +109,18 @@ def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""
raise NotImplementedError

def read(self, table, columns, keyset, index="", limit=0, partition=None):
def read(
self,
table,
columns,
keyset,
index="",
limit=0,
partition=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``StreamingRead`` API request for rows in a table.

:type table: str
Expand All @@ -134,6 +145,12 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
from :meth:`partition_read`. Incompatible with
``limit``.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.

Expand Down Expand Up @@ -163,7 +180,11 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
partition_token=partition,
)
restart = functools.partial(
api.streaming_read, request=request, metadata=metadata,
api.streaming_read,
request=request,
metadata=metadata,
retry=retry,
timeout=timeout,
)

trace_attributes = {"table_id": table, "columns": columns}
Expand All @@ -186,8 +207,8 @@ def execute_sql(
query_mode=None,
query_options=None,
partition=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform an ``ExecuteStreamingSql`` API request.

Expand Down Expand Up @@ -224,6 +245,12 @@ def execute_sql(
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.
Expand Down Expand Up @@ -296,6 +323,9 @@ def partition_read(
index="",
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``PartitionRead`` API request for rows in a table.

Expand Down Expand Up @@ -323,6 +353,12 @@ def partition_read(
service uses this as a hint, the actual number of partitions may
differ.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: iterable of bytes
:returns: a sequence of partition tokens

Expand Down Expand Up @@ -357,7 +393,9 @@ def partition_read(
with trace_call(
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
):
response = api.partition_read(request=request, metadata=metadata,)
response = api.partition_read(
request=request, metadata=metadata, retry=retry, timeout=timeout,
)

return [partition.partition_token for partition in response.partitions]

Expand All @@ -368,6 +406,9 @@ def partition_query(
param_types=None,
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``PartitionQuery`` API request.

Expand All @@ -394,6 +435,12 @@ def partition_query(
service uses this as a hint, the actual number of partitions may
differ.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

:type timeout: float
:param timeout: (Optional) The timeout for this request.

:rtype: iterable of bytes
:returns: a sequence of partition tokens

Expand Down Expand Up @@ -438,7 +485,9 @@ def partition_query(
self._session,
trace_attributes,
):
response = api.partition_query(request=request, metadata=metadata,)
response = api.partition_query(
request=request, metadata=metadata, retry=retry, timeout=timeout,
)

return [partition.partition_token for partition in response.partitions]

Expand Down