Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: resume iterator on EOS internal error (#122)
* fix: resume iterator on EOS internal error

* fix: add additional stream resumption message

* test: add unit tests

* Apply suggestions from code review

Co-authored-by: Tres Seaver <tseaver@palladion.com>

Co-authored-by: larkee <larkee@users.noreply.github.com>
Co-authored-by: Tres Seaver <tseaver@palladion.com>
  • Loading branch information
3 people committed Aug 25, 2020
1 parent 808837b commit 45a1538
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 7 deletions.
17 changes: 17 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -20,6 +20,7 @@
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.cloud._helpers import _datetime_to_pb_timestamp
Expand All @@ -32,6 +33,11 @@
from google.cloud.spanner_v1.types import PartitionOptions
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call

_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
"RST_STREAM",
"Received unexpected EOS on DATA frame from server",
)


def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
"""Restart iteration after :exc:`.ServiceUnavailable`.
Expand All @@ -55,6 +61,17 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N
with trace_call(trace_name, session, attributes):
iterator = restart(resume_token=resume_token)
continue
except InternalServerError as exc:
resumable_error = any(
resumable_message in exc.message
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
)
if not resumable_error:
raise
del item_buffer[:]
with trace_call(trace_name, session, attributes):
iterator = restart(resume_token=resume_token)
continue

if len(item_buffer) == 0:
break
Expand Down
141 changes: 134 additions & 7 deletions tests/unit/test_snapshot.py
Expand Up @@ -86,24 +86,68 @@ def test_iteration_w_raw_w_resume_tken(self):
self.assertNoSpans()

def test_iteration_w_raw_raising_unavailable_no_token(self):
from google.api_core.exceptions import ServiceUnavailable

ITEMS = (
self._make_item(0),
self._make_item(1, resume_token=RESUME_TOKEN),
self._make_item(2),
)
before = _MockIterator(fail_after=True, error=ServiceUnavailable("testing"))
after = _MockIterator(*ITEMS)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
self.assertEqual(list(resumable), list(ITEMS))
self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")])
self.assertNoSpans()

def test_iteration_w_raw_raising_retryable_internal_error_no_token(self):
from google.api_core.exceptions import InternalServerError

ITEMS = (
self._make_item(0),
self._make_item(1, resume_token=RESUME_TOKEN),
self._make_item(2),
)
before = _MockIterator(fail_after=True)
before = _MockIterator(
fail_after=True,
error=InternalServerError(
"Received unexpected EOS on DATA frame from server"
),
)
after = _MockIterator(*ITEMS)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
self.assertEqual(list(resumable), list(ITEMS))
self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")])
self.assertNoSpans()

def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self):
from google.api_core.exceptions import InternalServerError

ITEMS = (
self._make_item(0),
self._make_item(1, resume_token=RESUME_TOKEN),
self._make_item(2),
)
before = _MockIterator(fail_after=True, error=InternalServerError("testing"))
after = _MockIterator(*ITEMS)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
with self.assertRaises(InternalServerError):
list(resumable)
self.assertEqual(restart.mock_calls, [mock.call()])
self.assertNoSpans()

def test_iteration_w_raw_raising_unavailable(self):
from google.api_core.exceptions import ServiceUnavailable

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2),) # discarded after 503
LAST = (self._make_item(3),)
before = _MockIterator(*(FIRST + SECOND), fail_after=True)
before = _MockIterator(
*(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing")
)
after = _MockIterator(*LAST)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
Expand All @@ -113,10 +157,53 @@ def test_iteration_w_raw_raising_unavailable(self):
)
self.assertNoSpans()

def test_iteration_w_raw_raising_retryable_internal_error(self):
from google.api_core.exceptions import InternalServerError

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2),) # discarded after 503
LAST = (self._make_item(3),)
before = _MockIterator(
*(FIRST + SECOND),
fail_after=True,
error=InternalServerError(
"Received unexpected EOS on DATA frame from server"
)
)
after = _MockIterator(*LAST)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
self.assertEqual(list(resumable), list(FIRST + LAST))
self.assertEqual(
restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)]
)
self.assertNoSpans()

def test_iteration_w_raw_raising_non_retryable_internal_error(self):
from google.api_core.exceptions import InternalServerError

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2),) # discarded after 503
LAST = (self._make_item(3),)
before = _MockIterator(
*(FIRST + SECOND), fail_after=True, error=InternalServerError("testing")
)
after = _MockIterator(*LAST)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
with self.assertRaises(InternalServerError):
list(resumable)
self.assertEqual(restart.mock_calls, [mock.call()])
self.assertNoSpans()

def test_iteration_w_raw_raising_unavailable_after_token(self):
from google.api_core.exceptions import ServiceUnavailable

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2), self._make_item(3))
before = _MockIterator(*FIRST, fail_after=True)
before = _MockIterator(
*FIRST, fail_after=True, error=ServiceUnavailable("testing")
)
after = _MockIterator(*SECOND)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
Expand All @@ -126,6 +213,43 @@ def test_iteration_w_raw_raising_unavailable_after_token(self):
)
self.assertNoSpans()

def test_iteration_w_raw_raising_retryable_internal_error_after_token(self):
from google.api_core.exceptions import InternalServerError

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2), self._make_item(3))
before = _MockIterator(
*FIRST,
fail_after=True,
error=InternalServerError(
"Received unexpected EOS on DATA frame from server"
)
)
after = _MockIterator(*SECOND)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
self.assertEqual(list(resumable), list(FIRST + SECOND))
self.assertEqual(
restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)]
)
self.assertNoSpans()

def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self):
from google.api_core.exceptions import InternalServerError

FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2), self._make_item(3))
before = _MockIterator(
*FIRST, fail_after=True, error=InternalServerError("testing")
)
after = _MockIterator(*SECOND)
restart = mock.Mock(spec=[], side_effect=[before, after])
resumable = self._call_fut(restart)
with self.assertRaises(InternalServerError):
list(resumable)
self.assertEqual(restart.mock_calls, [mock.call()])
self.assertNoSpans()

def test_iteration_w_span_creation(self):
name = "TestSpan"
extra_atts = {"test_att": 1}
Expand All @@ -136,11 +260,15 @@ def test_iteration_w_span_creation(self):
self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1))

def test_iteration_w_multiple_span_creation(self):
from google.api_core.exceptions import ServiceUnavailable

if HAS_OPENTELEMETRY_INSTALLED:
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
SECOND = (self._make_item(2),) # discarded after 503
LAST = (self._make_item(3),)
before = _MockIterator(*(FIRST + SECOND), fail_after=True)
before = _MockIterator(
*(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing")
)
after = _MockIterator(*LAST)
restart = mock.Mock(spec=[], side_effect=[before, after])
name = "TestSpan"
Expand Down Expand Up @@ -1153,18 +1281,17 @@ class _MockIterator(object):
def __init__(self, *values, **kw):
self._iter_values = iter(values)
self._fail_after = kw.pop("fail_after", False)
self._error = kw.pop("error", Exception)

def __iter__(self):
return self

def __next__(self):
from google.api_core.exceptions import ServiceUnavailable

try:
return next(self._iter_values)
except StopIteration:
if self._fail_after:
raise ServiceUnavailable("testing")
raise self._error
raise

next = __next__

0 comments on commit 45a1538

Please sign in to comment.