Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add public transport property and path formatting methods to client #80

Merged
merged 5 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,7 +18,7 @@
from collections import OrderedDict
import functools
import re
from typing import Dict, AsyncIterable, Sequence, Tuple, Type, Union
from typing import Dict, AsyncIterable, Awaitable, Sequence, Tuple, Type, Union
import pkg_resources

import google.api_core.client_options as ClientOptions # type: ignore
Expand Down Expand Up @@ -53,10 +53,46 @@ class BigQueryReadAsyncClient:
parse_read_session_path = staticmethod(BigQueryReadClient.parse_read_session_path)
read_stream_path = staticmethod(BigQueryReadClient.read_stream_path)
parse_read_stream_path = staticmethod(BigQueryReadClient.parse_read_stream_path)
table_path = staticmethod(BigQueryReadClient.table_path)
parse_table_path = staticmethod(BigQueryReadClient.parse_table_path)

common_billing_account_path = staticmethod(
BigQueryReadClient.common_billing_account_path
)
parse_common_billing_account_path = staticmethod(
BigQueryReadClient.parse_common_billing_account_path
)

common_folder_path = staticmethod(BigQueryReadClient.common_folder_path)
parse_common_folder_path = staticmethod(BigQueryReadClient.parse_common_folder_path)

common_organization_path = staticmethod(BigQueryReadClient.common_organization_path)
parse_common_organization_path = staticmethod(
BigQueryReadClient.parse_common_organization_path
)

common_project_path = staticmethod(BigQueryReadClient.common_project_path)
parse_common_project_path = staticmethod(
BigQueryReadClient.parse_common_project_path
)

common_location_path = staticmethod(BigQueryReadClient.common_location_path)
parse_common_location_path = staticmethod(
BigQueryReadClient.parse_common_location_path
)

from_service_account_file = BigQueryReadClient.from_service_account_file
from_service_account_json = from_service_account_file

@property
def transport(self) -> BigQueryReadTransport:
"""Return the transport used by the client instance.

Returns:
BigQueryReadTransport: The transport used by the client instance.
"""
return self._client.transport

get_transport_class = functools.partial(
type(BigQueryReadClient).get_transport_class, type(BigQueryReadClient)
)
Expand Down Expand Up @@ -191,7 +227,8 @@ async def create_read_session(
# Create or coerce a protobuf request object.
# Sanity check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
if request is not None and any([parent, read_session, max_stream_count]):
has_flattened_params = any([parent, read_session, max_stream_count])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
Expand All @@ -218,7 +255,7 @@ async def create_read_session(
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
exceptions.DeadlineExceeded, exceptions.ServiceUnavailable,
),
),
default_timeout=600.0,
Expand Down Expand Up @@ -248,7 +285,7 @@ def read_rows(
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> AsyncIterable[storage.ReadRowsResponse]:
) -> Awaitable[AsyncIterable[storage.ReadRowsResponse]]:
r"""Reads rows from the stream in the format prescribed
by the ReadSession. Each response contains one or more
table rows, up to a maximum of 100 MiB per response;
Expand Down Expand Up @@ -291,7 +328,8 @@ def read_rows(
# Create or coerce a protobuf request object.
# Sanity check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
if request is not None and any([read_stream, offset]):
has_flattened_params = any([read_stream, offset])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
Expand Down Expand Up @@ -385,7 +423,7 @@ async def split_read_stream(
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
exceptions.DeadlineExceeded, exceptions.ServiceUnavailable,
),
),
default_timeout=600.0,
Expand Down
92 changes: 88 additions & 4 deletions google/cloud/bigquery_storage_v1/services/big_query_read/client.py
Expand Up @@ -133,6 +133,15 @@ def from_service_account_file(cls, filename: str, *args, **kwargs):

from_service_account_json = from_service_account_file

@property
def transport(self) -> BigQueryReadTransport:
"""Return the transport used by the client instance.

