Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't fail with 429 when downloading wide tables #79

Merged
merged 5 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -145,6 +145,10 @@ def __init__(
ssl_credentials=ssl_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
else:
host = host if ":" in host else host + ":443"
Expand All @@ -162,6 +166,10 @@ def __init__(
ssl_credentials=ssl_channel_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)

self._stubs = {} # type: Dict[str, Callable]
Expand Down
Expand Up @@ -190,6 +190,10 @@ def __init__(
ssl_credentials=ssl_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
else:
host = host if ":" in host else host + ":443"
Expand All @@ -207,6 +211,10 @@ def __init__(
ssl_credentials=ssl_channel_credentials,
scopes=scopes or self.AUTH_SCOPES,
quota_project_id=quota_project_id,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)

# Run the base constructor.
Expand Down
56 changes: 45 additions & 11 deletions synth.py
Expand Up @@ -77,7 +77,9 @@
unit_test_dependencies=optional_deps,
cov_level=95,
)
s.move(templated_files, excludes=[".coveragerc"]) # microgenerator has a good .coveragerc file
s.move(
templated_files, excludes=[".coveragerc"]
) # microgenerator has a good .coveragerc file


# ----------------------------------------------------------------------------
Expand All @@ -94,13 +96,51 @@
'\g<0>\n\n session.install("google-cloud-bigquery")',
)

# Remove client-side validation of message length.
# https://github.com/googleapis/python-bigquery-storage/issues/78
s.replace(
[
"google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py",
"google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py",
],
(
r"type\(self\).create_channel\(\s*"
r"host,\s*"
r"credentials=credentials,\s*"
r"credentials_file=credentials_file,\s*"
r"ssl_credentials=ssl_[a-z_]*credentials,\s*"
r"scopes=scopes or self.AUTH_SCOPES,\s*"
r"quota_project_id=quota_project_id"
),
"""\g<0>,
options=(
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
)""",
)
s.replace(
"tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py",
(
r"grpc_create_channel\.assert_called_once_with\([^()]+"
r"scopes=\([^()]+\),\s*"
r"ssl_credentials=[a-z_]+,\s*"
r"quota_project_id=None"
),
"""\g<0>,
options=(
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
)""",
)


# We don't want the generated client to be accessible through
# "google.cloud.bigquery_storage", replace it with the hand written client that
# wraps it.
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"from google\.cloud\.bigquery_storage_v1\.services.big_query_read.client import",
"from google.cloud.bigquery_storage_v1 import"
"from google.cloud.bigquery_storage_v1 import",
)

# We also don't want to expose the async client just yet, at least not until
Expand All @@ -115,7 +155,7 @@
)
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"""["']BigQueryReadAsyncClient["'],\n""",
r"""["']BigQueryReadAsyncClient["'],\n""",
"",
)

Expand All @@ -133,11 +173,7 @@
s.replace(
"google/cloud/bigquery_storage/__init__.py",
r"""["']ArrowRecordBatch["']""",
(
'"__version__",\n'
' "types",\n'
" \g<0>"
),
('"__version__",\n' ' "types",\n' " \g<0>"),
)

# We want to expose all types through "google.cloud.bigquery_storage.types",
Expand Down Expand Up @@ -190,9 +226,7 @@
),
)
s.replace(
"noxfile.py",
r'--cov=tests\.unit',
'--cov=tests/unit',
"noxfile.py", r"--cov=tests\.unit", "--cov=tests/unit",
)

# TODO(busunkim): Use latest sphinx after microgenerator transition
Expand Down
28 changes: 28 additions & 0 deletions tests/system/v1/test_reader_v1.py
Expand Up @@ -461,3 +461,31 @@ def test_resuming_read_from_offset(
expected_len = 164656 # total rows in shakespeare table
actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count
assert actual_len == expected_len


def test_read_rows_to_dataframe_with_wide_table(client, project_id):
    """Read the first page of a wide public table in Arrow format.

    A wide table is used to boost the chance of getting a large message
    size. See https://github.com/googleapis/python-bigquery-storage/issues/78

    Args:
        client: Read client fixture used to create the session and read rows.
        project_id: Project used as the parent of the read session.
    """
    table_path = "projects/{}/datasets/{}/tables/{}".format(
        "bigquery-public-data", "geo_census_tracts", "us_census_tracts_national"
    )

    requested_session = types.ReadSession()
    requested_session.table = table_path
    requested_session.data_format = types.DataFormat.ARROW

    # Ask for a single stream so that the whole table is served through it.
    create_request = {
        "parent": "projects/{}".format(project_id),
        "read_session": requested_session,
        "max_stream_count": 1,
    }
    session = client.create_read_session(request=create_request)

    first_stream_name = session.streams[0].name
    reader = client.read_rows(first_stream_name)

    # Only the first page of rows is pulled from the stream.
    first_page = next(iter(reader.rows(session).pages))

    # Every row in that page must carry a non-empty "tract_geom" value.
    assert all(len(row["tract_geom"].as_py()) > 0 for row in first_page)
8 changes: 8 additions & 0 deletions tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py
Expand Up @@ -1232,6 +1232,10 @@ def test_big_query_read_transport_channel_mtls_with_client_cert_source(transport
),
ssl_credentials=mock_ssl_cred,
quota_project_id=None,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
assert transport.grpc_channel == mock_grpc_channel

Expand Down Expand Up @@ -1273,6 +1277,10 @@ def test_big_query_read_transport_channel_mtls_with_adc(transport_class):
),
ssl_credentials=mock_ssl_cred,
quota_project_id=None,
options=(
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
),
)
assert transport.grpc_channel == mock_grpc_channel

Expand Down