diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py index 2ee6e7c77..0d255f172 100644 --- a/google/cloud/bigtable/client.py +++ b/google/cloud/bigtable/client.py @@ -35,6 +35,11 @@ from google.cloud import bigtable_v2 from google.cloud import bigtable_admin_v2 +from google.cloud.bigtable_v2.gapic.transports import bigtable_grpc_transport +from google.cloud.bigtable_admin_v2.gapic.transports import ( + bigtable_table_admin_grpc_transport, + bigtable_instance_admin_grpc_transport, +) from google.cloud.bigtable import __version__ from google.cloud.bigtable.instance import Instance @@ -60,13 +65,14 @@ """Scope for reading table data.""" -def _create_gapic_client(client_class, client_options=None): +def _create_gapic_client(client_class, client_options=None, transport=None): def inner(self): if self._emulator_host is None: return client_class( - credentials=self._credentials, + credentials=None, client_info=self._client_info, client_options=client_options, + transport=transport, ) else: return client_class( @@ -161,7 +167,13 @@ def __init__( self._emulator_channel = None if self._emulator_host is not None: - self._emulator_channel = grpc.insecure_channel(self._emulator_host) + self._emulator_channel = grpc.insecure_channel( + target=self._emulator_host, + options={ + "grpc.keepalive_time_ms": 30000, + "grpc.keepalive_timeout_ms": 10000, + }.items(), + ) if channel is not None: warnings.warn( @@ -196,6 +208,29 @@ def _get_scopes(self): return scopes + def _create_gapic_client_channel(self, client_class, grpc_transport): + if self._client_options and self._client_options.api_endpoint: + api_endpoint = self._client_options.api_endpoint + else: + api_endpoint = client_class.SERVICE_ADDRESS + + channel = grpc_transport.create_channel( + api_endpoint, + self._credentials, + options={ + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + "grpc.keepalive_time_ms": 30000, + "grpc.keepalive_timeout_ms": 10000, + }.items(), + ) + transport = grpc_transport( + address=api_endpoint, + channel=channel, + credentials=None, + ) + return transport + @property def project_path(self): """Project name to be used with Instance Admin API. @@ -236,8 +271,14 @@ def table_data_client(self): :returns: A BigtableClient object. """ if self._table_data_client is None: + transport = self._create_gapic_client_channel( + bigtable_v2.BigtableClient, + bigtable_grpc_transport.BigtableGrpcTransport, + ) klass = _create_gapic_client( - bigtable_v2.BigtableClient, client_options=self._client_options + bigtable_v2.BigtableClient, + client_options=self._client_options, + transport=transport, ) self._table_data_client = klass(self) return self._table_data_client @@ -262,9 +303,15 @@ def table_admin_client(self): if self._table_admin_client is None: if not self._admin: raise ValueError("Client is not an admin client.") + + transport = self._create_gapic_client_channel( + bigtable_admin_v2.BigtableTableAdminClient, + bigtable_table_admin_grpc_transport.BigtableTableAdminGrpcTransport, + ) klass = _create_gapic_client( bigtable_admin_v2.BigtableTableAdminClient, client_options=self._admin_client_options, + transport=transport, ) self._table_admin_client = klass(self) return self._table_admin_client @@ -289,9 +336,15 @@ def instance_admin_client(self): if self._instance_admin_client is None: if not self._admin: raise ValueError("Client is not an admin client.") + + transport = self._create_gapic_client_channel( + bigtable_admin_v2.BigtableInstanceAdminClient, + bigtable_instance_admin_grpc_transport.BigtableInstanceAdminGrpcTransport, + ) klass = _create_gapic_client( bigtable_admin_v2.BigtableInstanceAdminClient, client_options=self._admin_client_options, + transport=transport, ) self._instance_admin_client = klass(self) return self._instance_admin_client diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 204e1a5c1..21ec479d0 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -31,14 +31,16 @@ def test_wo_emulator(self): credentials = _make_credentials() client = _Client(credentials) client_info = client._client_info = mock.Mock() + transport = mock.Mock() - result = self._invoke_client_factory(client_class)(client) + result = self._invoke_client_factory(client_class, transport=transport)(client) self.assertIs(result, client_class.return_value) client_class.assert_called_once_with( - credentials=client._credentials, + credentials=None, client_info=client_info, client_options=None, + transport=transport, ) def test_wo_emulator_w_client_options(self): @@ -47,16 +49,18 @@ def test_wo_emulator_w_client_options(self): client = _Client(credentials) client_info = client._client_info = mock.Mock() client_options = mock.Mock() + transport = mock.Mock() result = self._invoke_client_factory( - client_class, client_options=client_options + client_class, client_options=client_options, transport=transport )(client) self.assertIs(result, client_class.return_value) client_class.assert_called_once_with( - credentials=client._credentials, + credentials=None, client_info=client_info, client_options=client_options, + transport=transport, ) def test_w_emulator(self): @@ -170,7 +174,13 @@ def test_constructor_with_emulator_host(self): self.assertEqual(client._emulator_host, emulator_host) self.assertIs(client._emulator_channel, factory.return_value) - factory.assert_called_once_with(emulator_host) + factory.assert_called_once_with( + target=emulator_host, + options={ + "grpc.keepalive_time_ms": 30000, + "grpc.keepalive_timeout_ms": 10000, + }.items(), + ) getenv.assert_called_once_with(BIGTABLE_EMULATOR) def test__get_scopes_default(self): @@ -234,7 +244,9 @@ def test_table_data_client_not_initialized_w_client_options(self): from google.api_core.client_options import ClientOptions credentials = _make_credentials() - client_options = ClientOptions(quota_project_id="QUOTA-PROJECT") + client_options = ClientOptions( + quota_project_id="QUOTA-PROJECT", api_endpoint="xyz" + ) client = self._make_one( project=self.PROJECT, credentials=credentials, client_options=client_options ) @@ -245,9 +257,11 @@ def test_table_data_client_not_initialized_w_client_options(self): self.assertIs(table_data_client, mocked.return_value) self.assertIs(client._table_data_client, table_data_client) + mocked.assert_called_once_with( client_info=client._client_info, - credentials=mock.ANY, # added scopes + credentials=None, + transport=mock.ANY, client_options=client_options, ) @@ -308,6 +322,7 @@ def test_table_admin_client_not_initialized_w_client_options(self): admin_client_options=admin_client_options, ) + client._create_gapic_client_channel = mock.Mock() patch = mock.patch("google.cloud.bigtable_admin_v2.BigtableTableAdminClient") with patch as mocked: table_admin_client = client.table_admin_client @@ -316,7 +331,8 @@ def test_table_admin_client_not_initialized_w_client_options(self): self.assertIs(client._table_admin_client, table_admin_client) mocked.assert_called_once_with( client_info=client._client_info, - credentials=mock.ANY, # added scopes + credentials=None, + transport=mock.ANY, client_options=admin_client_options, ) @@ -377,6 +393,7 @@ def test_instance_admin_client_not_initialized_w_client_options(self): admin_client_options=admin_client_options, ) + client._create_gapic_client_channel = mock.Mock() patch = mock.patch("google.cloud.bigtable_admin_v2.BigtableInstanceAdminClient") with patch as mocked: instance_admin_client = client.instance_admin_client @@ -385,7 +402,8 @@ def test_instance_admin_client_not_initialized_w_client_options(self): self.assertIs(client._instance_admin_client, instance_admin_client) mocked.assert_called_once_with( client_info=client._client_info, - credentials=mock.ANY, # added scopes + credentials=None, + transport=mock.ANY, client_options=admin_client_options, )