Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shutdown event loop if publisher fails to start and set exception on result future #124

Merged
merged 19 commits into from Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
03951be
feat: adding ability to create subscriptions at head
hannahrogers-google Feb 24, 2021
2064449
Merge branch 'master' of github.com:googleapis/python-pubsublite into…
hannahrogers-google Feb 24, 2021
94d0fed
fix: lint errors
hannahrogers-google Feb 24, 2021
730a278
fix: remove absl dependency
hannahrogers-google Mar 4, 2021
1e508b2
Merge branch 'master' of github.com:googleapis/python-pubsublite into…
hannahrogers-google Mar 11, 2021
cfad1e0
fix: lint
hannahrogers-google Mar 16, 2021
92609d3
Merge pull request #1 from hannahrogers-google/seek-to-head
hannahrogers-google Mar 16, 2021
02694ad
feat: use default keyword args
hannahrogers-google Mar 16, 2021
13a571a
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Mar 16, 2021
abf1a4f
fix: rename offset location to backlog location
hannahrogers-google Mar 17, 2021
c7279ad
fix: broken samples
hannahrogers-google Mar 18, 2021
bb03a4d
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Apr 21, 2021
700fb7d
fix: do not crash if pubsublite distribution can not be found when ex…
hannahrogers-google Apr 21, 2021
bdee7eb
fix: properly shutdown event loop when failing to initialize publisher
hannahrogers-google Apr 26, 2021
71b7e72
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google Apr 26, 2021
df63818
fix: ensure proper shutdown on failure
hannahrogers-google May 5, 2021
cc10147
Merge branch 'master' of github.com:googleapis/python-pubsublite
hannahrogers-google May 5, 2021
1e56434
fix: remove unused dep
hannahrogers-google May 5, 2021
2af6cbb
fix: adding tests and requested changes
hannahrogers-google May 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -50,15 +50,28 @@ def publish(
) -> Future:
if isinstance(topic, str):
topic = TopicPath.parse(topic)
publisher = self._multiplexer.get_or_create(
topic, lambda: self._publisher_factory(topic).__enter__()
)
try:
publisher = self._multiplexer.get_or_create(
topic, lambda: self._create_and_start_publisher(topic)
)
except GoogleAPICallError as e:
failed = Future()
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
failed.set_exception(e)
return failed
future = publisher.publish(data=data, ordering_key=ordering_key, **attrs)
future.add_done_callback(
lambda fut: self._on_future_completion(topic, publisher, fut)
)
return future

def _create_and_start_publisher(self, topic: Union[TopicPath, str]):
publisher = self._publisher_factory(topic)
try:
publisher.__enter__()
except GoogleAPICallError:
publisher.__exit__(None, None, None)
raise

def _on_future_completion(
self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]"
):
Expand Down
Expand Up @@ -56,8 +56,9 @@ async def __aenter__(self):
return self
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
if hasattr(self, '_partition_count_poller'):
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
self._partition_count_poller.cancel()
await wait_ignore_cancelled(self._partition_count_poller)
await self._watcher.__aexit__(exc_type, exc_val, exc_tb)
for publisher in self._publishers.values():
await publisher.__aexit__(exc_type, exc_val, exc_tb)
Expand Down