Skip to content

Commit

Permalink
fix: don't fail with 429 when downloading wide tables (#79)
Browse files Browse the repository at this point in the history
* fix: don't fail with 429 when downloading wide tables

* make ssl_credentials match more generic

* make synth.py more robust

* update synth to update tests

* fix updates to synth.py
  • Loading branch information
tswast committed Oct 21, 2020
1 parent e290752 commit 45faf97
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 11 deletions.
Expand Up @@ -145,6 +145,10 @@ def __init__(
ssl_credentials=ssl_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
else:
host = host if ":" in host else host + ":443"
Expand All @@ -162,6 +166,10 @@ def __init__(
ssl_credentials=ssl_channel_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)

self._stubs = {} # type: Dict[str, Callable]
Expand Down
Expand Up @@ -190,6 +190,10 @@ def __init__(
ssl_credentials=ssl_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
else:
host = host if ":" in host else host + ":443"
Expand All @@ -207,6 +211,10 @@ def __init__(
ssl_credentials=ssl_channel_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)

# Run the base constructor.
Expand Down
56 changes: 45 additions & 11 deletions synth.py
Expand Up @@ -77,7 +77,9 @@
unit_test_dependencies=optional_deps,
cov_level=95,
)
s.move(templated_files, excludes=[".coveragerc"]) # microgenerator has a good .coveragerc file
s.move(
templated_files, excludes=[".coveragerc"]
) # microgenerator has a good .coveragerc file


# ----------------------------------------------------------------------------
Expand All @@ -94,13 +96,51 @@
'\g<0>\n\n session.install("google-cloud-bigquery")',
)

# Remove client-side validation of message length.
# https://github.com/googleapis/python-bigquery-storage/issues/78
s.replace(
[
"google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py",
"google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py",
],
(
r"type\(self\).create_channel\(\s*"
r"host,\s*"
r"credentials=credentials,\s*"
r"credentials_file=credentials_file,\s*"
r"ssl_credentials=ssl_[a-z_]*credentials,\s*"
r"scopes=scopes or self.AUTH_SCOPES,\s*"
r"quota_project_id=quota_project_id"
),
"""\g<0>,
options=(
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
)""",
)
s.replace(
"tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py",
(
r"grpc_create_channel\.assert_called_once_with\([^()]+"
r"scopes=\([^()]+\),\s*"
r"ssl_credentials=[a-z_]+,\s*"
r"quota_project_id=None"
),
"""\g<0>,
options=(
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
)""",
)


# We don't want the generated client to be accessible through
# "google.cloud.bigquery_storage", replace it with the hand written client that
# wraps it.
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"from google\.cloud\.bigquery_storage_v1\.services.big_query_read.client import",
"from google.cloud.bigquery_storage_v1 import"
"from google.cloud.bigquery_storage_v1 import",
)

# We also don't want to expose the async client just yet, at least not until
Expand All @@ -115,7 +155,7 @@
)
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"""["']BigQueryReadAsyncClient["'],\n""",
r"""["']BigQueryReadAsyncClient["'],\n""",
"",
)

Expand All @@ -133,11 +173,7 @@
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"""["']ArrowRecordBatch["']""",
(
'"__version__",\n'
' "types",\n'
" \g<0>"
),
('"__version__",\n' ' "types",\n' " \g<0>"),
)

# We want to expose all types through "google.cloud.bigquery_storage.types",
Expand Down Expand Up @@ -190,9 +226,7 @@
),
)
s.replace(
"noxfile.py",
r'--cov=tests\.unit',
'--cov=tests/unit',
"noxfile.py", r"--cov=tests\.unit", "--cov=tests/unit",
)

# TODO(busunkim): Use latest sphinx after microgenerator transition
Expand Down
28 changes: 28 additions & 0 deletions tests/system/v1/test_reader_v1.py
Expand Up @@ -461,3 +461,31 @@ def test_resuming_read_from_offset(
expected_len = 164656 # total rows in shakespeare table
actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count
assert actual_len == expected_len


def test_read_rows_to_dataframe_with_wide_table(client, project_id):
# Use a wide table to boost the chance of getting a large message size.
# https://github.com/googleapis/python-bigquery-storage/issues/78
read_session = types.ReadSession()
read_session.table = "projects/{}/datasets/{}/tables/{}".format(
"bigquery-public-data", "geo_census_tracts", "us_census_tracts_national"
)
read_session.data_format = types.DataFormat.ARROW

session = client.create_read_session(
request={
"parent": "projects/{}".format(project_id),
"read_session": read_session,
"max_stream_count": 1,
}
)

stream = session.streams[0].name

read_rows_stream = client.read_rows(stream)

# fetch the first two batches of rows
pages_iter = iter(read_rows_stream.rows(session).pages)
some_rows = next(pages_iter)

assert all(len(row["tract_geom"].as_py()) > 0 for row in some_rows)
8 changes: 8 additions & 0 deletions tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py
Expand Up @@ -1232,6 +1232,10 @@ def test_big_query_read_transport_channel_mtls_with_client_cert_source(transport
),
ssl_credentials=mock_ssl_cred,
quota_project_id=None,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
assert transport.grpc_channel == mock_grpc_channel

Expand Down Expand Up @@ -1273,6 +1277,10 @@ def test_big_query_read_transport_channel_mtls_with_adc(transport_class):
),
ssl_credentials=mock_ssl_cred,
quota_project_id=None,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
assert transport.grpc_channel == mock_grpc_channel

Expand Down

0 comments on commit 45faf97

Please sign in to comment.