Skip to content

Commit

Permalink
fix: address issue in establishing an emulator connection (#246)
Browse files Browse the repository at this point in the history
Adjusts emulation code to use a newer method of creating a gRPC channel
adds a test scenario to validate emulation.
  • Loading branch information
crwilcox committed Mar 19, 2021
1 parent cf3b9a1 commit 1a31826
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 54 deletions.
102 changes: 71 additions & 31 deletions google/cloud/bigtable/client.py
Expand Up @@ -32,6 +32,7 @@
import grpc

from google.api_core.gapic_v1 import client_info
import google.auth

from google.cloud import bigtable_v2
from google.cloud import bigtable_admin_v2
Expand Down Expand Up @@ -69,17 +70,12 @@

def _create_gapic_client(client_class, client_options=None, transport=None):
def inner(self):
if self._emulator_host is None:
return client_class(
credentials=None,
client_info=self._client_info,
client_options=client_options,
transport=transport,
)
else:
return client_class(
channel=self._emulator_channel, client_info=self._client_info
)
return client_class(
credentials=None,
client_info=self._client_info,
client_options=client_options,
transport=transport,
)

return inner

Expand Down Expand Up @@ -166,16 +162,6 @@ def __init__(
self._admin = bool(admin)
self._client_info = client_info
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
self._emulator_channel = None

if self._emulator_host is not None:
self._emulator_channel = grpc.insecure_channel(
target=self._emulator_host,
options={
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)

if channel is not None:
warnings.warn(
Expand Down Expand Up @@ -208,22 +194,76 @@ def _get_scopes(self):

return scopes

def _emulator_channel(self, transport, options):
"""
Creates a channel using self._credentials in a similar way to grpc.secure_channel but
using grpc.local_channel_credentials() rather than grpc.ssh_channel_credentials()
to allow easy connection to a local emulator.
:return: grpc.Channel or grpc.aio.Channel
"""
# TODO: Implement a special credentials type for emulator and use
# "transport.create_channel" to create gRPC channels once google-auth
# extends it's allowed credentials types.
# Note: this code also exists in the firestore client.
if "GrpcAsyncIOTransport" in str(transport.__name__):
return grpc.aio.secure_channel(
self._emulator_host,
self._local_composite_credentials(),
options=options,
)
else:
return grpc.secure_channel(
self._emulator_host,
self._local_composite_credentials(),
options=options,
)

def _local_composite_credentials(self):
"""
Creates the credentials for the local emulator channel
:return: grpc.ChannelCredentials
"""
credentials = google.auth.credentials.with_scopes_if_required(
self._credentials, None
)
request = google.auth.transport.requests.Request()

# Create the metadata plugin for inserting the authorization header.
metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
credentials, request
)

# Create a set of grpc.CallCredentials using the metadata plugin.
google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

# Using the local_credentials to allow connection to emulator
local_credentials = grpc.local_channel_credentials()

# Combine the local credentials and the authorization credentials.
return grpc.composite_channel_credentials(
local_credentials, google_auth_credentials
)

def _create_gapic_client_channel(self, client_class, grpc_transport):
options = {
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items()
if self._client_options and self._client_options.api_endpoint:
api_endpoint = self._client_options.api_endpoint
else:
api_endpoint = client_class.DEFAULT_ENDPOINT

channel = grpc_transport.create_channel(
host=api_endpoint,
credentials=self._credentials,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)
channel = None
if self._emulator_host is not None:
api_endpoint = self._emulator_host
channel = self._emulator_channel(grpc_transport, options)
else:
channel = grpc_transport.create_channel(
host=api_endpoint, credentials=self._credentials, options=options,
)
transport = grpc_transport(channel=channel, host=api_endpoint)
return transport

Expand Down
26 changes: 26 additions & 0 deletions noxfile.py
Expand Up @@ -33,6 +33,7 @@
# 'docfx' is excluded since it only needs to run in 'docs-presubmit'
nox.options.sessions = [
"unit",
"system_emulated",
"system",
"cover",
"lint",
Expand Down Expand Up @@ -111,6 +112,31 @@ def unit(session):
default(session)


@nox.session(python="3.8")
def system_emulated(session):
import subprocess
import signal

try:
subprocess.call(["gcloud", "--version"])
except OSError:
session.skip("gcloud not found but required for emulator support")

# Currently, CI/CD doesn't have beta component of gcloud.
subprocess.call(["gcloud", "components", "install", "beta", "bigtable"])

hostport = "localhost:8789"
p = subprocess.Popen(
["gcloud", "beta", "emulators", "bigtable", "start", "--host-port", hostport]
)

session.env["BIGTABLE_EMULATOR_HOST"] = hostport
system(session)

# Stop Emulator
os.killpg(os.getpgid(p.pid), signal.SIGTERM)


@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
def system(session):
"""Run the system test suite."""
Expand Down
14 changes: 8 additions & 6 deletions tests/system.py
Expand Up @@ -24,7 +24,8 @@
from google.cloud.environment_vars import BIGTABLE_EMULATOR
from test_utils.retry import RetryErrors
from test_utils.retry import RetryResult
from test_utils.system import EmulatorCreds

# from test_utils.system import EmulatorCreds
from test_utils.system import unique_resource_id

from google.cloud._helpers import _datetime_from_microseconds
Expand Down Expand Up @@ -114,11 +115,9 @@ def setUpModule():

Config.IN_EMULATOR = os.getenv(BIGTABLE_EMULATOR) is not None

if Config.IN_EMULATOR:
credentials = EmulatorCreds()
Config.CLIENT = Client(admin=True, credentials=credentials)
else:
Config.CLIENT = Client(admin=True)
# Previously we created clients using a mock EmulatorCreds when targeting
# an emulator.
Config.CLIENT = Client(admin=True)

Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, labels=LABELS)
Config.CLUSTER = Config.INSTANCE.cluster(
Expand Down Expand Up @@ -840,6 +839,9 @@ def test_delete_column_family(self):
self.assertEqual(temp_table.list_column_families(), {})

def test_backup(self):
if Config.IN_EMULATOR:
self.skipTest("backups are not supported in the emulator")

from google.cloud._helpers import _datetime_to_pb_timestamp

temp_table_id = "test-backup-table"
Expand Down
41 changes: 24 additions & 17 deletions tests/unit/test_client.py
Expand Up @@ -67,16 +67,23 @@ def test_w_emulator(self):
client_class = mock.Mock()
emulator_host = emulator_channel = object()
credentials = _make_credentials()
client_options = mock.Mock()
transport = mock.Mock()

client = _Client(
credentials, emulator_host=emulator_host, emulator_channel=emulator_channel
)
client_info = client._client_info = mock.Mock()

result = self._invoke_client_factory(client_class)(client)
result = self._invoke_client_factory(
client_class, client_options=client_options, transport=transport
)(client)

self.assertIs(result, client_class.return_value)
client_class.assert_called_once_with(
channel=client._emulator_channel, client_info=client_info
credentials=None,
client_info=client_info,
client_options=client_options,
transport=transport,
)


Expand Down Expand Up @@ -121,7 +128,6 @@ def test_constructor_defaults(self):
self.assertIs(client._client_info, _CLIENT_INFO)
self.assertIsNone(client._channel)
self.assertIsNone(client._emulator_host)
self.assertIsNone(client._emulator_channel)
self.assertEqual(client.SCOPE, (DATA_SCOPE,))

def test_constructor_explicit(self):
Expand Down Expand Up @@ -167,22 +173,23 @@ def test_constructor_with_emulator_host(self):

credentials = _make_credentials()
emulator_host = "localhost:8081"
with mock.patch("os.getenv") as getenv:
getenv.return_value = emulator_host
with mock.patch("grpc.insecure_channel") as factory:
getenv.return_value = emulator_host
with mock.patch("os.environ", {BIGTABLE_EMULATOR: emulator_host}):
with mock.patch("grpc.secure_channel") as factory:
client = self._make_one(project=self.PROJECT, credentials=credentials)
# don't test local_composite_credentials
client._local_composite_credentials = lambda: credentials
# channels are formed when needed, so access a client
# create a gapic channel
client.table_data_client

self.assertEqual(client._emulator_host, emulator_host)
self.assertIs(client._emulator_channel, factory.return_value)
factory.assert_called_once_with(
target=emulator_host,
options={
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)
getenv.assert_called_once_with(BIGTABLE_EMULATOR)
options = {
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items()
factory.assert_called_once_with(emulator_host, credentials, options=options)

def test__get_scopes_default(self):
from google.cloud.bigtable.client import DATA_SCOPE
Expand Down

0 comments on commit 1a31826

Please sign in to comment.