Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] feat: use OperationTimeoutError instead of DeadlineExceeded #914

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/__init__.py
Expand Up @@ -39,6 +39,7 @@
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import RowKeySamples
Expand Down Expand Up @@ -68,6 +69,7 @@
"RetryExceptionGroup",
"MutationsExceptionGroup",
"ShardedReadRowsExceptionGroup",
"OperationTimeoutError",
"ShardedQuery",
"TABLE_DEFAULT",
)
12 changes: 6 additions & 6 deletions google/cloud/bigtable/data/_async/client.py
Expand Up @@ -574,7 +574,7 @@ async def read_rows_stream(
Returns:
- an asynchronous iterator that yields rows returned by the query
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
Expand Down Expand Up @@ -627,7 +627,7 @@ async def read_rows(
Returns:
- a list of Rows returned by the query
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
Expand Down Expand Up @@ -671,7 +671,7 @@ async def read_row(
Returns:
- a Row object if the row exists, otherwise None
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
Expand Down Expand Up @@ -806,7 +806,7 @@ async def row_exists(
Returns:
- a bool indicating whether the row exists
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
Expand Down Expand Up @@ -859,7 +859,7 @@ async def sample_row_keys(
Returns:
- a set of RowKeySamples the delimit contiguous sections of the table
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
Expand Down Expand Up @@ -981,7 +981,7 @@ async def mutate_row(
Only idempotent mutations will be retried. Defaults to the Table's
default_retryable_errors.
Raises:
- DeadlineExceeded: raised after operation timeout
- OperationTimeoutError: raised after operation timeout
will be chained with a RetryExceptionGroup containing all
GoogleAPIError exceptions from any retries that failed
- GoogleAPIError: raised on non-idempotent operations that cannot be
Expand Down
11 changes: 7 additions & 4 deletions google/cloud/bigtable/data/_helpers.py
Expand Up @@ -25,6 +25,7 @@
from google.api_core import exceptions as core_exceptions
from google.api_core.retry import RetryFailureReason
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

if TYPE_CHECKING:
import grpc
Expand Down Expand Up @@ -113,19 +114,21 @@ def _retry_exception_factory(
Returns:
- tuple of the exception to raise, and a cause exception if applicable
"""
# use the retry exception group as the cause of the exception
cause_exc: Exception | None = RetryExceptionGroup(exc_list) if exc_list else None
# build the source exception based on failure reason
if reason == RetryFailureReason.TIMEOUT:
timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
# if failed due to timeout, raise deadline exceeded as primary exception
source_exc: Exception = core_exceptions.DeadlineExceeded(
f"operation_timeout{timeout_val_str} exceeded"
source_exc: Exception = OperationTimeoutError(
f"operation_timeout {timeout_val_str}exceeded",
cause=cause_exc,
)
elif exc_list:
# otherwise, raise non-retryable error as primary exception
source_exc = exc_list.pop()
else:
source_exc = RuntimeError("failed with unspecified exception")
# use the retry exception group as the cause of the exception
cause_exc: Exception | None = RetryExceptionGroup(exc_list) if exc_list else None
source_exc.__cause__ = cause_exc
return source_exc, cause_exc

Expand Down
4 changes: 4 additions & 0 deletions google/cloud/bigtable/data/exceptions.py
Expand Up @@ -28,6 +28,10 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery


class OperationTimeoutError(core_exceptions.RetryError, core_exceptions.DeadlineExceeded):
"""Raised when a retryable operation times out"""


class InvalidChunk(core_exceptions.GoogleAPICallError):
"""Exception raised to invalid chunk data from back-end."""

Expand Down
11 changes: 4 additions & 7 deletions tests/unit/data/_async/test__mutate_rows.py
Expand Up @@ -282,7 +282,7 @@ async def test_mutate_rows_incomplete_ignored(self):
"""
from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.api_core.exceptions import DeadlineExceeded
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

client = mock.Mock()
table = mock.Mock()
Expand All @@ -294,17 +294,14 @@ async def test_mutate_rows_incomplete_ignored(self):
AsyncMock(),
) as attempt_mock:
attempt_mock.side_effect = _MutateRowsIncomplete("ignored")
found_exc = None
try:
with pytest.raises(MutationsExceptionGroup) as e:
instance = self._make_one(
client, table, entries, operation_timeout, operation_timeout
)
await instance.start()
except MutationsExceptionGroup as e:
found_exc = e
assert attempt_mock.call_count > 0
assert len(found_exc.exceptions) == 1
assert isinstance(found_exc.exceptions[0].__cause__, DeadlineExceeded)
assert len(e.value.exceptions) == 1
assert isinstance(e.value.exceptions[0].__cause__, OperationTimeoutError)

@pytest.mark.asyncio
async def test_run_attempt_single_entry_success(self):
Expand Down
65 changes: 33 additions & 32 deletions tests/unit/data/_async/test_client.py
Expand Up @@ -1461,20 +1461,21 @@ async def test_read_rows_query_matches_request(self, include_app_profile):
@pytest.mark.parametrize("operation_timeout", [0.001, 0.023, 0.1])
@pytest.mark.asyncio
async def test_read_rows_timeout(self, operation_timeout):
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

async with self._make_table() as table:
read_rows = table.client._gapic_client.read_rows
query = ReadRowsQuery()
chunks = [self._make_chunk(row_key=b"test_1")]
chunks = [core_exceptions.DeadlineExceeded("timeout")] * 5
read_rows.side_effect = lambda *args, **kwargs: self._make_gapic_stream(
chunks, sleep_time=1
chunks, sleep_time=0.05
)
try:
with pytest.raises(OperationTimeoutError) as e:
await table.read_rows(query, operation_timeout=operation_timeout)
except core_exceptions.DeadlineExceeded as e:
assert (
e.message
== f"operation_timeout of {operation_timeout:0.1f}s exceeded"
)
assert (
e.value.message
== f"operation_timeout of {operation_timeout:0.1f}s exceeded"
)

@pytest.mark.parametrize(
"per_request_t, operation_t, expected_num",
Expand All @@ -1497,6 +1498,7 @@ async def test_read_rows_attempt_timeout(
requests to be the ceiling of operation_timeout / attempt_timeout.
"""
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

expected_last_timeout = operation_t - (expected_num - 1) * per_request_t

Expand All @@ -1510,22 +1512,21 @@ async def test_read_rows_attempt_timeout(
query = ReadRowsQuery()
chunks = [core_exceptions.DeadlineExceeded("mock deadline")]

try:
with pytest.raises(OperationTimeoutError) as e:
await table.read_rows(
query,
operation_timeout=operation_t,
attempt_timeout=per_request_t,
)
except core_exceptions.DeadlineExceeded as e:
retry_exc = e.__cause__
if expected_num == 0:
assert retry_exc is None
else:
assert type(retry_exc) is RetryExceptionGroup
assert f"{expected_num} failed attempts" in str(retry_exc)
assert len(retry_exc.exceptions) == expected_num
for sub_exc in retry_exc.exceptions:
assert sub_exc.message == "mock deadline"
retry_exc = e.value.__cause__
if expected_num == 0:
assert retry_exc is None
else:
assert type(retry_exc) is RetryExceptionGroup
assert f"{expected_num} failed attempts" in str(retry_exc)
assert len(retry_exc.exceptions) == expected_num
for sub_exc in retry_exc.exceptions:
assert sub_exc.message == "mock deadline"
assert read_rows.call_count == expected_num
# check timeouts
for _, call_kwargs in read_rows.call_args_list[:-1]:
Expand All @@ -1550,20 +1551,21 @@ async def test_read_rows_attempt_timeout(
)
@pytest.mark.asyncio
async def test_read_rows_retryable_error(self, exc_type):
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

async with self._make_table() as table:
read_rows = table.client._gapic_client.read_rows
read_rows.side_effect = lambda *args, **kwargs: self._make_gapic_stream(
[expected_error]
)
query = ReadRowsQuery()
expected_error = exc_type("mock error")
try:
with pytest.raises(OperationTimeoutError) as e:
await table.read_rows(query, operation_timeout=0.1)
except core_exceptions.DeadlineExceeded as e:
retry_exc = e.__cause__
root_cause = retry_exc.exceptions[0]
assert type(root_cause) is exc_type
assert root_cause == expected_error
retry_exc = e.value.__cause__
root_cause = retry_exc.exceptions[0]
assert type(root_cause) is exc_type
assert root_cause == expected_error

@pytest.mark.parametrize(
"exc_type",
Expand Down Expand Up @@ -2084,16 +2086,16 @@ async def test_sample_row_keys_retryable_errors(self, retryable_exception):
"""
retryable errors should be retried until timeout
"""
from google.api_core.exceptions import DeadlineExceeded
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

async with self._make_client() as client:
async with client.get_table("instance", "table") as table:
with mock.patch.object(
table.client._gapic_client, "sample_row_keys", AsyncMock()
) as sample_row_keys:
sample_row_keys.side_effect = retryable_exception("mock")
with pytest.raises(DeadlineExceeded) as e:
with pytest.raises(OperationTimeoutError) as e:
await table.sample_row_keys(operation_timeout=0.05)
cause = e.value.__cause__
assert isinstance(cause, RetryExceptionGroup)
Expand Down Expand Up @@ -2190,16 +2192,16 @@ async def test_mutate_row(self, mutation_arg):
)
@pytest.mark.asyncio
async def test_mutate_row_retryable_errors(self, retryable_exception):
from google.api_core.exceptions import DeadlineExceeded
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import OperationTimeoutError

async with self._make_client(project="project") as client:
async with client.get_table("instance", "table") as table:
with mock.patch.object(
client._gapic_client, "mutate_row"
) as mock_gapic:
mock_gapic.side_effect = retryable_exception("mock")
with pytest.raises(DeadlineExceeded) as e:
with pytest.raises(OperationTimeoutError) as e:
mutation = mutations.DeleteAllFromRow()
assert mutation.is_idempotent() is True
await table.mutate_row(
Expand Down Expand Up @@ -2420,6 +2422,7 @@ async def test_bulk_mutate_rows_idempotent_mutation_error_retryable(
RetryExceptionGroup,
FailedMutationEntryError,
MutationsExceptionGroup,
OperationTimeoutError,
)

async with self._make_client(project="project") as client:
Expand All @@ -2443,9 +2446,7 @@ async def test_bulk_mutate_rows_idempotent_mutation_error_retryable(
assert isinstance(cause, RetryExceptionGroup)
assert isinstance(cause.exceptions[0], exception)
# last exception should be due to retry timeout
assert isinstance(
cause.exceptions[-1], core_exceptions.DeadlineExceeded
)
assert isinstance(cause.exceptions[-1], OperationTimeoutError)

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down