From 45a15382bc1e62dedc944f6484c15ba929338670 Mon Sep 17 00:00:00 2001 From: larkee <31196561+larkee@users.noreply.github.com> Date: Tue, 25 Aug 2020 11:51:47 +1000 Subject: [PATCH] fix: resume iterator on EOS internal error (#122) * fix: resume iterator on EOS internal error * fix: add additional stream resumption message * test: add unit tests * Apply suggestions from code review Co-authored-by: Tres Seaver Co-authored-by: larkee Co-authored-by: Tres Seaver --- google/cloud/spanner_v1/snapshot.py | 17 ++++ tests/unit/test_snapshot.py | 141 ++++++++++++++++++++++++++-- 2 files changed, 151 insertions(+), 7 deletions(-) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 0b5ee1d894..42e71545d4 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -20,6 +20,7 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector +from google.api_core.exceptions import InternalServerError from google.api_core.exceptions import ServiceUnavailable import google.api_core.gapic_v1.method from google.cloud._helpers import _datetime_to_pb_timestamp @@ -32,6 +33,11 @@ from google.cloud.spanner_v1.types import PartitionOptions from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = ( + "RST_STREAM", + "Received unexpected EOS on DATA frame from server", +) + def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. @@ -55,6 +61,17 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N with trace_call(trace_name, session, attributes): iterator = restart(resume_token=resume_token) continue + except InternalServerError as exc: + resumable_error = any( + resumable_message in exc.message + for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES + ) + if not resumable_error: + raise + del item_buffer[:] + with trace_call(trace_name, session, attributes): + iterator = restart(resume_token=resume_token) + continue if len(item_buffer) == 0: break diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 5c53ee6a0e..8589a0c363 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -86,12 +86,35 @@ def test_iteration_w_raw_w_resume_tken(self): self.assertNoSpans() def test_iteration_w_raw_raising_unavailable_no_token(self): + from google.api_core.exceptions import ServiceUnavailable + + ITEMS = ( + self._make_item(0), + self._make_item(1, resume_token=RESUME_TOKEN), + self._make_item(2), + ) + before = _MockIterator(fail_after=True, error=ServiceUnavailable("testing")) + after = _MockIterator(*ITEMS) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(ITEMS)) + self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")]) + self.assertNoSpans() + + def test_iteration_w_raw_raising_retryable_internal_error_no_token(self): + from google.api_core.exceptions import InternalServerError + ITEMS = ( self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN), self._make_item(2), ) - before = _MockIterator(fail_after=True) + before = _MockIterator( + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ), + ) after = _MockIterator(*ITEMS) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -99,11 +122,32 @@ def test_iteration_w_raw_raising_unavailable_no_token(self): self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")]) self.assertNoSpans() + def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self): + from google.api_core.exceptions import InternalServerError + + ITEMS = ( + self._make_item(0), + self._make_item(1, resume_token=RESUME_TOKEN), + self._make_item(2), + ) + before = _MockIterator(fail_after=True, error=InternalServerError("testing")) + after = _MockIterator(*ITEMS) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_raw_raising_unavailable(self): + from google.api_core.exceptions import ServiceUnavailable + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2),) # discarded after 503 LAST = (self._make_item(3),) - before = _MockIterator(*(FIRST + SECOND), fail_after=True) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*LAST) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -113,10 +157,53 @@ def test_iteration_w_raw_raising_unavailable(self): ) self.assertNoSpans() + def test_iteration_w_raw_raising_retryable_internal_error(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2),) # discarded after 503 + LAST = (self._make_item(3),) + before = _MockIterator( + *(FIRST + SECOND), + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ) + ) + after = _MockIterator(*LAST) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(FIRST + LAST)) + self.assertEqual( + restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] + ) + self.assertNoSpans() + + def test_iteration_w_raw_raising_non_retryable_internal_error(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2),) # discarded after 503 + LAST = (self._make_item(3),) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=InternalServerError("testing") + ) + after = _MockIterator(*LAST) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_raw_raising_unavailable_after_token(self): + from google.api_core.exceptions import ServiceUnavailable + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2), self._make_item(3)) - before = _MockIterator(*FIRST, fail_after=True) + before = _MockIterator( + *FIRST, fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*SECOND) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -126,6 +213,43 @@ def test_iteration_w_raw_raising_unavailable_after_token(self): ) self.assertNoSpans() + def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2), self._make_item(3)) + before = _MockIterator( + *FIRST, + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ) + ) + after = _MockIterator(*SECOND) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(FIRST + SECOND)) + self.assertEqual( + restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] + ) + self.assertNoSpans() + + def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2), self._make_item(3)) + before = _MockIterator( + *FIRST, fail_after=True, error=InternalServerError("testing") + ) + after = _MockIterator(*SECOND) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_span_creation(self): name = "TestSpan" extra_atts = {"test_att": 1} @@ -136,11 +260,15 @@ def test_iteration_w_span_creation(self): self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1)) def test_iteration_w_multiple_span_creation(self): + from google.api_core.exceptions import ServiceUnavailable + if HAS_OPENTELEMETRY_INSTALLED: FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2),) # discarded after 503 LAST = (self._make_item(3),) - before = _MockIterator(*(FIRST + SECOND), fail_after=True) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*LAST) restart = mock.Mock(spec=[], side_effect=[before, after]) name = "TestSpan" @@ -1153,18 +1281,17 @@ class _MockIterator(object): def __init__(self, *values, **kw): self._iter_values = iter(values) self._fail_after = kw.pop("fail_after", False) + self._error = kw.pop("error", Exception) def __iter__(self): return self def __next__(self): - from google.api_core.exceptions import ServiceUnavailable - try: return next(self._iter_values) except StopIteration: if self._fail_after: - raise ServiceUnavailable("testing") + raise self._error raise next = __next__