feat: Implement SinglePartitionSubscriber. #22

Merged
merged 3 commits into from Sep 24, 2020

Conversation

dpcollins-google
Collaborator

This handles mapping a single partition to a Cloud Pub/Sub-like asynchronous subscriber.

@dpcollins-google dpcollins-google requested a review from a team as a code owner September 23, 2020 01:59
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Sep 23, 2020
sized_message = self._messages_by_offset[offset]
try:
    self._nack_handler.on_nack(sized_message.message,
                               lambda: self._queue.put(requests.AckRequest(

Do we have to use AckRequests? It seems simpler to just call _handle_ack() here. What am I missing?

Collaborator Author

_handle_ack is a coroutine. The nack handler function is a Callable[[PubsubMessage], None] so that it can be called from other threads that are not part of the event loop. Calling _handle_ack from here would be harder than just putting an AckRequest on the queue.
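A minimal sketch of the pattern described above, not the PR's actual code: a plain callback, invoked from a thread outside the event loop, only puts a request on a thread-safe queue.Queue, and a coroutine on the loop picks it up later. The AckRequest class here is a hypothetical stand-in for requests.AckRequest, and make_ack_callback, drain and run_demo are illustrative names.

```python
import asyncio
import queue
import threading
from typing import Callable, List, NamedTuple


class AckRequest(NamedTuple):
    # Hypothetical stand-in for requests.AckRequest; only an offset is modeled.
    offset: int


def make_ack_callback(q: queue.Queue, offset: int) -> Callable[[], None]:
    # A plain function, not a coroutine: it can be called from any thread
    # because queue.Queue.put is thread-safe and needs no event loop.
    return lambda: q.put(AckRequest(offset))


async def drain(q: queue.Queue, expected: int) -> List[int]:
    # Runs on the event loop. This is the place where a coroutine such as
    # _handle_ack could be awaited for each request taken off the queue.
    handled: List[int] = []
    while len(handled) < expected:
        try:
            handled.append(q.get_nowait().offset)
        except queue.Empty:
            await asyncio.sleep(0.01)
    return handled


def run_demo() -> List[int]:
    q: queue.Queue = queue.Queue()
    # The callback fires on a worker thread, outside the event loop.
    worker = threading.Thread(target=make_ack_callback(q, 7))
    worker.start()
    worker.join()
    return asyncio.run(drain(q, expected=1))
```

The callback never touches the event loop directly, which is why it stays safe to call from arbitrary threads.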

SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before.

Collaborator Author

Done.

self.fail(e)

async def _handle_queue_message(self, message: Union[
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):

Can the queue really have all these kinds of messages? Do we ever want them?

Collaborator Author

It can. This is the protocol I derived from https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/message.py

Technically drop and modack requests can be sent from the message, but as you can see on the line below we immediately fail the client if those requests are sent.
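As a rough illustration of that behavior (a sketch under assumptions, not the merged implementation): acks and nacks are handled, while drop and modack requests fail the client immediately. The four request classes below are simplified, hypothetical stand-ins for the types in python-pubsub's requests module, ToySubscriber is an invented name, and the handler is synchronous here for brevity even though the PR's _handle_queue_message is a coroutine.

```python
from typing import List, NamedTuple, Optional, Union


# Simplified, hypothetical stand-ins for the python-pubsub request types.
class AckRequest(NamedTuple):
    ack_id: str


class NackRequest(NamedTuple):
    ack_id: str


class DropRequest(NamedTuple):
    ack_id: str


class ModAckRequest(NamedTuple):
    ack_id: str
    seconds: int


Request = Union[AckRequest, DropRequest, ModAckRequest, NackRequest]


class ToySubscriber:
    """Illustrative only: records acks and nacks, fails on anything else."""

    def __init__(self) -> None:
        self.acked: List[str] = []
        self.nacked: List[str] = []
        self.error: Optional[Exception] = None

    def fail(self, error: Exception) -> None:
        # Keep only the first failure, treating it as permanent.
        if self.error is None:
            self.error = error

    def handle_queue_message(self, message: Request) -> None:
        if isinstance(message, (DropRequest, ModAckRequest)):
            # The queue can carry these, but they are not supported.
            self.fail(ValueError(f"unsupported request: {type(message).__name__}"))
            return
        if isinstance(message, AckRequest):
            self.acked.append(message.ack_id)
        else:
            self.nacked.append(message.ack_id)


subscriber = ToySubscriber()
for request in (AckRequest("a"), NackRequest("b"), ModAckRequest("c", 10)):
    subscriber.handle_queue_message(request)
```

After this runs, "a" is recorded as acked, "b" as nacked, and the modack request has put the toy subscriber into its failed state.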

else:
self._handle_nack(message)

async def _looper(self):

Why not use asyncio's default event_loop for asynchronicity?

Collaborator Author

This does use asyncio's default event loop? _looper is just a coroutine running in it that polls a queue.Queue for messages. There is asyncio.Queue (used elsewhere), which you can await on, but Message.ack/nack (from CPS' client library) needs to be callable from other threads, so it uses the thread-safe queue instead of the asyncio-enabled one.
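The polling approach can be sketched roughly as follows. This is an assumption-laden illustration rather than the PR's _looper: the looper and main functions, the 0.01 second sleep and the "ack" payload are all invented for the example. The point is that get_nowait() plus a short asyncio.sleep() lets other coroutines keep running, whereas a blocking queue.Queue.get() inside a coroutine would stall the whole event loop.

```python
import asyncio
import queue
import threading
from typing import List


async def looper(q: queue.Queue, out: List[str], stop: asyncio.Event) -> None:
    # Poll the thread-safe queue without ever blocking the event loop.
    while not stop.is_set():
        try:
            out.append(q.get_nowait())
        except queue.Empty:
            # Yield control so other coroutines can run. A blocking q.get()
            # here would freeze every coroutine on this loop.
            await asyncio.sleep(0.01)


async def main() -> List[str]:
    q: queue.Queue = queue.Queue()
    out: List[str] = []
    stop = asyncio.Event()
    task = asyncio.create_task(looper(q, out, stop))

    # Simulate an ack arriving from a thread that is not part of the loop.
    producer = threading.Timer(0.05, q.put, args=("ack",))
    producer.start()

    # This coroutine keeps making progress while the looper polls.
    while not out:
        await asyncio.sleep(0.01)

    stop.set()
    await task
    producer.join()
    return out
```

Running asyncio.run(main()) returns the single item the timer thread enqueued. The trade-off of this design is a small polling latency in exchange for a queue that is safe to use from any thread.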

Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here?

Collaborator Author

Done.

@@ -0,0 +1,11 @@
from typing import NamedTuple

Meta comment: do you mind setting your editor's line length limit to 100 characters? The line breaks in your PRs look quite unreadable. This is fine for now, but I would appreciate it for the next ones.

Collaborator Author

After talking with tianzi, it turns out an auto-formatter exists. I'll run that after this chain of PRs is submitted, since it would be quite difficult to fix the commit chain history :/ I hope that's acceptable.

Yeah, totally fine. Thanks.

@dpcollins-google dpcollins-google merged commit bb76d90 into master Sep 24, 2020
@anguillanneuf anguillanneuf deleted the single_partition_subscriber branch March 25, 2022 22:36

2 participants