Skip to content

Commit

Permalink
fix: shutdown event loop if publisher fails to start and set exceptio…
Browse files Browse the repository at this point in the history
…n on result future (#124)

* feat: adding ability to create subscriptions at head

* fix: lint errors

* fix: remove absl dependency

* fix: lint

* feat: use default keyword args

* fix: rename offset location to backlog location

* fix: broken samples

* fix: do not crash if pubsublite distribution can not be found when extracting semver

* fix: properly shutdown event loop when failing to initialize publisher

* fix: ensure proper shutdown on failure

* fix: remove unused dep

* fix: adding tests and requested changes
  • Loading branch information
hannahrogers-google committed Jun 2, 2021
1 parent 8edef67 commit c2c2b00
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 8 deletions.
Expand Up @@ -50,15 +50,28 @@ def publish(
) -> "Future[str]":
if isinstance(topic, str):
topic = TopicPath.parse(topic)
publisher = self._multiplexer.get_or_create(
topic, lambda: self._publisher_factory(topic).__enter__()
)
try:
publisher = self._multiplexer.get_or_create(
topic, lambda: self._create_and_start_publisher(topic)
)
except GoogleAPICallError as e:
failed = Future()
failed.set_exception(e)
return failed
future = publisher.publish(data=data, ordering_key=ordering_key, **attrs)
future.add_done_callback(
lambda fut: self._on_future_completion(topic, publisher, fut)
)
return future

def _create_and_start_publisher(self, topic: Union[TopicPath, str]):
publisher = self._publisher_factory(topic)
try:
return publisher.__enter__()
except GoogleAPICallError:
publisher.__exit__(None, None, None)
raise

def _on_future_completion(
self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]"
):
Expand Down
Expand Up @@ -42,6 +42,7 @@ def __init__(
self._publisher_factory = publisher_factory
self._policy_factory = policy_factory
self._watcher = watcher
self._partition_count_poller = None

async def __aenter__(self):
try:
Expand All @@ -56,9 +57,10 @@ async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
await self._watcher.__aexit__(exc_type, exc_val, exc_tb)
if self._partition_count_poller is not None:
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
await self._watcher.__aexit__(exc_type, exc_val, exc_tb)
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)

Expand Down
Empty file added log.txt
Empty file.
@@ -0,0 +1,78 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asynctest.mock import MagicMock
import pytest

from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import (
MultiplexedPublisherClient,
)
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
SinglePublisher,
)
from google.cloud.pubsublite.types import TopicPath
from google.api_core.exceptions import GoogleAPICallError


@pytest.fixture()
def topic1():
return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic1")


@pytest.fixture()
def topic2():
return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic2")


@pytest.fixture()
def topic1_publisher():
topic1_publisher = MagicMock(spec=SinglePublisher)
return topic1_publisher


@pytest.fixture()
def topic2_publisher():
topic2_publisher = MagicMock(spec=SinglePublisher)
return topic2_publisher


@pytest.fixture()
def multiplexed_publisher(topic1, topic1_publisher, topic2_publisher):
return MultiplexedPublisherClient(
lambda topic: topic1_publisher if topic == topic1 else topic2_publisher
)


def test_multiplexed_publish(
topic1, topic2, topic1_publisher, topic2_publisher, multiplexed_publisher
):
topic1_publisher.__enter__.return_value = topic1_publisher
topic2_publisher.__enter__.return_value = topic2_publisher
with multiplexed_publisher:
multiplexed_publisher.publish(topic1, data=b"abc")
topic1_publisher.__enter__.assert_called_once()
topic1_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="")
multiplexed_publisher.publish(topic2, data=b"abc")
topic2_publisher.__enter__.assert_called_once()
topic2_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="")
topic1_publisher.__exit__.assert_called_once()
topic2_publisher.__exit__.assert_called_once()


def test_publisher_init_failure(topic1, topic1_publisher, multiplexed_publisher):
topic1_publisher.__enter__.side_effect = GoogleAPICallError("error")
with multiplexed_publisher:
future = multiplexed_publisher.publish(topic1, data=b"abc")
with pytest.raises(GoogleAPICallError):
future.result()
Expand Up @@ -66,10 +66,10 @@ async def test_init(mock_watcher, publisher):
async def test_failed_init(mock_watcher, publisher):
mock_watcher.get_partition_count.side_effect = GoogleAPICallError("error")
with pytest.raises(GoogleAPICallError):
async with publisher:
pass
await publisher.__aenter__()
mock_watcher.__aenter__.assert_called_once()
mock_watcher.__aexit__.assert_called_once()
await publisher.__aexit__(None, None, None)


async def test_simple_publish(mock_publishers, mock_policies, mock_watcher, publisher):
Expand Down

0 comments on commit c2c2b00

Please sign in to comment.