From 1a31826e2e378468e057160c07d850ebca1c5879 Mon Sep 17 00:00:00 2001 From: Christopher Wilcox Date: Fri, 19 Mar 2021 13:34:37 -0700 Subject: [PATCH] fix: address issue in establishing an emulator connection (#246) Adjusts emulation code to use a newer method of creating a gRPC channel adds a test scenario to validate emulation. --- google/cloud/bigtable/client.py | 102 ++++++++++++++++++++++---------- noxfile.py | 26 ++++++++ tests/system.py | 14 +++-- tests/unit/test_client.py | 41 +++++++------ 4 files changed, 129 insertions(+), 54 deletions(-) diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py index 5e49934d0..be536f295 100644 --- a/google/cloud/bigtable/client.py +++ b/google/cloud/bigtable/client.py @@ -32,6 +32,7 @@ import grpc from google.api_core.gapic_v1 import client_info +import google.auth from google.cloud import bigtable_v2 from google.cloud import bigtable_admin_v2 @@ -69,17 +70,12 @@ def _create_gapic_client(client_class, client_options=None, transport=None): def inner(self): - if self._emulator_host is None: - return client_class( - credentials=None, - client_info=self._client_info, - client_options=client_options, - transport=transport, - ) - else: - return client_class( - channel=self._emulator_channel, client_info=self._client_info - ) + return client_class( + credentials=None, + client_info=self._client_info, + client_options=client_options, + transport=transport, + ) return inner @@ -166,16 +162,6 @@ def __init__( self._admin = bool(admin) self._client_info = client_info self._emulator_host = os.getenv(BIGTABLE_EMULATOR) - self._emulator_channel = None - - if self._emulator_host is not None: - self._emulator_channel = grpc.insecure_channel( - target=self._emulator_host, - options={ - "grpc.keepalive_time_ms": 30000, - "grpc.keepalive_timeout_ms": 10000, - }.items(), - ) if channel is not None: warnings.warn( @@ -208,22 +194,76 @@ def _get_scopes(self): return scopes + def _emulator_channel(self, transport, options): + """ + Creates a channel using self._credentials in a similar way to grpc.secure_channel but + using grpc.local_channel_credentials() rather than grpc.ssh_channel_credentials() + to allow easy connection to a local emulator. + :return: grpc.Channel or grpc.aio.Channel + """ + # TODO: Implement a special credentials type for emulator and use + # "transport.create_channel" to create gRPC channels once google-auth + # extends it's allowed credentials types. + # Note: this code also exists in the firestore client. + if "GrpcAsyncIOTransport" in str(transport.__name__): + return grpc.aio.secure_channel( + self._emulator_host, + self._local_composite_credentials(), + options=options, + ) + else: + return grpc.secure_channel( + self._emulator_host, + self._local_composite_credentials(), + options=options, + ) + + def _local_composite_credentials(self): + """ + Creates the credentials for the local emulator channel + :return: grpc.ChannelCredentials + """ + credentials = google.auth.credentials.with_scopes_if_required( + self._credentials, None + ) + request = google.auth.transport.requests.Request() + + # Create the metadata plugin for inserting the authorization header. + metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin( + credentials, request + ) + + # Create a set of grpc.CallCredentials using the metadata plugin. + google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin) + + # Using the local_credentials to allow connection to emulator + local_credentials = grpc.local_channel_credentials() + + # Combine the local credentials and the authorization credentials. + return grpc.composite_channel_credentials( + local_credentials, google_auth_credentials + ) + def _create_gapic_client_channel(self, client_class, grpc_transport): + options = { + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + "grpc.keepalive_time_ms": 30000, + "grpc.keepalive_timeout_ms": 10000, + }.items() if self._client_options and self._client_options.api_endpoint: api_endpoint = self._client_options.api_endpoint else: api_endpoint = client_class.DEFAULT_ENDPOINT - channel = grpc_transport.create_channel( - host=api_endpoint, - credentials=self._credentials, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - "grpc.keepalive_time_ms": 30000, - "grpc.keepalive_timeout_ms": 10000, - }.items(), - ) + channel = None + if self._emulator_host is not None: + api_endpoint = self._emulator_host + channel = self._emulator_channel(grpc_transport, options) + else: + channel = grpc_transport.create_channel( + host=api_endpoint, credentials=self._credentials, options=options, + ) transport = grpc_transport(channel=channel, host=api_endpoint) return transport diff --git a/noxfile.py b/noxfile.py index 72b387570..84fbd0583 100644 --- a/noxfile.py +++ b/noxfile.py @@ -33,6 +33,7 @@ # 'docfx' is excluded since it only needs to run in 'docs-presubmit' nox.options.sessions = [ "unit", + "system_emulated", "system", "cover", "lint", @@ -111,6 +112,31 @@ def unit(session): default(session) +@nox.session(python="3.8") +def system_emulated(session): + import subprocess + import signal + + try: + subprocess.call(["gcloud", "--version"]) + except OSError: + session.skip("gcloud not found but required for emulator support") + + # Currently, CI/CD doesn't have beta component of gcloud. + subprocess.call(["gcloud", "components", "install", "beta", "bigtable"]) + + hostport = "localhost:8789" + p = subprocess.Popen( + ["gcloud", "beta", "emulators", "bigtable", "start", "--host-port", hostport] + ) + + session.env["BIGTABLE_EMULATOR_HOST"] = hostport + system(session) + + # Stop Emulator + os.killpg(os.getpgid(p.pid), signal.SIGTERM) + + @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) def system(session): """Run the system test suite.""" diff --git a/tests/system.py b/tests/system.py index 84f9977e1..21a39eb29 100644 --- a/tests/system.py +++ b/tests/system.py @@ -24,7 +24,8 @@ from google.cloud.environment_vars import BIGTABLE_EMULATOR from test_utils.retry import RetryErrors from test_utils.retry import RetryResult -from test_utils.system import EmulatorCreds + +# from test_utils.system import EmulatorCreds from test_utils.system import unique_resource_id from google.cloud._helpers import _datetime_from_microseconds @@ -114,11 +115,9 @@ def setUpModule(): Config.IN_EMULATOR = os.getenv(BIGTABLE_EMULATOR) is not None - if Config.IN_EMULATOR: - credentials = EmulatorCreds() - Config.CLIENT = Client(admin=True, credentials=credentials) - else: - Config.CLIENT = Client(admin=True) + # Previously we created clients using a mock EmulatorCreds when targeting + # an emulator. + Config.CLIENT = Client(admin=True) Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, labels=LABELS) Config.CLUSTER = Config.INSTANCE.cluster( @@ -840,6 +839,9 @@ def test_delete_column_family(self): self.assertEqual(temp_table.list_column_families(), {}) def test_backup(self): + if Config.IN_EMULATOR: + self.skipTest("backups are not supported in the emulator") + from google.cloud._helpers import _datetime_to_pb_timestamp temp_table_id = "test-backup-table" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 5f2d7db26..f6b8eb5bc 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -67,16 +67,23 @@ def test_w_emulator(self): client_class = mock.Mock() emulator_host = emulator_channel = object() credentials = _make_credentials() + client_options = mock.Mock() + transport = mock.Mock() + client = _Client( credentials, emulator_host=emulator_host, emulator_channel=emulator_channel ) client_info = client._client_info = mock.Mock() - - result = self._invoke_client_factory(client_class)(client) + result = self._invoke_client_factory( + client_class, client_options=client_options, transport=transport + )(client) self.assertIs(result, client_class.return_value) client_class.assert_called_once_with( - channel=client._emulator_channel, client_info=client_info + credentials=None, + client_info=client_info, + client_options=client_options, + transport=transport, ) @@ -121,7 +128,6 @@ def test_constructor_defaults(self): self.assertIs(client._client_info, _CLIENT_INFO) self.assertIsNone(client._channel) self.assertIsNone(client._emulator_host) - self.assertIsNone(client._emulator_channel) self.assertEqual(client.SCOPE, (DATA_SCOPE,)) def test_constructor_explicit(self): @@ -167,22 +173,23 @@ def test_constructor_with_emulator_host(self): credentials = _make_credentials() emulator_host = "localhost:8081" - with mock.patch("os.getenv") as getenv: - getenv.return_value = emulator_host - with mock.patch("grpc.insecure_channel") as factory: - getenv.return_value = emulator_host + with mock.patch("os.environ", {BIGTABLE_EMULATOR: emulator_host}): + with mock.patch("grpc.secure_channel") as factory: client = self._make_one(project=self.PROJECT, credentials=credentials) + # don't test local_composite_credentials + client._local_composite_credentials = lambda: credentials + # channels are formed when needed, so access a client + # create a gapic channel + client.table_data_client self.assertEqual(client._emulator_host, emulator_host) - self.assertIs(client._emulator_channel, factory.return_value) - factory.assert_called_once_with( - target=emulator_host, - options={ - "grpc.keepalive_time_ms": 30000, - "grpc.keepalive_timeout_ms": 10000, - }.items(), - ) - getenv.assert_called_once_with(BIGTABLE_EMULATOR) + options = { + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + "grpc.keepalive_time_ms": 30000, + "grpc.keepalive_timeout_ms": 10000, + }.items() + factory.assert_called_once_with(emulator_host, credentials, options=options) def test__get_scopes_default(self): from google.cloud.bigtable.client import DATA_SCOPE