diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index f1e198b1a..f1de9f1f4 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -130,15 +130,19 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs): target=os.environ.get("PUBSUB_EMULATOR_HOST") ) + # The GAPIC client has mTLS logic to determine the api endpoint and the + # ssl credentials to use. Here we create a GAPIC client to help compute the + # api endpoint and ssl credentials. The api endpoint will be used to set + # `self._target`, and ssl credentials will be passed to + # `grpc_helpers.create_channel` to establish a mTLS channel (if ssl + # credentials is not None). client_options = kwargs.get("client_options", None) - if ( - client_options - and "api_endpoint" in client_options - and isinstance(client_options["api_endpoint"], six.string_types) - ): - self._target = client_options["api_endpoint"] - else: - self._target = publisher_client.PublisherClient.SERVICE_ADDRESS + credentials = kwargs.get("credentials", None) + client_for_mtls_info = publisher_client.PublisherClient( + credentials=credentials, client_options=client_options + ) + + self._target = client_for_mtls_info._transport._host # Use a custom channel. # We need this in order to set appropriate default message size and @@ -149,6 +153,7 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs): channel = grpc_helpers.create_channel( credentials=kwargs.pop("credentials", None), target=self.target, + ssl_credentials=client_for_mtls_info._transport._ssl_channel_credentials, scopes=publisher_client.PublisherClient._DEFAULT_SCOPES, options={ "grpc.max_send_message_length": -1, diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index e0b10c888..e33a0e2e6 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -16,7 +16,6 @@ import os import pkg_resources -import six import grpc @@ -82,16 +81,19 @@ def __init__(self, **kwargs): target=os.environ.get("PUBSUB_EMULATOR_HOST") ) - # api_endpoint wont be applied if 'transport' is passed in. + # The GAPIC client has mTLS logic to determine the api endpoint and the + # ssl credentials to use. Here we create a GAPIC client to help compute the + # api endpoint and ssl credentials. The api endpoint will be used to set + # `self._target`, and ssl credentials will be passed to + # `grpc_helpers.create_channel` to establish a mTLS channel (if ssl + # credentials is not None). client_options = kwargs.get("client_options", None) - if ( - client_options - and "api_endpoint" in client_options - and isinstance(client_options["api_endpoint"], six.string_types) - ): - self._target = client_options["api_endpoint"] - else: - self._target = subscriber_client.SubscriberClient.SERVICE_ADDRESS + credentials = kwargs.get("credentials", None) + client_for_mtls_info = subscriber_client.SubscriberClient( + credentials=credentials, client_options=client_options + ) + + self._target = client_for_mtls_info._transport._host # Use a custom channel. # We need this in order to set appropriate default message size and @@ -102,6 +104,7 @@ def __init__(self, **kwargs): channel = grpc_helpers.create_channel( credentials=kwargs.pop("credentials", None), target=self.target, + ssl_credentials=client_for_mtls_info._transport._ssl_channel_credentials, scopes=subscriber_client.SubscriberClient._DEFAULT_SCOPES, options={ "grpc.max_send_message_length": -1, diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 3b6aa1477..0f661c2fa 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -18,6 +18,7 @@ import inspect from google.auth import credentials +import grpc import mock import pytest @@ -81,7 +82,7 @@ def test_init_w_api_endpoint(): assert isinstance(client.api, publisher_client.PublisherClient) assert (client.api._transport.grpc_channel._channel.target()).decode( "utf-8" - ) == "testendpoint.google.com" + ) == "testendpoint.google.com:443" def test_init_w_unicode_api_endpoint(): @@ -91,7 +92,7 @@ def test_init_w_unicode_api_endpoint(): assert isinstance(client.api, publisher_client.PublisherClient) assert (client.api._transport.grpc_channel._channel.target()).decode( "utf-8" - ) == "testendpoint.google.com" + ) == "testendpoint.google.com:443" def test_init_w_empty_client_options(): @@ -104,8 +105,13 @@ def test_init_w_empty_client_options(): def test_init_client_options_pass_through(): + mock_ssl_creds = grpc.ssl_channel_credentials() + def init(self, *args, **kwargs): self.kwargs = kwargs + self._transport = mock.Mock() + self._transport._host = "testendpoint.google.com" + self._transport._ssl_channel_credentials = mock_ssl_creds with mock.patch.object(publisher_client.PublisherClient, "__init__", init): client = publisher.Client( @@ -119,6 +125,8 @@ def init(self, *args, **kwargs): assert client_options.get("quota_project_id") == "42" assert client_options.get("scopes") == [] assert client_options.get("credentials_file") == "file.json" + assert client.target == "testendpoint.google.com" + assert client.api.transport._ssl_channel_credentials == mock_ssl_creds def test_init_emulator(monkeypatch): diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index 634351757..d56289276 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -13,6 +13,7 @@ # limitations under the License. from google.auth import credentials +import grpc import mock from google.cloud.pubsub_v1 import subscriber @@ -42,7 +43,7 @@ def test_init_w_api_endpoint(): assert isinstance(client.api, subscriber_client.SubscriberClient) assert (client.api._transport.grpc_channel._channel.target()).decode( "utf-8" - ) == "testendpoint.google.com" + ) == "testendpoint.google.com:443" def test_init_w_unicode_api_endpoint(): @@ -52,7 +53,7 @@ def test_init_w_unicode_api_endpoint(): assert isinstance(client.api, subscriber_client.SubscriberClient) assert (client.api._transport.grpc_channel._channel.target()).decode( "utf-8" - ) == "testendpoint.google.com" + ) == "testendpoint.google.com:443" def test_init_w_empty_client_options(): @@ -65,8 +66,13 @@ def test_init_w_empty_client_options(): def test_init_client_options_pass_through(): + mock_ssl_creds = grpc.ssl_channel_credentials() + def init(self, *args, **kwargs): self.kwargs = kwargs + self._transport = mock.Mock() + self._transport._host = "testendpoint.google.com" + self._transport._ssl_channel_credentials = mock_ssl_creds with mock.patch.object(subscriber_client.SubscriberClient, "__init__", init): client = subscriber.Client( @@ -80,6 +86,8 @@ def init(self, *args, **kwargs): assert client_options.get("quota_project_id") == "42" assert client_options.get("scopes") == [] assert client_options.get("credentials_file") == "file.json" + assert client.target == "testendpoint.google.com" + assert client.api.transport._ssl_channel_credentials == mock_ssl_creds def test_init_emulator(monkeypatch):