Skip to content

Commit

Permalink
feat: added retry and timeout params to partition read in database an…
Browse files Browse the repository at this point in the history
…d snapshot class (#278)

* feat: added retry and timeout params to partition read in database and snapshot class

* feat: lint corrections

* feat: added retry and timeout support in process_read_batch and process_query_batch

* feat: added retry and timeout support in process_read_batch and process_query_batch

* feat: changed retry to retry object in tests
  • Loading branch information
vi3k6i5 committed Mar 24, 2021
1 parent 2fd0352 commit 1a7c9d2
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 23 deletions.
50 changes: 46 additions & 4 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -26,6 +26,7 @@
from google.api_core.retry import if_exception_type
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import Aborted
from google.api_core import gapic_v1
import six

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -915,6 +916,9 @@ def generate_read_batches(
index="",
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Start a partitioned batch read operation.
Expand Down Expand Up @@ -946,6 +950,12 @@ def generate_read_batches(
service uses this as a hint, the actual number of partitions may
differ.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: iterable of dict
:returns:
mappings of information used to perform actual partitioned reads via
Expand All @@ -958,6 +968,8 @@ def generate_read_batches(
index=index,
partition_size_bytes=partition_size_bytes,
max_partitions=max_partitions,
retry=retry,
timeout=timeout,
)

read_info = {
Expand All @@ -969,21 +981,32 @@ def generate_read_batches(
for partition in partitions:
yield {"partition": partition, "read": read_info.copy()}

def process_read_batch(self, batch):
def process_read_batch(
    self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
):
    """Run a single partitioned read produced by :meth:`generate_read_batches`.

    :type batch: mapping
    :param batch:
        one of the mappings returned from an earlier call to
        :meth:`generate_read_batches`.

    :type retry: :class:`~google.api_core.retry.Retry`
    :param retry: (Optional) The retry settings for this request.

    :type timeout: float
    :param timeout: (Optional) The timeout for this request.

    :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
    :returns: a result set instance which can be used to consume rows.
    """
    # Deep-copy so the caller's batch mapping is never mutated.
    read_args = copy.deepcopy(batch["read"])
    # The keyset travels as a plain dict; rehydrate it into a KeySet.
    read_args["keyset"] = KeySet._from_dict(read_args.pop("keyset"))
    snapshot = self._get_snapshot()
    return snapshot.read(
        partition=batch["partition"], **read_args, retry=retry, timeout=timeout
    )

def generate_query_batches(
self,
Expand All @@ -993,6 +1016,9 @@ def generate_query_batches(
partition_size_bytes=None,
max_partitions=None,
query_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Start a partitioned query operation.
Expand Down Expand Up @@ -1036,6 +1062,12 @@ def generate_query_batches(
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: iterable of dict
:returns:
mappings of information used to perform actual partitioned reads via
Expand All @@ -1047,6 +1079,8 @@ def generate_query_batches(
param_types=param_types,
partition_size_bytes=partition_size_bytes,
max_partitions=max_partitions,
retry=retry,
timeout=timeout,
)

query_info = {"sql": sql}
Expand All @@ -1064,19 +1098,27 @@ def generate_query_batches(
for partition in partitions:
yield {"partition": partition, "query": query_info}

def process_query_batch(self, batch):
def process_query_batch(
    self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
):
    """Run a single partitioned query produced by :meth:`generate_query_batches`.

    :type batch: mapping
    :param batch:
        one of the mappings returned from an earlier call to
        :meth:`generate_query_batches`.

    :type retry: :class:`~google.api_core.retry.Retry`
    :param retry: (Optional) The retry settings for this request.

    :type timeout: float
    :param timeout: (Optional) The timeout for this request.

    :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
    :returns: a result set instance which can be used to consume rows.
    """
    snapshot = self._get_snapshot()
    return snapshot.execute_sql(
        partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
    )

def process(self, batch):
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/session.py
Expand Up @@ -258,6 +258,12 @@ def execute_sql(
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
Expand Down
63 changes: 56 additions & 7 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -27,7 +27,7 @@

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.api_core import gapic_v1
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
Expand Down Expand Up @@ -109,7 +109,18 @@ def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""
raise NotImplementedError

def read(self, table, columns, keyset, index="", limit=0, partition=None):
def read(
self,
table,
columns,
keyset,
index="",
limit=0,
partition=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``StreamingRead`` API request for rows in a table.
:type table: str
Expand All @@ -134,6 +145,12 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
from :meth:`partition_read`. Incompatible with
``limit``.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
Expand Down Expand Up @@ -163,7 +180,11 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
partition_token=partition,
)
restart = functools.partial(
api.streaming_read, request=request, metadata=metadata,
api.streaming_read,
request=request,
metadata=metadata,
retry=retry,
timeout=timeout,
)

trace_attributes = {"table_id": table, "columns": columns}
Expand All @@ -186,8 +207,8 @@ def execute_sql(
query_mode=None,
query_options=None,
partition=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform an ``ExecuteStreamingSql`` API request.
Expand Down Expand Up @@ -224,6 +245,12 @@ def execute_sql(
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.
Expand Down Expand Up @@ -296,6 +323,9 @@ def partition_read(
index="",
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``PartitionRead`` API request for rows in a table.
Expand Down Expand Up @@ -323,6 +353,12 @@ def partition_read(
service uses this as a hint, the actual number of partitions may
differ.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: iterable of bytes
:returns: a sequence of partition tokens
Expand Down Expand Up @@ -357,7 +393,9 @@ def partition_read(
with trace_call(
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
):
response = api.partition_read(request=request, metadata=metadata,)
response = api.partition_read(
request=request, metadata=metadata, retry=retry, timeout=timeout,
)

return [partition.partition_token for partition in response.partitions]

Expand All @@ -368,6 +406,9 @@ def partition_query(
param_types=None,
partition_size_bytes=None,
max_partitions=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a ``PartitionQuery`` API request.
Expand All @@ -394,6 +435,12 @@ def partition_query(
service uses this as a hint, the actual number of partitions may
differ.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: iterable of bytes
:returns: a sequence of partition tokens
Expand Down Expand Up @@ -438,7 +485,9 @@ def partition_query(
self._session,
trace_attributes,
):
response = api.partition_query(request=request, metadata=metadata,)
response = api.partition_query(
request=request, metadata=metadata, retry=retry, timeout=timeout,
)

return [partition.partition_token for partition in response.partitions]

Expand Down

0 comments on commit 1a7c9d2

Please sign in to comment.