feat: add manual wrapper for v1beta2 read client #117

Merged
merged 7 commits on Jan 22, 2021
6 changes: 6 additions & 0 deletions docs/bigquery_storage_v1beta2/library.rst
@@ -0,0 +1,6 @@
Bigquery Storage v1beta2 API Library
====================================

.. automodule:: google.cloud.bigquery_storage_v1beta2.client
    :members:
    :inherited-members:
1 change: 1 addition & 0 deletions docs/index.rst
@@ -21,6 +21,7 @@ API Reference
bigquery_storage_v1/library
bigquery_storage_v1/services
bigquery_storage_v1/types
bigquery_storage_v1beta2/library
bigquery_storage_v1beta2/services
bigquery_storage_v1beta2/types

39 changes: 39 additions & 0 deletions google/cloud/bigquery_storage_v1beta2/__init__.py
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import pkg_resources

__version__ = pkg_resources.get_distribution(
    "google-cloud-bigquery-storage"
).version  # noqa

from google.cloud.bigquery_storage_v1beta2 import client
from google.cloud.bigquery_storage_v1beta2 import types


class BigQueryReadClient(client.BigQueryReadClient):
    __doc__ = client.BigQueryReadClient.__doc__


__all__ = (
    # google.cloud.bigquery_storage_v1beta2
    "__version__",
    "types",
    # google.cloud.bigquery_storage_v1beta2.client
    "BigQueryReadClient",
)
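
For quick reference, a minimal usage sketch of the wrapper exported above (not part of the diff). It assumes the package is installed and that credentials are resolved from the environment:

from google.cloud import bigquery_storage_v1beta2

# The package-level BigQueryReadClient is the manual wrapper defined in
# client.py (below in this diff), not the raw generated GAPIC class, so it
# exposes the read_rows helper that returns a resumable ReadRowsStream.
client = bigquery_storage_v1beta2.BigQueryReadClient()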
137 changes: 137 additions & 0 deletions google/cloud/bigquery_storage_v1beta2/client.py
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Parent client for calling the Cloud BigQuery Storage API.

This is the base from which all interactions with the API occur.
"""

from __future__ import absolute_import

import google.api_core.gapic_v1.method

from google.cloud.bigquery_storage_v1 import reader
from google.cloud.bigquery_storage_v1beta2.services import big_query_read


_SCOPES = (
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/cloud-platform",
)


class BigQueryReadClient(big_query_read.BigQueryReadClient):
"""Client for interacting with BigQuery Storage API.

The BigQuery storage API can be used to read data stored in BigQuery.
"""

    def read_rows(
        self,
        name,
        offset=0,
        retry=google.api_core.gapic_v1.method.DEFAULT,
        timeout=google.api_core.gapic_v1.method.DEFAULT,
        metadata=(),
    ):
"""
Reads rows from the table in the format prescribed by the read
session. Each response contains one or more table rows, up to a
maximum of 10 MiB per response; read requests which attempt to read
individual rows larger than this will fail.

Each request also returns a set of stream statistics reflecting the
estimated total number of rows in the read stream. This number is
computed based on the total table size and the number of active
streams in the read session, and may change as other streams continue
to read data.

Example:
>>> from google.cloud import bigquery_storage
>>>
>>> client = bigquery_storage.BigQueryReadClient()
>>>
>>> # TODO: Initialize ``table``:
>>> table = "projects/{}/datasets/{}/tables/{}".format(
... 'project_id': 'your-data-project-id',
... 'dataset_id': 'your_dataset_id',
... 'table_id': 'your_table_id',
... )
>>>
>>> # TODO: Initialize `parent`:
>>> parent = 'projects/your-billing-project-id'
>>>
>>> requested_session = bigquery_storage.types.ReadSession(
... table=table,
... data_format=bigquery_storage.types.DataFormat.AVRO,
... )
>>> session = client.create_read_session(
... parent=parent, read_session=requested_session
... )
>>>
>>> stream = session.streams[0], # TODO: Also read any other streams.
>>> read_rows_stream = client.read_rows(stream.name)
>>>
>>> for element in read_rows_stream.rows(session):
... # process element
... pass

Args:
name (str):
Required. Name of the stream to start
reading from, of the form
`projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}`
offset (Optional[int]):
The starting offset from which to begin reading rows from
in the stream. The offset requested must be less than the last
row read from ReadRows. Requesting a larger offset is
undefined.
retry (Optional[google.api_core.retry.Retry]): A retry object used
to retry requests. If ``None`` is specified, requests will not
be retried.
timeout (Optional[float]): The amount of time, in seconds, to wait
for the request to complete. Note that if ``retry`` is
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.

Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
An iterable of
:class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.

Raises:
google.api_core.exceptions.GoogleAPICallError: If the request
failed for any reason.
google.api_core.exceptions.RetryError: If the request failed due
to a retryable error and retry attempts failed.
ValueError: If the parameters are invalid.
"""
        gapic_client = super(BigQueryReadClient, self)
        stream = gapic_client.read_rows(
            read_stream=name,
            offset=offset,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        return reader.ReadRowsStream(
            stream,
            gapic_client,
            name,
            offset,
            {"retry": retry, "timeout": timeout, "metadata": metadata},
        )
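
As a rough consumption sketch (not part of this diff): it assumes ``client``, ``session`` and ``stream`` were created as in the docstring example above, and that pandas and fastavro are installed for the DataFrame step.

# Hedged sketch only; the names come from the docstring example above.
read_rows_stream = client.read_rows(stream.name)

# Row-by-row iteration; the wrapper keeps the GAPIC client, stream name,
# offset and retry settings so the read can resume after transient errors.
for element in read_rows_stream.rows(session):
    pass  # process element

# Or materialize a fresh read of the same stream into a pandas DataFrame.
df = client.read_rows(stream.name).to_dataframe(session)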
2 changes: 1 addition & 1 deletion synth.py
@@ -22,7 +22,7 @@

gapic = gcp.GAPICBazel()
common = gcp.CommonTemplates()
versions = ["v1"]
versions = ["v1beta2", "v1"]

for version in versions:
library = gapic.py_library(
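
The one-line change above makes synthtool regenerate the v1beta2 GAPIC surface alongside v1. For orientation, a hedged sketch of the typical shape of the truncated loop follows; the ``service`` name and excludes are placeholders rather than this repo's actual values, and ``s`` is assumed to come from an ``import synthtool as s`` near the truncated top of the file.

# Hypothetical sketch only; the real arguments are not shown in this diff.
for version in ["v1beta2", "v1"]:
    library = gapic.py_library(
        service="bigquerystorage",  # placeholder service name
        version=version,
    )
    # Copy the generated code into the repo, keeping handwritten files such
    # as the manual client wrapper added in this PR.
    s.move(library, excludes=["docs/index.rst", "setup.py"])  # placeholder excludes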
9 changes: 1 addition & 8 deletions tests/system/v1/conftest.py → tests/system/conftest.py
@@ -20,11 +20,9 @@

import pytest

from google.cloud import bigquery_storage

_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"

_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../assets")
_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")


@pytest.fixture(scope="session")
@@ -52,11 +50,6 @@ def credentials(use_mtls):
    return service_account.Credentials.from_service_account_file(filename)


@pytest.fixture(scope="session")
def client(credentials):
    return bigquery_storage.BigQueryReadClient(credentials=credentials)


@pytest.fixture()
def table_reference():
    return _TABLE_FORMAT.format("bigquery-public-data", "usa_names", "usa_1910_2013")
38 changes: 38 additions & 0 deletions tests/system/reader/conftest.py
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""System tests for reading rows from tables."""

import pytest

from google.cloud import bigquery_storage
from google.cloud import bigquery_storage_v1beta2


@pytest.fixture(scope="session")
def client_v1(credentials):
    return bigquery_storage.BigQueryReadClient(credentials=credentials)


@pytest.fixture(scope="session")
def client_v1beta2(credentials):
    return bigquery_storage_v1beta2.BigQueryReadClient(credentials=credentials)


@pytest.fixture(scope="session", params=["v1", "v1beta2"])
def client_and_types(request, client_v1, client_v1beta2):
    if request.param == "v1":
        return client_v1, bigquery_storage.types
    return client_v1beta2, bigquery_storage_v1beta2.types
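
To show how the parametrized fixture is meant to be consumed, here is a hedged test sketch (not part of the diff). The ``project_id`` fixture, test name, and assertion are assumptions for illustration; ``table_reference`` is the public usa_names table defined in the shared conftest above.

# Hypothetical test sketch: runs once against v1 and once against v1beta2
# via the parametrized ``client_and_types`` fixture above.
def test_read_rows_to_first_row(client_and_types, project_id, table_reference):
    client, types = client_and_types

    requested_session = types.ReadSession(
        table=table_reference, data_format=types.DataFormat.AVRO
    )
    session = client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=1,
    )

    # Pull just the first row rather than draining the whole public table.
    rows = client.read_rows(session.streams[0].name).rows(session)
    first_row = next(iter(rows))
    assert "name" in first_row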