Returns:
BigQueryReadTransport: The transport used by the client instance.
"""
return self._transport

@staticmethod
def read_session_path(project: str, location: str, session: str,) -> str:
"""Return a fully-qualified read_session string."""
Expand Down Expand Up @@ -167,6 +176,81 @@ def parse_read_stream_path(path: str) -> Dict[str, str]:
)
return m.groupdict() if m else {}

@staticmethod
def table_path(project: str, dataset: str, table: str,) -> str:
"""Return a fully-qualified table string."""
return "projects/{project}/datasets/{dataset}/tables/{table}".format(
project=project, dataset=dataset, table=table,
)

@staticmethod
def parse_table_path(path: str) -> Dict[str, str]:
"""Parse a table path into its component segments."""
m = re.match(
r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
path,
)
return m.groupdict() if m else {}

@staticmethod
def common_billing_account_path(billing_account: str,) -> str:
"""Return a fully-qualified billing_account string."""
return "billingAccounts/{billing_account}".format(
billing_account=billing_account,
)

@staticmethod
def parse_common_billing_account_path(path: str) -> Dict[str, str]:
"""Parse a billing_account path into its component segments."""
m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_folder_path(folder: str,) -> str:
"""Return a fully-qualified folder string."""
return "folders/{folder}".format(folder=folder,)

@staticmethod
def parse_common_folder_path(path: str) -> Dict[str, str]:
"""Parse a folder path into its component segments."""
m = re.match(r"^folders/(?P<folder>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_organization_path(organization: str,) -> str:
"""Return a fully-qualified organization string."""
return "organizations/{organization}".format(organization=organization,)

@staticmethod
def parse_common_organization_path(path: str) -> Dict[str, str]:
"""Parse a organization path into its component segments."""
m = re.match(r"^organizations/(?P<organization>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_project_path(project: str,) -> str:
"""Return a fully-qualified project string."""
return "projects/{project}".format(project=project,)

@staticmethod
def parse_common_project_path(path: str) -> Dict[str, str]:
"""Parse a project path into its component segments."""
m = re.match(r"^projects/(?P<project>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_location_path(project: str, location: str,) -> str:
"""Return a fully-qualified location string."""
return "projects/{project}/locations/{location}".format(
project=project, location=location,
)

@staticmethod
def parse_common_location_path(path: str) -> Dict[str, str]:
"""Parse a location path into its component segments."""
m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
return m.groupdict() if m else {}

def __init__(
self,
*,
Expand Down Expand Up @@ -202,10 +286,10 @@ def __init__(
not provided, the default SSL client certificate will be used if
present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
set, no client certificate will be used.
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
The client info used to send a user-agent string along with
API requests. If ``None``, then default info will be used.
Generally, you only need to set this if you're developing
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
The client info used to send a user-agent string along with
API requests. If ``None``, then default info will be used.
Generally, you only need to set this if you're developing
your own client library.

Raises:
Expand Down
Expand Up @@ -118,7 +118,7 @@ def _prep_wrapped_messages(self, client_info):
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
exceptions.DeadlineExceeded, exceptions.ServiceUnavailable,
),
),
default_timeout=600.0,
Expand All @@ -142,7 +142,7 @@ def _prep_wrapped_messages(self, client_info):
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
exceptions.DeadlineExceeded, exceptions.ServiceUnavailable,
),
),
default_timeout=600.0,
Expand Down
Expand Up @@ -91,10 +91,10 @@ def __init__(
for grpc channel. It is ignored if ``channel`` is provided.
quota_project_id (Optional[str]): An optional project to use for billing
and quota.
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
The client info used to send a user-agent string along with
API requests. If ``None``, then default info will be used.
Generally, you only need to set this if you're developing
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
The client info used to send a user-agent string along with
API requests. If ``None``, then default info will be used.
Generally, you only need to set this if you're developing
your own client library.

Raises:
Expand Down Expand Up @@ -231,12 +231,8 @@ def create_channel(

@property
def grpc_channel(self) -> grpc.Channel:
"""Create the channel designed to connect to this service.

This property caches on the instance; repeated calls return
the same channel.
"""Return the channel designed to connect to this service.
"""
# Return the channel from cache.
return self._grpc_channel

@property
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigquery_storage_v1/types/storage.py
Expand Up @@ -166,9 +166,9 @@ class ReadRowsResponse(proto.Message):

row_count = proto.Field(proto.INT64, number=6)

stats = proto.Field(proto.MESSAGE, number=2, message=StreamStats,)
stats = proto.Field(proto.MESSAGE, number=2, message="StreamStats",)

throttle_state = proto.Field(proto.MESSAGE, number=5, message=ThrottleState,)
throttle_state = proto.Field(proto.MESSAGE, number=5, message="ThrottleState",)


class SplitReadStreamRequest(proto.Message):
Expand Down
6 changes: 3 additions & 3 deletions synth.metadata
Expand Up @@ -4,15 +4,15 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/python-bigquery-storage.git",
"sha": "a7fe7626312a5b9fe1e7bd0e0fe5601ae97605c7"
"sha": "e290752ee4e771ebda01c2756b7631b40c4e1c5a"
}
},
{
"git": {
"name": "googleapis",
"remote": "https://github.com/googleapis/googleapis.git",
"sha": "062f46f246c78fde2160524db593fa0fa7bdbe64",
"internalRef": "337404700"
"sha": "c7331b75b0b7bbd614373b7d37085db1c80dd4be",
"internalRef": "338157137"
}
},
{
Expand Down