New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement AckSetTracker which tracks message acknowledgements. #19
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
from abc import abstractmethod | ||
from typing import AsyncContextManager | ||
|
||
|
||
class AckSetTracker(AsyncContextManager):
    """
    An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
    is aggregated.

    Offsets are first registered with track() and later acknowledged with ack(). Acknowledgements may arrive in any
    order.
    """

    @abstractmethod
    def track(self, offset: int) -> None:
        """
        Track the provided offset.

        Offsets must be tracked in strictly increasing order.

        Args:
          offset: the offset to track.

        Raises:
          GoogleAPICallError: On an invalid offset to track.
        """

    @abstractmethod
    async def ack(self, offset: int) -> None:
        """
        Acknowledge the message with the provided offset. The offset must have previously been tracked.

        Args:
          offset: the offset to acknowledge.

        Raises:
          GoogleAPICallError: On a commit failure.
        """
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import queue | ||
from collections import deque | ||
from typing import Optional | ||
|
||
from google.api_core.exceptions import FailedPrecondition | ||
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker | ||
from google.cloud.pubsublite.internal.wire.committer import Committer | ||
from google.cloud.pubsublite_v1 import Cursor | ||
|
||
|
||
class AckSetTrackerImpl(AckSetTracker):
    """
    AckSetTracker which commits, through the provided Committer, the cursor following the longest contiguous prefix of
    tracked offsets that have all been acknowledged.
    """

    _committer: Committer

    # Tracked offsets that have not yet been folded into a commit, oldest first.
    _receipts: "deque[int]"
    # Acknowledged offsets that have not yet been matched against the front of _receipts, smallest first.
    _acks: "queue.PriorityQueue[int]"

    def __init__(self, committer: Committer):
        """
        Args:
          committer: the Committer used to commit cursors. It is entered and exited together with this tracker.
        """
        self._committer = committer
        self._receipts = deque()
        self._acks = queue.PriorityQueue()

    def track(self, offset: int):
        """
        Track the provided offset.

        Args:
          offset: the offset to track. Must be greater than the most recently tracked outstanding offset.

        Raises:
          FailedPrecondition: If offset is not greater than the most recently tracked outstanding offset.
        """
        if self._receipts:
            # Peek instead of popping and re-pushing so that raising leaves the tracker state untouched.
            # append() adds on the right, so the most recently tracked offset is at index -1 (index 0 is the oldest).
            last = self._receipts[-1]
            if last >= offset:
                raise FailedPrecondition(
                    f"Tried to track message {offset} which is before last tracked message {last}."
                )
        self._receipts.append(offset)

    async def ack(self, offset: int):
        """
        Acknowledge the message with the provided offset and commit if this completes a contiguous prefix.

        Args:
          offset: the offset to acknowledge.
        """
        # Note: put_nowait is used here and below to ensure that the below logic is executed without yielding
        # to another coroutine in the event loop. The queue is unbounded so it will never throw.
        self._acks.put_nowait(offset)
        prefix_acked_offset: Optional[int] = None
        # Entries are popped and restored on mismatch rather than peeked: peeking a PriorityQueue would rely on its
        # undocumented `queue` attribute and on the heap layout used underneath it.
        while self._receipts and not self._acks.empty():
            receipt = self._receipts.popleft()
            ack = self._acks.get_nowait()
            if receipt == ack:
                prefix_acked_offset = receipt
                continue
            # The oldest outstanding receipt has not been acked yet. Restore it at the FRONT so that the receipts stay
            # in tracking order; restoring it at the back would let later acks be mistaken for a contiguous prefix.
            self._receipts.appendleft(receipt)
            self._acks.put_nowait(ack)
            break
        if prefix_acked_offset is None:
            return
        # Convert from last acked to first unacked.
        await self._committer.commit(Cursor(offset=prefix_acked_offset + 1))

    async def __aenter__(self):
        """Enter the underlying committer and return this tracker so `async with ... as tracker` binds it."""
        await self._committer.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Exit the underlying committer, forwarding the exception details."""
        await self._committer.__aexit__(exc_type, exc_value, traceback)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from asynctest.mock import MagicMock, call | ||
import pytest | ||
|
||
# All test coroutines will be treated as marked. | ||
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker | ||
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl | ||
from google.cloud.pubsublite.internal.wire.committer import Committer | ||
from google.cloud.pubsublite_v1 import Cursor | ||
|
||
pytestmark = pytest.mark.asyncio | ||
|
||
|
||
@pytest.fixture()
def committer():
    """Provide a Committer mock whose __aenter__ returns the mock itself."""
    mock_committer = MagicMock(spec=Committer)
    mock_committer.__aenter__.return_value = mock_committer
    return mock_committer
|
||
|
||
@pytest.fixture()
def tracker(committer):
    """Provide an AckSetTrackerImpl built on the committer fixture."""
    tracker_under_test = AckSetTrackerImpl(committer)
    return tracker_under_test
|
||
|
||
async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
    """Acks trigger a commit only once they complete a contiguous prefix of the tracked offsets."""
    async with tracker:
        committer.__aenter__.assert_called_once()
        tracker.track(offset=1)
        tracker.track(offset=3)
        tracker.track(offset=5)
        tracker.track(offset=7)

        # assert_has_calls([]) passes no matter what was called, so assert_not_called() is used to actually verify
        # that nothing is committed while offset 1 is still unacked.
        committer.commit.assert_not_called()
        await tracker.ack(offset=3)
        committer.commit.assert_not_called()
        await tracker.ack(offset=5)
        committer.commit.assert_not_called()
        # Acking 1 completes the prefix 1, 3, 5, so the first unacked offset (6) is committed.
        await tracker.ack(offset=1)
        committer.commit.assert_has_calls([call(Cursor(offset=6))])

        tracker.track(offset=8)
        await tracker.ack(offset=7)
        committer.commit.assert_has_calls([call(Cursor(offset=6)), call(Cursor(offset=8))])
    # The committer is only exited once the tracker's context has been left.
    committer.__aexit__.assert_called_once()
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: leave a comment in the interface saying that these should be strictly increasing? It would be nice to include the basic behavior on the class description; i.e. that it commits contiguous ranges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.