From 45faf9712b25bd63d962ca7e5afc8b8d3a0d8353 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 21 Oct 2020 10:50:07 -0500 Subject: [PATCH] fix: don't fail with 429 when downloading wide tables (#79) * fix: don't fail with 429 when downloading wide tables * make ssl_credentials match more generic * make synth.py more robust * update synth to update tests * fix updates to synth.py --- .../big_query_read/transports/grpc.py | 8 +++ .../big_query_read/transports/grpc_asyncio.py | 8 +++ synth.py | 56 +++++++++++++++---- tests/system/v1/test_reader_v1.py | 28 ++++++++++ .../test_big_query_read.py | 8 +++ 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index 7777e68c..36377f1d 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -145,6 +145,10 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) else: host = host if ":" in host else host + ":443" @@ -162,6 +166,10 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) self._stubs = {} # type: Dict[str, Callable] diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py index c1715a24..b383f36d 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py @@ -190,6 +190,10 @@ def __init__( ssl_credentials=ssl_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) else: host = host if ":" in host else host + ":443" @@ -207,6 +211,10 @@ def __init__( ssl_credentials=ssl_channel_credentials, scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) # Run the base constructor. diff --git a/synth.py b/synth.py index e3fa7eea..863ba860 100644 --- a/synth.py +++ b/synth.py @@ -77,7 +77,9 @@ unit_test_dependencies=optional_deps, cov_level=95, ) -s.move(templated_files, excludes=[".coveragerc"]) # microgenerator has a good .coveragerc file +s.move( + templated_files, excludes=[".coveragerc"] +) # microgenerator has a good .coveragerc file # ---------------------------------------------------------------------------- @@ -94,13 +96,51 @@ '\g<0>\n\n session.install("google-cloud-bigquery")', ) +# Remove client-side validation of message length. +# https://github.com/googleapis/python-bigquery-storage/issues/78 +s.replace( + [ + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", + "google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py", + ], + ( + r"type\(self\).create_channel\(\s*" + r"host,\s*" + r"credentials=credentials,\s*" + r"credentials_file=credentials_file,\s*" + r"ssl_credentials=ssl_[a-z_]*credentials,\s*" + r"scopes=scopes or self.AUTH_SCOPES,\s*" + r"quota_project_id=quota_project_id" + ), + """\g<0>, + options=( + ('grpc.max_send_message_length', -1), + ('grpc.max_receive_message_length', -1) + )""", +) +s.replace( + "tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py", + ( + r"grpc_create_channel\.assert_called_once_with\([^()]+" + r"scopes=\([^()]+\),\s*" + r"ssl_credentials=[a-z_]+,\s*" + r"quota_project_id=None" + ), + """\g<0>, + options=( + ('grpc.max_send_message_length', -1), + ('grpc.max_receive_message_length', -1) + )""", +) + + # We don't want the generated client to be accessible through # "google.cloud.bigquery_storage", replace it with the hand written client that # wraps it. s.replace( "google/cloud/bigquery_storage/__init__.py", r"from google\.cloud\.bigquery_storage_v1\.services.big_query_read.client import", - "from google.cloud.bigquery_storage_v1 import" + "from google.cloud.bigquery_storage_v1 import", ) # We also don't want to expose the async client just yet, at least not until @@ -115,7 +155,7 @@ ) s.replace( "google/cloud/bigquery_storage/__init__.py", - r"""["']BigQueryReadAsyncClient["'],\n""", + r"""["']BigQueryReadAsyncClient["'],\n""", "", ) @@ -133,11 +173,7 @@ s.replace( "google/cloud/bigquery_storage/__init__.py", r"""["']ArrowRecordBatch["']""", - ( - '"__version__",\n' - ' "types",\n' - " \g<0>" - ), + ('"__version__",\n' ' "types",\n' " \g<0>"), ) # We want to expose all types through "google.cloud.bigquery_storage.types", @@ -190,9 +226,7 @@ ), ) s.replace( - "noxfile.py", - r'--cov=tests\.unit', - '--cov=tests/unit', + "noxfile.py", r"--cov=tests\.unit", "--cov=tests/unit", ) # TODO(busunkim): Use latest sphinx after microgenerator transition diff --git a/tests/system/v1/test_reader_v1.py b/tests/system/v1/test_reader_v1.py index ff0e5b9f..a77b65f7 100644 --- a/tests/system/v1/test_reader_v1.py +++ b/tests/system/v1/test_reader_v1.py @@ -461,3 +461,31 @@ def test_resuming_read_from_offset( expected_len = 164656 # total rows in shakespeare table actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count assert actual_len == expected_len + + +def test_read_rows_to_dataframe_with_wide_table(client, project_id): + # Use a wide table to boost the chance of getting a large message size. + # https://github.com/googleapis/python-bigquery-storage/issues/78 + read_session = types.ReadSession() + read_session.table = "projects/{}/datasets/{}/tables/{}".format( + "bigquery-public-data", "geo_census_tracts", "us_census_tracts_national" + ) + read_session.data_format = types.DataFormat.ARROW + + session = client.create_read_session( + request={ + "parent": "projects/{}".format(project_id), + "read_session": read_session, + "max_stream_count": 1, + } + ) + + stream = session.streams[0].name + + read_rows_stream = client.read_rows(stream) + + # fetch the first two batches of rows + pages_iter = iter(read_rows_stream.rows(session).pages) + some_rows = next(pages_iter) + + assert all(len(row["tract_geom"].as_py()) > 0 for row in some_rows) diff --git a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py index 5ce4b3b2..a86692f3 100644 --- a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py +++ b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py @@ -1232,6 +1232,10 @@ def test_big_query_read_transport_channel_mtls_with_client_cert_source(transport ), ssl_credentials=mock_ssl_cred, quota_project_id=None, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) assert transport.grpc_channel == mock_grpc_channel @@ -1273,6 +1277,10 @@ def test_big_query_read_transport_channel_mtls_with_adc(transport_class): ), ssl_credentials=mock_ssl_cred, quota_project_id=None, + options=( + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ), ) assert transport.grpc_channel == mock_grpc_channel