feat: Implement AckSetTracker which tracks message acknowledgements. #19

Merged
merged 2 commits into from Sep 22, 2020

dpcollins-google
Collaborator

Note that it is awkward to structure this like the Java version, as there is no "AsyncCallable" type in Python.
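
For readers unfamiliar with the gap being described: Python's typing module can spell "a callable that returns an awaitable", which is roughly the shape such a type would have. The sketch below is only an illustration of that idea; `CommitFn` and `run_commit` are hypothetical names and are not part of this PR.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical alias (not from the PR): any callable that takes an offset
# and returns something awaitable.
CommitFn = Callable[[int], Awaitable[None]]


async def run_commit(commit: CommitFn, offset: int) -> None:
    # Awaiting the result works for any `async def` function passed in.
    await commit(offset)
```

Any `async def` function with a matching signature satisfies the alias, so no dedicated wrapper class is needed for this sketch.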
@dpcollins-google dpcollins-google requested a review from a team as a code owner September 18, 2020 14:16
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Sep 18, 2020
        self._acks = queue.PriorityQueue()

    def track(self, offset: int):
        if len(self._receipts) > 0:

This leaves the AckTracker in an inconsistent state on exceptions. Also, there is no need to pop and push; you can just access the first element with [0].

Collaborator Author

> no need to pop and push

Done.

> leaves the AckTracker in an inconsistent state on exceptions

Not quite sure what you mean here, can you elaborate?
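
To make the two approaches in this thread concrete, here is a minimal sketch of reading the head of a `collections.deque` by popping and pushing versus indexing. The `receipts` variable is a stand-in and not the PR's actual code.

```python
from collections import deque

# Hypothetical stand-in for the tracker's receipt queue.
receipts = deque([3, 5, 8])

# Pop-and-push: between these two calls the deque is missing its head,
# so an exception raised in between would leave it mutated.
head = receipts.popleft()
receipts.appendleft(head)

# Peek: read either end in place without mutating the deque.
peeked = receipts[0]
last = receipts[-1]
```

Indexing either end of a deque is a read-only operation, so there is no window in which the container is in a partially updated state.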

    async def ack(self, offset: int):
        self._acks.put_nowait(offset)
        prefix_acked_offset: Optional[int] = None
        while len(self._receipts) != 0 and not self._acks.empty():

Can you peek the queues instead of popping and pushing?

PriorityQueue.queue[index] and deque[index].

Collaborator Author

I can, but this seems very implicit, as it relies on 1) the PriorityQueue class having a queue attribute (undocumented), and 2) the PriorityQueue class using the heapq module, where the element at index 0 of a list managed by that module is the minimum one. I don't like that, but I can make this change if you prefer.

Yeah, I think that's fair. Let's leave as is.
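
For context on the trade-off discussed above, a small sketch of both options on a `queue.PriorityQueue`. The variable names are illustrative, and the first option depends on the `queue` attribute and the heap layout, which the reply above points out are not part of the documented interface.

```python
import queue

acks = queue.PriorityQueue()
for offset in (7, 2, 5):
    acks.put_nowait(offset)

# Option 1: peek through the `queue` attribute, a list kept in heap order;
# index 0 of a heap is its smallest element.
smallest_peeked = acks.queue[0]

# Option 2: pop and push back, using only the public get/put methods.
smallest_popped = acks.get_nowait()
acks.put_nowait(smallest_popped)
```

Option 2 is more verbose but uses only documented methods, which matches the decision reached in this thread.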

@@ -0,0 +1,46 @@
from asynctest.mock import MagicMock, CoroutineMock, call

CoroutineMock is unused, unless it does something to the global state?

Collaborator Author

Done.

    @abstractmethod
    def track(self, offset: int):
        """
        Track the provided offset.

Nit: leave a comment in the interface saying that tracked offsets should be strictly increasing? It would also be nice to describe the basic behavior in the class description, i.e. that it commits contiguous ranges.

Collaborator Author

Done.
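
The behavior described in this thread (strictly increasing tracked offsets, committing only a contiguous acked prefix) can be sketched roughly as below. This is not the merged implementation: `SketchAckSetTracker`, the `commit` callback, and the choice to commit the last contiguous acked offset itself are all assumptions made for illustration.

```python
import asyncio
import queue
from collections import deque
from typing import Awaitable, Callable, List, Optional


class SketchAckSetTracker:
    """Illustrative only: commits the contiguous prefix of acked offsets."""

    def __init__(self, commit: Callable[[int], Awaitable[None]]):
        self._commit = commit  # hypothetical committer callback
        self._receipts = deque()  # tracked offsets, strictly increasing
        self._acks = queue.PriorityQueue()  # acked offsets, smallest first

    def track(self, offset: int) -> None:
        # Tracked offsets must be strictly increasing across calls.
        if self._receipts and offset <= self._receipts[-1]:
            raise ValueError("offsets must be strictly increasing")
        self._receipts.append(offset)

    async def ack(self, offset: int) -> None:
        self._acks.put_nowait(offset)
        prefix_acked_offset: Optional[int] = None
        # Advance while the oldest tracked offset is also the smallest ack.
        while len(self._receipts) != 0 and not self._acks.empty():
            receipt = self._receipts.popleft()
            ack = self._acks.get_nowait()
            if receipt == ack:
                prefix_acked_offset = receipt
                continue
            # Gap found: restore both containers and stop.
            self._receipts.appendleft(receipt)
            self._acks.put_nowait(ack)
            break
        if prefix_acked_offset is not None:
            # Assumption: commit the last contiguous acked offset itself.
            await self._commit(prefix_acked_offset)


async def demo() -> List[int]:
    committed: List[int] = []

    async def record(offset: int) -> None:
        committed.append(offset)

    tracker = SketchAckSetTracker(record)
    for offset in (1, 3, 7):
        tracker.track(offset)
    await tracker.ack(3)  # 1 is still outstanding, so nothing is committed
    await tracker.ack(1)  # 1 and 3 now form a contiguous prefix
    await tracker.ack(7)
    return committed
```

In `demo`, acking 3 before 1 commits nothing because the oldest tracked offset is still outstanding; once 1 is acked, the prefix through 3 is committed in a single call.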

@dpcollins-google dpcollins-google merged commit 7f88458 into master Sep 22, 2020
@anguillanneuf anguillanneuf deleted the ack_set_tracker branch March 25, 2022 22:36