Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement AckSetTracker which tracks message acknowledgements. #19

Merged
merged 2 commits into from Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file.
Empty file.
32 changes: 32 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py
@@ -0,0 +1,32 @@
from abc import abstractmethod
from typing import AsyncContextManager


class AckSetTracker(AsyncContextManager):
"""
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
is aggregated.
"""
@abstractmethod
def track(self, offset: int):
"""
Track the provided offset.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: leave a comment in the interface saying that these should be strictly increasing? It would be nice to include the basic behavior on the class description; i.e. that it commits contiguous ranges.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


Args:
offset: the offset to track.

Raises:
GoogleAPICallError: On an invalid offset to track.
"""

@abstractmethod
async def ack(self, offset: int):
"""
Acknowledge the message with the provided offset. The offset must have previously been tracked.

Args:
offset: the offset to acknowledge.

Returns:
GoogleAPICallError: On a commit failure.
manuelmenzella-google marked this conversation as resolved.
Show resolved Hide resolved
"""
@@ -0,0 +1,52 @@
import queue
from collections import deque
from typing import Optional

from google.api_core.exceptions import FailedPrecondition
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite_v1 import Cursor


class AckSetTrackerImpl(AckSetTracker):
_committer: Committer

_receipts: "deque[int]"
_acks: "queue.PriorityQueue[int]"

def __init__(self, committer: Committer):
self._committer = committer
self._receipts = deque()
self._acks = queue.PriorityQueue()

def track(self, offset: int):
if len(self._receipts) > 0:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaves the AckTracker in an inconsistent state on exceptions. Also, there is no need to pop and push, you can just access the first element with [0].

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to pop and push

Done.

leaves the AckTracker in an inconsistent state on exceptions

Not quite sure what you mean here, can you elaborate?

last = self._receipts[0]
if last >= offset:
manuelmenzella-google marked this conversation as resolved.
Show resolved Hide resolved
raise FailedPrecondition(f"Tried to track message {offset} which is before last tracked message {last}.")
self._receipts.append(offset)

async def ack(self, offset: int):
# Note: put_nowait is used here and below to ensure that the below logic is executed without yielding
# to another coroutine in the event loop. The queue is unbounded so it will never throw.
self._acks.put_nowait(offset)
manuelmenzella-google marked this conversation as resolved.
Show resolved Hide resolved
prefix_acked_offset: Optional[int] = None
while len(self._receipts) != 0 and not self._acks.empty():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you peek the queues instead of popping and pushing?

PriorityQueue.queue[index] and deque[index].

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can, but this seems super implicit as it relies on 1) the fact that Priorityqueue class has a queue submember (undocumented) and 2) the PriorityQueue class uses the heapq module and the element at index 0 of a queue managed by that module is the minimum one. I don't like that, but can make this change if you prefer.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that's fair. Let's leave as is.

receipt = self._receipts.popleft()
ack = self._acks.get_nowait()
if receipt == ack:
prefix_acked_offset = receipt
continue
self._receipts.append(receipt)
self._acks.put(ack)
break
if prefix_acked_offset is None:
return
# Convert from last acked to first unacked.
await self._committer.commit(Cursor(offset=prefix_acked_offset+1))

async def __aenter__(self):
await self._committer.__aenter__()

async def __aexit__(self, exc_type, exc_value, traceback):
await self._committer.__aexit__(exc_type, exc_value, traceback)
Empty file.
Empty file.
@@ -0,0 +1,46 @@
from asynctest.mock import MagicMock, call
import pytest

# All test coroutines will be treated as marked.
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl
from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite_v1 import Cursor

pytestmark = pytest.mark.asyncio


@pytest.fixture()
def committer():
committer = MagicMock(spec=Committer)
committer.__aenter__.return_value = committer
return committer


@pytest.fixture()
def tracker(committer):
return AckSetTrackerImpl(committer)


async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
async with tracker:
committer.__aenter__.assert_called_once()
tracker.track(offset=1)
tracker.track(offset=3)
tracker.track(offset=5)
tracker.track(offset=7)

committer.commit.assert_has_calls([])
await tracker.ack(offset=3)
committer.commit.assert_has_calls([])
await tracker.ack(offset=5)
committer.commit.assert_has_calls([])
await tracker.ack(offset=1)
committer.commit.assert_has_calls([call(Cursor(offset=6))])

tracker.track(offset=8)
await tracker.ack(offset=7)
committer.commit.assert_has_calls([call(Cursor(offset=6)), call(Cursor(offset=8))])
committer.__aexit__.assert_called_once()