diff --git a/docs/bigquery_storage_v1beta2/library.rst b/docs/bigquery_storage_v1beta2/library.rst
new file mode 100644
index 00000000..4e25d9d8
--- /dev/null
+++ b/docs/bigquery_storage_v1beta2/library.rst
@@ -0,0 +1,6 @@
+Bigquery Storage v1beta2 API Library
+====================================
+
+.. automodule:: google.cloud.bigquery_storage_v1beta2.client
+    :members:
+    :inherited-members:
diff --git a/docs/index.rst b/docs/index.rst
index 6892b30c..802cdca2 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -21,6 +21,7 @@ API Reference
     bigquery_storage_v1/library
     bigquery_storage_v1/services
     bigquery_storage_v1/types
+    bigquery_storage_v1beta2/library
     bigquery_storage_v1beta2/services
     bigquery_storage_v1beta2/types
diff --git a/google/cloud/bigquery_storage_v1beta2/__init__.py b/google/cloud/bigquery_storage_v1beta2/__init__.py
new file mode 100644
index 00000000..6d0b34e1
--- /dev/null
+++ b/google/cloud/bigquery_storage_v1beta2/__init__.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import pkg_resources
+
+__version__ = pkg_resources.get_distribution(
+    "google-cloud-bigquery-storage"
+).version  # noqa
+
+from google.cloud.bigquery_storage_v1beta2 import client
+from google.cloud.bigquery_storage_v1beta2 import types
+
+
+class BigQueryReadClient(client.BigQueryReadClient):
+    __doc__ = client.BigQueryReadClient.__doc__
+
+
+__all__ = (
+    # google.cloud.bigquery_storage_v1beta2
+    "__version__",
+    "types",
+    # google.cloud.bigquery_storage_v1beta2.client
+    "BigQueryReadClient",
+)
diff --git a/google/cloud/bigquery_storage_v1beta2/client.py b/google/cloud/bigquery_storage_v1beta2/client.py
new file mode 100644
index 00000000..f2776a20
--- /dev/null
+++ b/google/cloud/bigquery_storage_v1beta2/client.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Parent client for calling the Cloud BigQuery Storage API.
+
+This is the base from which all interactions with the API occur.
+""" + +from __future__ import absolute_import + +import google.api_core.gapic_v1.method + +from google.cloud.bigquery_storage_v1 import reader +from google.cloud.bigquery_storage_v1beta2.services import big_query_read + + +_SCOPES = ( + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/cloud-platform", +) + + +class BigQueryReadClient(big_query_read.BigQueryReadClient): + """Client for interacting with BigQuery Storage API. + + The BigQuery storage API can be used to read data stored in BigQuery. + """ + + def read_rows( + self, + name, + offset=0, + retry=google.api_core.gapic_v1.method.DEFAULT, + timeout=google.api_core.gapic_v1.method.DEFAULT, + metadata=(), + ): + """ + Reads rows from the table in the format prescribed by the read + session. Each response contains one or more table rows, up to a + maximum of 10 MiB per response; read requests which attempt to read + individual rows larger than this will fail. + + Each request also returns a set of stream statistics reflecting the + estimated total number of rows in the read stream. This number is + computed based on the total table size and the number of active + streams in the read session, and may change as other streams continue + to read data. + + Example: + >>> from google.cloud import bigquery_storage + >>> + >>> client = bigquery_storage.BigQueryReadClient() + >>> + >>> # TODO: Initialize ``table``: + >>> table = "projects/{}/datasets/{}/tables/{}".format( + ... 'project_id': 'your-data-project-id', + ... 'dataset_id': 'your_dataset_id', + ... 'table_id': 'your_table_id', + ... ) + >>> + >>> # TODO: Initialize `parent`: + >>> parent = 'projects/your-billing-project-id' + >>> + >>> requested_session = bigquery_storage.types.ReadSession( + ... table=table, + ... data_format=bigquery_storage.types.DataFormat.AVRO, + ... ) + >>> session = client.create_read_session( + ... parent=parent, read_session=requested_session + ... ) + >>> + >>> stream = session.streams[0], # TODO: Also read any other streams. + >>> read_rows_stream = client.read_rows(stream.name) + >>> + >>> for element in read_rows_stream.rows(session): + ... # process element + ... pass + + Args: + name (str): + Required. Name of the stream to start + reading from, of the form + `projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}` + offset (Optional[int]): + The starting offset from which to begin reading rows from + in the stream. The offset requested must be less than the last + row read from ReadRows. Requesting a larger offset is + undefined. + retry (Optional[google.api_core.retry.Retry]): A retry object used + to retry requests. If ``None`` is specified, requests will not + be retried. + timeout (Optional[float]): The amount of time, in seconds, to wait + for the request to complete. Note that if ``retry`` is + specified, the timeout applies to each individual attempt. + metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata + that is provided to the method. + + Returns: + ~google.cloud.bigquery_storage_v1.reader.ReadRowsStream: + An iterable of + :class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`. + + Raises: + google.api_core.exceptions.GoogleAPICallError: If the request + failed for any reason. + google.api_core.exceptions.RetryError: If the request failed due + to a retryable error and retry attempts failed. + ValueError: If the parameters are invalid. 
+        """
+        gapic_client = super(BigQueryReadClient, self)
+        stream = gapic_client.read_rows(
+            read_stream=name,
+            offset=offset,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return reader.ReadRowsStream(
+            stream,
+            gapic_client,
+            name,
+            offset,
+            {"retry": retry, "timeout": timeout, "metadata": metadata},
+        )
diff --git a/synth.py b/synth.py
index 21fb8b3a..f9abef9e 100644
--- a/synth.py
+++ b/synth.py
@@ -22,7 +22,7 @@
 gapic = gcp.GAPICBazel()
 common = gcp.CommonTemplates()
 
-versions = ["v1"]
+versions = ["v1beta2", "v1"]
 
 for version in versions:
     library = gapic.py_library(
diff --git a/tests/system/v1/conftest.py b/tests/system/conftest.py
similarity index 97%
rename from tests/system/v1/conftest.py
rename to tests/system/conftest.py
index f8ac01f5..dd42e736 100644
--- a/tests/system/v1/conftest.py
+++ b/tests/system/conftest.py
@@ -20,11 +20,9 @@
 
 import pytest
 
-from google.cloud import bigquery_storage
-
 _TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"
 
-_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../assets")
+_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")
 
 
 @pytest.fixture(scope="session")
@@ -52,11 +50,6 @@ def credentials(use_mtls):
     return service_account.Credentials.from_service_account_file(filename)
 
 
-@pytest.fixture(scope="session")
-def client(credentials):
-    return bigquery_storage.BigQueryReadClient(credentials=credentials)
-
-
 @pytest.fixture()
 def table_reference():
     return _TABLE_FORMAT.format("bigquery-public-data", "usa_names", "usa_1910_2013")
diff --git a/tests/system/reader/conftest.py b/tests/system/reader/conftest.py
new file mode 100644
index 00000000..c27bd771
--- /dev/null
+++ b/tests/system/reader/conftest.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""System tests for reading rows from tables.""" + +import pytest + +from google.cloud import bigquery_storage +from google.cloud import bigquery_storage_v1beta2 + + +@pytest.fixture(scope="session") +def client_v1(credentials): + return bigquery_storage.BigQueryReadClient(credentials=credentials) + + +@pytest.fixture(scope="session") +def client_v1beta2(credentials): + return bigquery_storage_v1beta2.BigQueryReadClient(credentials=credentials) + + +@pytest.fixture(scope="session", params=["v1", "v1beta2"]) +def client_and_types(request, client_v1, client_v1beta2): + if request.param == "v1": + return client_v1, bigquery_storage.types + return client_v1beta2, bigquery_storage_v1beta2.types diff --git a/tests/system/v1/test_reader_v1.py b/tests/system/reader/test_reader.py similarity index 78% rename from tests/system/v1/test_reader_v1.py rename to tests/system/reader/test_reader.py index a77b65f7..d0328041 100644 --- a/tests/system/v1/test_reader_v1.py +++ b/tests/system/reader/test_reader.py @@ -24,7 +24,6 @@ import pytz from google.cloud import bigquery -from google.cloud.bigquery_storage import types def _to_bq_table_ref(table_name_string, partition_suffix=""): @@ -54,12 +53,16 @@ def _to_bq_table_ref(table_name_string, partition_suffix=""): @pytest.mark.parametrize( "data_format,expected_schema_type", - ((types.DataFormat.AVRO, "avro_schema"), (types.DataFormat.ARROW, "arrow_schema")), + (("AVRO", "avro_schema"), ("ARROW", "arrow_schema")), ) def test_read_rows_as_blocks_full_table( - client, project_id, small_table_reference, data_format, expected_schema_type + client_and_types, + project_id, + small_table_reference, + data_format, + expected_schema_type, ): - + client, types = client_and_types read_session = types.ReadSession() read_session.table = small_table_reference read_session.data_format = data_format @@ -81,14 +84,11 @@ def test_read_rows_as_blocks_full_table( assert len(blocks) > 0 -@pytest.mark.parametrize( - "data_format,expected_schema_type", - ((types.DataFormat.AVRO, "avro_schema"), (types.DataFormat.ARROW, "arrow_schema")), -) +@pytest.mark.parametrize("data_format", ("AVRO", "ARROW")) def test_read_rows_as_rows_full_table( - client, project_id, small_table_reference, data_format, expected_schema_type + client_and_types, project_id, small_table_reference, data_format ): - + client, types = client_and_types read_session = types.ReadSession() read_session.table = small_table_reference read_session.data_format = data_format @@ -107,10 +107,11 @@ def test_read_rows_as_rows_full_table( assert len(rows) > 0 -@pytest.mark.parametrize( - "data_format", ((types.DataFormat.AVRO), (types.DataFormat.ARROW)) -) -def test_basic_nonfiltered_read(client, project_id, table_with_data_ref, data_format): +@pytest.mark.parametrize("data_format", ("AVRO", "ARROW")) +def test_basic_nonfiltered_read( + client_and_types, project_id, table_with_data_ref, data_format +): + client, types = client_and_types read_session = types.ReadSession() read_session.table = table_with_data_ref read_session.data_format = data_format @@ -129,7 +130,8 @@ def test_basic_nonfiltered_read(client, project_id, table_with_data_ref, data_fo assert len(rows) == 5 # all table rows -def test_filtered_rows_read(client, project_id, table_with_data_ref): +def test_filtered_rows_read(client_and_types, project_id, table_with_data_ref): + client, types = client_and_types read_session = types.ReadSession() read_session.table = table_with_data_ref read_session.data_format = types.DataFormat.AVRO @@ -149,11 +151,11 @@ def 
     assert len(rows) == 2
 
 
-@pytest.mark.parametrize(
-    "data_format", ((types.DataFormat.AVRO), (types.DataFormat.ARROW))
-)
-def test_column_selection_read(client, project_id, table_with_data_ref, data_format):
-
+@pytest.mark.parametrize("data_format", ("AVRO", "ARROW"))
+def test_column_selection_read(
+    client_and_types, project_id, table_with_data_ref, data_format
+):
+    client, types = client_and_types
     read_session = types.ReadSession()
     read_session.table = table_with_data_ref
     read_session.data_format = data_format
@@ -175,14 +177,14 @@ def test_column_selection_read(client, project_id, table_with_data_for
     assert sorted(row.keys()) == ["age", "first_name"]
 
 
-def test_snapshot(client, project_id, table_with_data_ref, bq_client):
-    before_new_data = types.Timestamp()
-    before_new_data.GetCurrentTime()
+def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client):
+    client, types = client_and_types
+    before_new_data = dt.datetime.now(tz=dt.timezone.utc)
 
     # load additional data into the table
     new_data = [
-        {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46},
-        {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30},
+        {"first_name": "NewGuyFoo", "last_name": "Smith", "age": 46},
+        {"first_name": "NewGuyBar", "last_name": "Jones", "age": 30},
     ]
 
     destination = _to_bq_table_ref(table_with_data_ref)
@@ -214,8 +216,9 @@
 
 
 def test_column_partitioned_table(
-    client, project_id, col_partition_table_ref, bq_client
+    client_and_types, project_id, col_partition_table_ref, bq_client
 ):
+    client, types = client_and_types
     data = [
         {"description": "Tracking established.", "occurred": "2017-02-15"},
         {"description": "Look, a solar eclipse!", "occurred": "2018-02-15"},
@@ -256,12 +259,11 @@ def test_column_partitioned_table(
     assert row["description"] in expected_descriptions
 
 
-@pytest.mark.parametrize(
-    "data_format", ((types.DataFormat.AVRO), (types.DataFormat.ARROW))
-)
+@pytest.mark.parametrize("data_format", ("AVRO", "ARROW"))
 def test_ingestion_time_partitioned_table(
-    client, project_id, ingest_partition_table_ref, bq_client, data_format
+    client_and_types, project_id, ingest_partition_table_ref, bq_client, data_format
 ):
+    client, types = client_and_types
     data = [{"shape": "cigar", "altitude": 1200}, {"shape": "disc", "altitude": 750}]
     destination = _to_bq_table_ref(
         ingest_partition_table_ref, partition_suffix="$20190809"
     )
@@ -306,37 +308,38 @@ def test_ingestion_time_partitioned_table(
     rows = list(client.read_rows(stream).rows(session))
     assert len(rows) == 2
 
+    data_format = getattr(types.DataFormat, data_format)
     if data_format == types.DataFormat.AVRO:
         actual_items = {(row["shape"], row["altitude"]) for row in rows}
-    else:
-        assert data_format == types.DataFormat.ARROW
+    elif data_format == types.DataFormat.ARROW:
         actual_items = {(row["shape"].as_py(), row["altitude"].as_py()) for row in rows}
+    else:
+        raise AssertionError(f"got unexpected data_format: {data_format}")
 
     expected_items = {("sphere", 3500), ("doughnut", 100)}
     assert actual_items == expected_items
 
 
-@pytest.mark.parametrize(
-    "data_format", ((types.DataFormat.AVRO), (types.DataFormat.ARROW))
-)
+@pytest.mark.parametrize("data_format", ("AVRO", "ARROW"))
 def test_decoding_data_types(
-    client, project_id, all_types_table_ref, bq_client, data_format
+    client_and_types, project_id, all_types_table_ref, bq_client, data_format
 ):
+    client, types = client_and_types
     data = [
         {
-            u"string_field": u"Price: € 9.95.",
-            u"bytes_field": bigquery._helpers._bytes_to_json(b"byteees"),
-            u"int64_field": -1085,
-            u"float64_field": -42.195,
-            u"numeric_field": "1.4142",
-            u"bool_field": True,
-            u"geography_field": '{"type": "Point", "coordinates": [-49.3028, 69.0622]}',
-            u"person_struct_field": {u"name": u"John", u"age": 42},
-            u"timestamp_field": 1565357902.017896,  # 2019-08-09T13:38:22.017896
-            u"date_field": u"1995-03-17",
-            u"time_field": u"16:24:51",
-            u"datetime_field": u"2005-10-26T19:49:41",
-            u"string_array_field": [u"foo", u"bar", u"baz"],
+            "string_field": "Price: € 9.95.",
+            "bytes_field": bigquery._helpers._bytes_to_json(b"byteees"),
+            "int64_field": -1085,
+            "float64_field": -42.195,
+            "numeric_field": "1.4142",
+            "bool_field": True,
+            "geography_field": '{"type": "Point", "coordinates": [-49.3028, 69.0622]}',
+            "person_struct_field": {"name": "John", "age": 42},
+            "timestamp_field": 1565357902.017896,  # 2019-08-09T13:38:22.017896
+            "date_field": "1995-03-17",
+            "time_field": "16:24:51",
+            "datetime_field": "2005-10-26T19:49:41",
+            "string_array_field": ["foo", "bar", "baz"],
         }
     ]
 
@@ -386,28 +389,30 @@ def test_decoding_data_types(
 
     stream = session.streams[0].name
 
+    data_format = getattr(types.DataFormat, data_format)
     if data_format == types.DataFormat.AVRO:
         rows = list(client.read_rows(stream).rows(session))
-    else:
-        assert data_format == types.DataFormat.ARROW
+    elif data_format == types.DataFormat.ARROW:
        rows = list(
            dict((key, value.as_py()) for key, value in row_dict.items())
            for row_dict in client.read_rows(stream).rows(session)
        )
+    else:
+        raise AssertionError(f"got unexpected data_format: {data_format}")
 
     expected_result = {
-        u"string_field": u"Price: € 9.95.",
-        u"bytes_field": b"byteees",
-        u"int64_field": -1085,
-        u"float64_field": -42.195,
-        u"numeric_field": decimal.Decimal("1.4142"),
-        u"bool_field": True,
-        u"geography_field": "POINT(-49.3028 69.0622)",
-        u"person_struct_field": {u"name": u"John", u"age": 42},
-        u"timestamp_field": dt.datetime(2019, 8, 9, 13, 38, 22, 17896, tzinfo=pytz.UTC),
-        u"date_field": dt.date(1995, 3, 17),
-        u"time_field": dt.time(16, 24, 51),
-        u"string_array_field": [u"foo", u"bar", u"baz"],
+        "string_field": "Price: € 9.95.",
+        "bytes_field": b"byteees",
+        "int64_field": -1085,
+        "float64_field": -42.195,
+        "numeric_field": decimal.Decimal("1.4142"),
+        "bool_field": True,
+        "geography_field": "POINT(-49.3028 69.0622)",
+        "person_struct_field": {"name": "John", "age": 42},
+        "timestamp_field": dt.datetime(2019, 8, 9, 13, 38, 22, 17896, tzinfo=pytz.UTC),
+        "date_field": dt.date(1995, 3, 17),
+        "time_field": dt.time(16, 24, 51),
+        "string_array_field": ["foo", "bar", "baz"],
     }
 
     result_copy = copy.copy(rows[0])
@@ -421,13 +426,11 @@ def test_decoding_data_types(
     assert expected_pattern.match(str(rows[0]["datetime_field"]))
 
 
-@pytest.mark.parametrize(
-    "data_format", ((types.DataFormat.AVRO), (types.DataFormat.ARROW))
-)
+@pytest.mark.parametrize("data_format", ("AVRO", "ARROW"))
 def test_resuming_read_from_offset(
-    client, project_id, data_format, local_shakespeare_table_reference
+    client_and_types, project_id, data_format, local_shakespeare_table_reference
 ):
-
+    client, types = client_and_types
     read_session = types.ReadSession()
     read_session.table = local_shakespeare_table_reference
     read_session.data_format = data_format
@@ -463,9 +466,10 @@ def test_resuming_read_from_offset(
     assert actual_len == expected_len
 
 
-def test_read_rows_to_dataframe_with_wide_table(client, project_id):
+def test_read_rows_to_dataframe_with_wide_table(client_and_types, project_id):
     # Use a wide table to boost the chance of getting a large message size.
     # https://github.com/googleapis/python-bigquery-storage/issues/78
+    client, types = client_and_types
     read_session = types.ReadSession()
     read_session.table = "projects/{}/datasets/{}/tables/{}".format(
         "bigquery-public-data", "geo_census_tracts", "us_census_tracts_national"
diff --git a/tests/system/v1/test_reader_dataframe_v1.py b/tests/system/reader/test_reader_dataframe.py
similarity index 91%
rename from tests/system/v1/test_reader_dataframe_v1.py
rename to tests/system/reader/test_reader_dataframe.py
index ce1a46ee..7defe888 100644
--- a/tests/system/v1/test_reader_dataframe_v1.py
+++ b/tests/system/reader/test_reader_dataframe.py
@@ -19,10 +19,9 @@
 import pyarrow.types
 import pytest
 
-from google.cloud.bigquery_storage import types
-
-def test_read_v1(client, project_id):
+def test_read_rows_to_arrow(client_and_types, project_id):
+    client, types = client_and_types
     read_session = types.ReadSession()
     read_session.table = "projects/{}/datasets/{}/tables/{}".format(
         "bigquery-public-data", "new_york_citibike", "citibike_stations"
     )
@@ -60,9 +59,12 @@
 
 @pytest.mark.parametrize(
     "data_format,expected_schema_type",
-    ((types.DataFormat.AVRO, "avro_schema"), (types.DataFormat.ARROW, "arrow_schema")),
+    (("AVRO", "avro_schema"), ("ARROW", "arrow_schema")),
 )
-def test_read_rows_to_dataframe(client, project_id, data_format, expected_schema_type):
+def test_read_rows_to_dataframe(
+    client_and_types, project_id, data_format, expected_schema_type
+):
+    client, types = client_and_types
     read_session = types.ReadSession()
     read_session.table = "projects/{}/datasets/{}/tables/{}".format(
         "bigquery-public-data", "new_york_citibike", "citibike_stations"
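A minimal end-to-end sketch of the new ``bigquery_storage_v1beta2.BigQueryReadClient`` surface added above. The project, dataset, and table names below are placeholders, not values from this change, and reading Arrow data into a dataframe assumes pandas and pyarrow are installed in the environment:

from google.cloud import bigquery_storage_v1beta2

client = bigquery_storage_v1beta2.BigQueryReadClient()

# Placeholder resource names -- substitute your own data and billing projects.
table = "projects/your-data-project-id/datasets/your_dataset_id/tables/your_table_id"
parent = "projects/your-billing-project-id"

requested_session = bigquery_storage_v1beta2.types.ReadSession(
    table=table,
    data_format=bigquery_storage_v1beta2.types.DataFormat.ARROW,
)
session = client.create_read_session(parent=parent, read_session=requested_session)

# The v1beta2 client reuses the v1 ReadRowsStream helper, so rows() and
# to_dataframe() behave the same way as with the v1 client. An empty table
# can yield a session with no streams, so check before indexing.
if session.streams:
    read_rows_stream = client.read_rows(session.streams[0].name)
    dataframe = read_rows_stream.rows(session).to_dataframe()
    print(dataframe.head())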