diff --git a/google/cloud/bigquery_storage_v1/reader.py b/google/cloud/bigquery_storage_v1/reader.py index a8cd226c..fd6e630c 100644 --- a/google/cloud/bigquery_storage_v1/reader.py +++ b/google/cloud/bigquery_storage_v1/reader.py @@ -39,7 +39,12 @@ pyarrow = None -_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,) +_STREAM_RESUMPTION_EXCEPTIONS = ( + google.api_core.exceptions.ServiceUnavailable, + # Caused by transport-level error. No status code was received. + # https://github.com/googleapis/python-bigquery-storage/issues/262 + google.api_core.exceptions.Unknown, +) # The Google API endpoint can unexpectedly close long-running HTTP/2 streams. # Unfortunately, this condition is surfaced to the caller as an internal error diff --git a/tests/unit/test_reader_v1.py b/tests/unit/test_reader_v1.py index 838ef51a..94d63c0c 100644 --- a/tests/unit/test_reader_v1.py +++ b/tests/unit/test_reader_v1.py @@ -103,6 +103,12 @@ def _pages_w_unavailable(pages): raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect") +def _pages_w_unknown(pages): + for page in pages: + yield page + raise google.api_core.exceptions.Unknown("No status received") + + def _avro_blocks_w_deadline(avro_blocks): for block in avro_blocks: yield block @@ -237,14 +243,19 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client): ] avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema)) bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]] - avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema) avro_blocks_2 = _pages_w_resumable_internal_error( _bq_to_avro_blocks(bq_blocks_2, avro_schema) ) - bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]] - avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema) - - mock_gapic_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3) + bq_blocks_3 = [[{"int_col": -1}, {"int_col": -2}], [{"int_col": -4}]] + avro_blocks_3 = _pages_w_unknown(_bq_to_avro_blocks(bq_blocks_3, avro_schema)) + bq_blocks_4 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]] + avro_blocks_4 = _bq_to_avro_blocks(bq_blocks_4, avro_schema) + + mock_gapic_client.read_rows.side_effect = ( + avro_blocks_2, + avro_blocks_3, + avro_blocks_4, + ) reader = class_under_test( avro_blocks_1, @@ -260,6 +271,7 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client): itertools.chain.from_iterable(bq_blocks_1), itertools.chain.from_iterable(bq_blocks_2), itertools.chain.from_iterable(bq_blocks_3), + itertools.chain.from_iterable(bq_blocks_4), ) ) @@ -267,9 +279,12 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client): mock_gapic_client.read_rows.assert_any_call( read_stream="teststream", offset=4, metadata={"test-key": "test-value"} ) - mock_gapic_client.read_rows.assert_called_with( + mock_gapic_client.read_rows.assert_any_call( read_stream="teststream", offset=7, metadata={"test-key": "test-value"} ) + mock_gapic_client.read_rows.assert_called_with( + read_stream="teststream", offset=10, metadata={"test-key": "test-value"} + ) def test_rows_w_reconnect_by_page(class_under_test, mock_gapic_client):