Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: address issue in establishing an emulator connection #246

Merged
merged 13 commits into from Mar 19, 2021
91 changes: 64 additions & 27 deletions google/cloud/bigtable/client.py
Expand Up @@ -32,6 +32,7 @@
import grpc

from google.api_core.gapic_v1 import client_info
import google.auth

from google.cloud import bigtable_v2
from google.cloud import bigtable_admin_v2
Expand Down Expand Up @@ -66,20 +67,14 @@
READ_ONLY_SCOPE = "https://www.googleapis.com/auth/bigtable.data.readonly"
"""Scope for reading table data."""


def _create_gapic_client(client_class, client_options=None, transport=None):
def inner(self):
if self._emulator_host is None:
return client_class(
return client_class(
credentials=None,
client_info=self._client_info,
client_options=client_options,
transport=transport,
)
else:
return client_class(
channel=self._emulator_channel, client_info=self._client_info
)

return inner

Expand Down Expand Up @@ -166,16 +161,6 @@ def __init__(
self._admin = bool(admin)
self._client_info = client_info
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
self._emulator_channel = None

if self._emulator_host is not None:
self._emulator_channel = grpc.insecure_channel(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change moves most of this to a method. Also, using the method derived in firestore, a secure channel can be used.

target=self._emulator_host,
options={
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)

if channel is not None:
warnings.warn(
Expand Down Expand Up @@ -208,22 +193,74 @@ def _get_scopes(self):

return scopes

def _emulator_channel(self, transport, options):
"""
Creates a channel using self._credentials in a similar way to grpc.secure_channel but
using grpc.local_channel_credentials() rather than grpc.ssh_channel_credentials()
to allow easy connection to a local emulator.
:return: grpc.Channel or grpc.aio.Channel
"""
# TODO: Implement a special credentials type for emulator and use
# "transport.create_channel" to create gRPC channels once google-auth
# extends it's allowed credentials types.
# Note: this code also exists in the firestore client.
if "GrpcAsyncIOTransport" in str(transport.__name__):
return grpc.aio.secure_channel(
self._emulator_host, self._local_composite_credentials(), options=options
)
else:
return grpc.secure_channel(
self._emulator_host, self._local_composite_credentials(), options=options
)

def _local_composite_credentials(self):
"""
Creates the credentials for the local emulator channel
:return: grpc.ChannelCredentials
"""
credentials = google.auth.credentials.with_scopes_if_required(
self._credentials, None
)
request = google.auth.transport.requests.Request()

# Create the metadata plugin for inserting the authorization header.
metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
credentials, request
)

# Create a set of grpc.CallCredentials using the metadata plugin.
google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

# Using the local_credentials to allow connection to emulator
local_credentials = grpc.local_channel_credentials()

# Combine the local credentials and the authorization credentials.
return grpc.composite_channel_credentials(
local_credentials, google_auth_credentials
)

def _create_gapic_client_channel(self, client_class, grpc_transport):
options = {
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items()
if self._client_options and self._client_options.api_endpoint:
api_endpoint = self._client_options.api_endpoint
else:
api_endpoint = client_class.DEFAULT_ENDPOINT

channel = grpc_transport.create_channel(
host=api_endpoint,
credentials=self._credentials,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)
channel = None
if self._emulator_host is not None:
api_endpoint = self._emulator_host
channel = self._emulator_channel(grpc_transport, options)
else:
channel = grpc_transport.create_channel(
host=api_endpoint,
credentials=self._credentials,
options=options,
)
transport = grpc_transport(channel=channel, host=api_endpoint)
return transport

Expand Down