Skip to content

Commit

Permalink
feat: add 'timeout' arg to 'Table.mutate_rows' (#157)
Browse files Browse the repository at this point in the history
Also, call data client's 'mutate_rows' directly -- do *not* scribble on its internal API wrappers.

See:
#7 (comment)

Closes #7
  • Loading branch information
tseaver committed Nov 12, 2020
1 parent 44932cb commit 6d597a1
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 91 deletions.
67 changes: 33 additions & 34 deletions google/cloud/bigtable/table.py
Expand Up @@ -20,9 +20,9 @@
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.gapic_v1.method import DEFAULT
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
from google.api_core.gapic_v1.method import wrap_method
from google.cloud._helpers import _to_bytes
from google.cloud.bigtable.backup import Backup
from google.cloud.bigtable.column_family import _gc_rule_from_pb
Expand Down Expand Up @@ -625,7 +625,7 @@ def yield_rows(self, **kwargs):
)
return self.read_rows(**kwargs)

def mutate_rows(self, rows, retry=DEFAULT_RETRY):
def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
"""Mutates multiple rows in bulk.
For example:
Expand Down Expand Up @@ -656,17 +656,23 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY):
the :meth:`~google.api_core.retry.Retry.with_delay` method or the
:meth:`~google.api_core.retry.Retry.with_deadline` method.
:type timeout: float
:param timeout: number of seconds bounding retries for the call
:rtype: list
:returns: A list of response statuses (`google.rpc.status_pb2.Status`)
corresponding to success or failure of each row mutation
sent. These will be in the same order as the `rows`.
"""
if timeout is DEFAULT:
timeout = self.mutation_timeout

retryable_mutate_rows = _RetryableMutateRowsWorker(
self._instance._client,
self.name,
rows,
app_profile_id=self._app_profile_id,
timeout=self.mutation_timeout,
timeout=timeout,
)
return retryable_mutate_rows(retry=retry)

Expand Down Expand Up @@ -1058,27 +1064,20 @@ def _do_mutate_retryable_rows(self):
# All mutations are either successful or non-retryable now.
return self.responses_statuses

mutate_rows_request = _mutate_rows_request(
self.table_name, retryable_rows, app_profile_id=self.app_profile_id
)
entries = _compile_mutation_entries(self.table_name, retryable_rows)
data_client = self.client.table_data_client
inner_api_calls = data_client._inner_api_calls
if "mutate_rows" not in inner_api_calls:
default_retry = (data_client._method_configs["MutateRows"].retry,)
if self.timeout is None:
default_timeout = data_client._method_configs["MutateRows"].timeout
else:
default_timeout = timeout.ExponentialTimeout(deadline=self.timeout)
data_client._inner_api_calls["mutate_rows"] = wrap_method(
data_client.transport.mutate_rows,
default_retry=default_retry,
default_timeout=default_timeout,
client_info=data_client._client_info,
)

kwargs = {}
if self.timeout is not None:
kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout)

try:
responses = data_client._inner_api_calls["mutate_rows"](
mutate_rows_request, retry=None
responses = data_client.mutate_rows(
self.table_name,
entries,
app_profile_id=self.app_profile_id,
retry=None,
**kwargs
)
except (ServiceUnavailable, DeadlineExceeded, Aborted):
# If an exception, considered retryable by `RETRY_CODES`, is
Expand Down Expand Up @@ -1260,38 +1259,38 @@ def _create_row_request(
return message


def _mutate_rows_request(table_name, rows, app_profile_id=None):
"""Creates a request to mutate rows in a table.
def _compile_mutation_entries(table_name, rows):
"""Create list of mutation entries
:type table_name: str
:param table_name: The name of the table to write to.
:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.
:type: app_profile_id: str
:param app_profile_id: (Optional) The unique name of the AppProfile.
:rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
:returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
:rtype: List[:class:`data_messages_v2_pb2.MutateRowsRequest.Entry`]
:returns: entries corresponding to the inputs.
:raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
greater than 100,000
"""
request_pb = data_messages_v2_pb2.MutateRowsRequest(
table_name=table_name, app_profile_id=app_profile_id
greater than the max ({})
""".format(
_MAX_BULK_MUTATIONS
)
entries = []
mutations_count = 0
entry_klass = data_messages_v2_pb2.MutateRowsRequest.Entry

for row in rows:
_check_row_table_name(table_name, row)
_check_row_type(row)
mutations = row._get_mutations()
request_pb.entries.add(row_key=row.row_key, mutations=mutations)
entries.append(entry_klass(row_key=row.row_key, mutations=mutations))
mutations_count += len(mutations)

if mutations_count > _MAX_BULK_MUTATIONS:
raise TooManyMutationsError(
"Maximum number of mutations is %s" % (_MAX_BULK_MUTATIONS,)
)
return request_pb
return entries


def _check_row_table_name(table_name, row):
Expand Down

0 comments on commit 6d597a1

Please sign in to comment.