Skip to content

Commit

Permalink
fix: resume read stream on Unknown transport-layer exception (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
tswast committed Aug 6, 2021
1 parent 1ac1356 commit 127caa0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
7 changes: 6 additions & 1 deletion google/cloud/bigquery_storage_v1/reader.py
Expand Up @@ -39,7 +39,12 @@
pyarrow = None


_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)
_STREAM_RESUMPTION_EXCEPTIONS = (
google.api_core.exceptions.ServiceUnavailable,
# Caused by transport-level error. No status code was received.
# https://github.com/googleapis/python-bigquery-storage/issues/262
google.api_core.exceptions.Unknown,
)

# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
# Unfortunately, this condition is surfaced to the caller as an internal error
Expand Down
27 changes: 21 additions & 6 deletions tests/unit/test_reader_v1.py
Expand Up @@ -103,6 +103,12 @@ def _pages_w_unavailable(pages):
raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")


def _pages_w_unknown(pages):
for page in pages:
yield page
raise google.api_core.exceptions.Unknown("No status received")


def _avro_blocks_w_deadline(avro_blocks):
for block in avro_blocks:
yield block
Expand Down Expand Up @@ -237,14 +243,19 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client):
]
avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
avro_blocks_2 = _pages_w_resumable_internal_error(
_bq_to_avro_blocks(bq_blocks_2, avro_schema)
)
bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema)

mock_gapic_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3)
bq_blocks_3 = [[{"int_col": -1}, {"int_col": -2}], [{"int_col": -4}]]
avro_blocks_3 = _pages_w_unknown(_bq_to_avro_blocks(bq_blocks_3, avro_schema))
bq_blocks_4 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_4 = _bq_to_avro_blocks(bq_blocks_4, avro_schema)

mock_gapic_client.read_rows.side_effect = (
avro_blocks_2,
avro_blocks_3,
avro_blocks_4,
)

reader = class_under_test(
avro_blocks_1,
Expand All @@ -260,16 +271,20 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client):
itertools.chain.from_iterable(bq_blocks_1),
itertools.chain.from_iterable(bq_blocks_2),
itertools.chain.from_iterable(bq_blocks_3),
itertools.chain.from_iterable(bq_blocks_4),
)
)

assert tuple(got) == expected
mock_gapic_client.read_rows.assert_any_call(
read_stream="teststream", offset=4, metadata={"test-key": "test-value"}
)
mock_gapic_client.read_rows.assert_called_with(
mock_gapic_client.read_rows.assert_any_call(
read_stream="teststream", offset=7, metadata={"test-key": "test-value"}
)
mock_gapic_client.read_rows.assert_called_with(
read_stream="teststream", offset=10, metadata={"test-key": "test-value"}
)


def test_rows_w_reconnect_by_page(class_under_test, mock_gapic_client):
Expand Down

0 comments on commit 127caa0

Please sign in to comment.