From c2c2b00f0141af6f6d26ff095431de547deab96d Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Tue, 1 Jun 2021 17:31:47 -0700 Subject: [PATCH] fix: shutdown event loop if publisher fails to start and set exception on result future (#124) * feat: adding ability to create subscriptions at head * fix: lint errors * fix: remove absl dependency * fix: lint * feat: use default keyword args * fix: rename offset location to backlog location * fix: broken samples * fix: do not crash if pubsublite distribution can not be found when extracting semver * fix: properly shutdown event loop when failing to initialize publisher * fix: ensure proper shutdown on failure * fix: remove unused dep * fix: adding tests and requested changes --- .../internal/multiplexed_publisher_client.py | 19 ++++- .../partition_count_watching_publisher.py | 8 +- log.txt | 0 .../multiplexed_publisher_client_test.py | 78 +++++++++++++++++++ ...partition_count_watching_publisher_test.py | 4 +- 5 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 log.txt create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 4bfe9cf5..60f9246b 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -50,15 +50,28 @@ def publish( ) -> "Future[str]": if isinstance(topic, str): topic = TopicPath.parse(topic) - publisher = self._multiplexer.get_or_create( - topic, lambda: self._publisher_factory(topic).__enter__() - ) + try: + publisher = self._multiplexer.get_or_create( + topic, lambda: self._create_and_start_publisher(topic) + ) + except GoogleAPICallError as e: + failed = Future() + failed.set_exception(e) + return failed future = publisher.publish(data=data, ordering_key=ordering_key, **attrs) future.add_done_callback( lambda fut: self._on_future_completion(topic, publisher, fut) ) return future + def _create_and_start_publisher(self, topic: Union[TopicPath, str]): + publisher = self._publisher_factory(topic) + try: + return publisher.__enter__() + except GoogleAPICallError: + publisher.__exit__(None, None, None) + raise + def _on_future_completion( self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" ): diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py index ba3a962e..b625a0dc 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py @@ -42,6 +42,7 @@ def __init__( self._publisher_factory = publisher_factory self._policy_factory = policy_factory self._watcher = watcher + self._partition_count_poller = None async def __aenter__(self): try: @@ -56,9 +57,10 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - self._partition_count_poller.cancel() - await wait_ignore_cancelled(self._partition_count_poller) - await self._watcher.__aexit__(exc_type, exc_val, exc_tb) + if self._partition_count_poller is not None: + self._partition_count_poller.cancel() + await wait_ignore_cancelled(self._partition_count_poller) + await self._watcher.__aexit__(exc_type, exc_val, exc_tb) for publisher in self._publishers.values(): await publisher.__aexit__(exc_type, exc_val, exc_tb) diff --git a/log.txt b/log.txt new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py new file mode 100644 index 00000000..0b226257 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_publisher_client_test.py @@ -0,0 +1,78 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from asynctest.mock import MagicMock +import pytest + +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import ( + MultiplexedPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, +) +from google.cloud.pubsublite.types import TopicPath +from google.api_core.exceptions import GoogleAPICallError + + +@pytest.fixture() +def topic1(): + return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic1") + + +@pytest.fixture() +def topic2(): + return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic2") + + +@pytest.fixture() +def topic1_publisher(): + topic1_publisher = MagicMock(spec=SinglePublisher) + return topic1_publisher + + +@pytest.fixture() +def topic2_publisher(): + topic2_publisher = MagicMock(spec=SinglePublisher) + return topic2_publisher + + +@pytest.fixture() +def multiplexed_publisher(topic1, topic1_publisher, topic2_publisher): + return MultiplexedPublisherClient( + lambda topic: topic1_publisher if topic == topic1 else topic2_publisher + ) + + +def test_multiplexed_publish( + topic1, topic2, topic1_publisher, topic2_publisher, multiplexed_publisher +): + topic1_publisher.__enter__.return_value = topic1_publisher + topic2_publisher.__enter__.return_value = topic2_publisher + with multiplexed_publisher: + multiplexed_publisher.publish(topic1, data=b"abc") + topic1_publisher.__enter__.assert_called_once() + topic1_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="") + multiplexed_publisher.publish(topic2, data=b"abc") + topic2_publisher.__enter__.assert_called_once() + topic2_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="") + topic1_publisher.__exit__.assert_called_once() + topic2_publisher.__exit__.assert_called_once() + + +def test_publisher_init_failure(topic1, topic1_publisher, multiplexed_publisher): + topic1_publisher.__enter__.side_effect = GoogleAPICallError("error") + with multiplexed_publisher: + future = multiplexed_publisher.publish(topic1, data=b"abc") + with pytest.raises(GoogleAPICallError): + future.result() diff --git a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py index a31f12fa..ec9f6f4e 100644 --- a/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py +++ b/tests/unit/pubsublite/internal/wire/partition_count_watching_publisher_test.py @@ -66,10 +66,10 @@ async def test_init(mock_watcher, publisher): async def test_failed_init(mock_watcher, publisher): mock_watcher.get_partition_count.side_effect = GoogleAPICallError("error") with pytest.raises(GoogleAPICallError): - async with publisher: - pass + await publisher.__aenter__() mock_watcher.__aenter__.assert_called_once() mock_watcher.__aexit__.assert_called_once() + await publisher.__aexit__(None, None, None) async def test_simple_publish(mock_publishers, mock_policies, mock_watcher, publisher):