New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement AckSetTracker which tracks message acknowledgements. #19
Conversation
Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python.
self._acks = queue.PriorityQueue() | ||
|
||
def track(self, offset: int): | ||
if len(self._receipts) > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This leaves the AckTracker in an inconsistent state on exceptions. Also, there is no need to pop and push, you can just access the first element with [0].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to pop and push
Done.
leaves the AckTracker in an inconsistent state on exceptions
Not quite sure what you mean here, can you elaborate?
async def ack(self, offset: int): | ||
self._acks.put_nowait(offset) | ||
prefix_acked_offset: Optional[int] = None | ||
while len(self._receipts) != 0 and not self._acks.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you peek the queues instead of popping and pushing?
PriorityQueue.queue[index] and deque[index].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can, but this seems super implicit as it relies on 1) the fact that Priorityqueue class has a queue submember (undocumented) and 2) the PriorityQueue class uses the heapq module and the element at index 0 of a queue managed by that module is the minimum one. I don't like that, but can make this change if you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think that's fair. Let's leave as is.
@@ -0,0 +1,46 @@ | |||
from asynctest.mock import MagicMock, CoroutineMock, call |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CoroutineMock is unused, unless it does something to the global state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py
Outdated
Show resolved
Hide resolved
@abstractmethod | ||
def track(self, offset: int): | ||
""" | ||
Track the provided offset. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: leave a comment in the interface saying that these should be strictly increasing? It would be nice to include the basic behavior on the class description; i.e. that it commits contiguous ranges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
async def ack(self, offset: int): | ||
self._acks.put_nowait(offset) | ||
prefix_acked_offset: Optional[int] = None | ||
while len(self._receipts) != 0 and not self._acks.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think that's fair. Let's leave as is.
Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python.