New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: shutdown event loop if publisher fails to start and set exception on result future #124
Conversation
feat: adding ability to create subscriptions at head
try: | ||
self._managed_loop.submit(self._underlying.__aenter__()).result() | ||
except GoogleAPICallError: | ||
self._managed_loop.__exit__(None, None, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't actually do this. This will result in the managed loop being closed twice when a context manager is used. You shouldn't try to catch this exception- the caller still has to call exit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, the problem is that we are not calling exit on SinglePublisherImpl from the MultiplexedPublisherClient.
The MultiplexedPublisherClient attempts to get_or_create a SinglePublisherImpl here:
python-pubsublite/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py
Line 53 in aa52ef4
publisher = self._multiplexer.get_or_create( |
The ClientMultiplexer will attempt to create and start the client here:
python-pubsublite/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py
Line 43 in aa52ef4
self._live_clients[key] = factory() |
But, in the Not Found topic case, the client fails on enter because the call to getTopicPartitions fails. So, this raises an exception and the new publisher client is never assigned to live_clients[key].
When exit is eventually called on the ClientMultiplexer, we will never call exit on the 'not found' client because it was never assigned to live_clients[key]:
python-pubsublite/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py
Line 74 in aa52ef4
self._closer(client) |
Therefore, we will never run exit on the managed_loop. I will re-work this PR to address the real issue.
google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py
Show resolved
Hide resolved
google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py
Outdated
Show resolved
Hide resolved
google/cloud/pubsublite/internal/wire/partition_count_watching_publisher.py
Show resolved
Hide resolved
@dpcollins-google Is this ok to merge? It seems like it but just want to check. |
When publishing to a topic that does not exist, the publisher fails to start because the call to get topic partitions fails with 'Not found'. This change does the following: