Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shutdown event loop if publisher fails to start and set exception on result future #124

Merged
merged 19 commits into from Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
03951be
feat: adding ability to create subscriptions at head
hannahrogers-google Feb 24, 2021
2064449
Merge branch 'master' of github.com:googleapis/python-pubsublite into…
hannahrogers-google Feb 24, 2021
94d0fed
fix: lint errors
hannahrogers-google Feb 24, 2021
730a278
fix: remove absl dependency
hannahrogers-google Mar 4, 2021
1e508b2
Merge branch 'master' of github.com:googleapis/python-pubsublite into…
hannahrogers-google Mar 11, 2021
cfad1e0
fix: lint
hannahrogers-google Mar 16, 2021
92609d3
Merge pull request #1 from hannahrogers-google/seek-to-head
hannahrogers-google Mar 16, 2021
02694ad
feat: use default keyword args
hannahrogers-google Mar 16, 2021
13a571a
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Mar 16, 2021
abf1a4f
fix: rename offset location to backlog location
hannahrogers-google Mar 17, 2021
c7279ad
fix: broken samples
hannahrogers-google Mar 18, 2021
bb03a4d
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Apr 21, 2021
700fb7d
fix: do not crash if pubsublite distribution can not be found when ex…
hannahrogers-google Apr 21, 2021
bdee7eb
fix: properly shutdown event loop when failing to initialize publisher
hannahrogers-google Apr 26, 2021
71b7e72
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Apr 26, 2021
df63818
fix: ensure proper shutdown on failure
hannahrogers-google May 5, 2021
cc10147
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google May 5, 2021
1e56434
fix: remove unused dep
hannahrogers-google May 5, 2021
2af6cbb
fix: adding tests and requested changes
hannahrogers-google May 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -50,15 +50,28 @@ def publish(
) -> Future:
if isinstance(topic, str):
topic = TopicPath.parse(topic)
publisher = self._multiplexer.get_or_create(
topic, lambda: self._publisher_factory(topic).__enter__()
)
try:
publisher = self._multiplexer.get_or_create(
topic, lambda: self._create_and_start_publisher(topic)
)
except GoogleAPICallError as e:
failed = Future()
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
failed.set_exception(e)
return failed
future = publisher.publish(data=data, ordering_key=ordering_key, **attrs)
future.add_done_callback(
lambda fut: self._on_future_completion(topic, publisher, fut)
)
return future

def _create_and_start_publisher(self, topic: Union[TopicPath, str]):
publisher = self._publisher_factory(topic)
try:
return publisher.__enter__()
except GoogleAPICallError:
publisher.__exit__(None, None, None)
raise

def _on_future_completion(
self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]"
):
Expand Down
Expand Up @@ -42,6 +42,7 @@ def __init__(
self._publisher_factory = publisher_factory
self._policy_factory = policy_factory
self._watcher = watcher
self._partition_count_poller = None

async def __aenter__(self):
try:
Expand All @@ -56,9 +57,10 @@ async def __aenter__(self):
return self
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
await self._watcher.__aexit__(exc_type, exc_val, exc_tb)
if self._partition_count_poller is not None:
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
await self._watcher.__aexit__(exc_type, exc_val, exc_tb)
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)

Expand Down
Empty file added log.txt
Empty file.
@@ -0,0 +1,78 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asynctest.mock import MagicMock
import pytest

from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import (
MultiplexedPublisherClient,
)
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
SinglePublisher,
)
from google.cloud.pubsublite.types import TopicPath
from google.api_core.exceptions import GoogleAPICallError


@pytest.fixture()
def topic1():
return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic1")


@pytest.fixture()
def topic2():
return TopicPath.parse("projects/1/locations/us-central1-a/topics/topic2")


@pytest.fixture()
def topic1_publisher():
topic1_publisher = MagicMock(spec=SinglePublisher)
return topic1_publisher


@pytest.fixture()
def topic2_publisher():
topic2_publisher = MagicMock(spec=SinglePublisher)
return topic2_publisher


@pytest.fixture()
def multiplexed_publisher(topic1, topic1_publisher, topic2_publisher):
return MultiplexedPublisherClient(
lambda topic: topic1_publisher if topic == topic1 else topic2_publisher
)


def test_multiplexed_publish(
topic1, topic2, topic1_publisher, topic2_publisher, multiplexed_publisher
):
topic1_publisher.__enter__.return_value = topic1_publisher
topic2_publisher.__enter__.return_value = topic2_publisher
with multiplexed_publisher:
multiplexed_publisher.publish(topic1, data=b"abc")
topic1_publisher.__enter__.assert_called_once()
topic1_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="")
multiplexed_publisher.publish(topic2, data=b"abc")
topic2_publisher.__enter__.assert_called_once()
topic2_publisher.publish.assert_called_once_with(data=b"abc", ordering_key="")
topic1_publisher.__exit__.assert_called_once()
topic2_publisher.__exit__.assert_called_once()


def test_publisher_init_failure(topic1, topic1_publisher, multiplexed_publisher):
topic1_publisher.__enter__.side_effect = GoogleAPICallError("error")
with multiplexed_publisher:
future = multiplexed_publisher.publish(topic1, data=b"abc")
with pytest.raises(GoogleAPICallError):
future.result()
Expand Up @@ -66,10 +66,10 @@ async def test_init(mock_watcher, publisher):
async def test_failed_init(mock_watcher, publisher):
mock_watcher.get_partition_count.side_effect = GoogleAPICallError("error")
with pytest.raises(GoogleAPICallError):
async with publisher:
pass
await publisher.__aenter__()
mock_watcher.__aenter__.assert_called_once()
mock_watcher.__aexit__.assert_called_once()
await publisher.__aexit__(None, None, None)


async def test_simple_publish(mock_publishers, mock_policies, mock_watcher, publisher):
Expand Down