Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 'timeout' arg to 'Table.mutate_rows' #157

Merged
merged 4 commits into from Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 33 additions & 34 deletions google/cloud/bigtable/table.py
Expand Up @@ -20,9 +20,9 @@
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.gapic_v1.method import DEFAULT
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
from google.api_core.gapic_v1.method import wrap_method
from google.cloud._helpers import _to_bytes
from google.cloud.bigtable.backup import Backup
from google.cloud.bigtable.column_family import _gc_rule_from_pb
Expand Down Expand Up @@ -625,7 +625,7 @@ def yield_rows(self, **kwargs):
)
return self.read_rows(**kwargs)

def mutate_rows(self, rows, retry=DEFAULT_RETRY):
def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
"""Mutates multiple rows in bulk.

For example:
Expand Down Expand Up @@ -656,17 +656,23 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY):
the :meth:`~google.api_core.retry.Retry.with_delay` method or the
:meth:`~google.api_core.retry.Retry.with_deadline` method.

:type timeout: float
:param timeout: number of seconds bounding retries for the call

:rtype: list
:returns: A list of response statuses (`google.rpc.status_pb2.Status`)
corresponding to success or failure of each row mutation
sent. These will be in the same order as the `rows`.
"""
if timeout is DEFAULT:
timeout = self.mutation_timeout

retryable_mutate_rows = _RetryableMutateRowsWorker(
self._instance._client,
self.name,
rows,
app_profile_id=self._app_profile_id,
timeout=self.mutation_timeout,
timeout=timeout,
)
return retryable_mutate_rows(retry=retry)

Expand Down Expand Up @@ -1058,27 +1064,20 @@ def _do_mutate_retryable_rows(self):
# All mutations are either successful or non-retryable now.
return self.responses_statuses

mutate_rows_request = _mutate_rows_request(
self.table_name, retryable_rows, app_profile_id=self.app_profile_id
)
entries = _compile_mutation_entries(self.table_name, retryable_rows)
data_client = self.client.table_data_client
inner_api_calls = data_client._inner_api_calls
if "mutate_rows" not in inner_api_calls:
default_retry = (data_client._method_configs["MutateRows"].retry,)
if self.timeout is None:
default_timeout = data_client._method_configs["MutateRows"].timeout
else:
default_timeout = timeout.ExponentialTimeout(deadline=self.timeout)
data_client._inner_api_calls["mutate_rows"] = wrap_method(
data_client.transport.mutate_rows,
default_retry=default_retry,
default_timeout=default_timeout,
client_info=data_client._client_info,
)

kwargs = {}
if self.timeout is not None:
kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout)

try:
responses = data_client._inner_api_calls["mutate_rows"](
mutate_rows_request, retry=None
responses = data_client.mutate_rows(
self.table_name,
entries,
app_profile_id=self.app_profile_id,
retry=None,
**kwargs
)
except (ServiceUnavailable, DeadlineExceeded, Aborted):
# If an exception, considered retryable by `RETRY_CODES`, is
Expand Down Expand Up @@ -1260,38 +1259,38 @@ def _create_row_request(
return message


def _mutate_rows_request(table_name, rows, app_profile_id=None):
"""Creates a request to mutate rows in a table.
def _compile_mutation_entries(table_name, rows):
"""Create list of mutation entries

:type table_name: str
:param table_name: The name of the table to write to.

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

:type: app_profile_id: str
:param app_profile_id: (Optional) The unique name of the AppProfile.

:rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
:returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
:rtype: List[:class:`data_messages_v2_pb2.MutateRowsRequest.Entry`]
:returns: entries corresponding to the inputs.
:raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
greater than 100,000
"""
request_pb = data_messages_v2_pb2.MutateRowsRequest(
table_name=table_name, app_profile_id=app_profile_id
greater than the max ({})
""".format(
_MAX_BULK_MUTATIONS
)
entries = []
mutations_count = 0
entry_klass = data_messages_v2_pb2.MutateRowsRequest.Entry

for row in rows:
_check_row_table_name(table_name, row)
_check_row_type(row)
mutations = row._get_mutations()
request_pb.entries.add(row_key=row.row_key, mutations=mutations)
entries.append(entry_klass(row_key=row.row_key, mutations=mutations))
mutations_count += len(mutations)

if mutations_count > _MAX_BULK_MUTATIONS:
raise TooManyMutationsError(
"Maximum number of mutations is %s" % (_MAX_BULK_MUTATIONS,)
)
return request_pb
return entries


def _check_row_table_name(table_name, row):
Expand Down