Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Increment ack generation id #203

Merged
merged 2 commits into from Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -96,7 +96,7 @@ def __init__(

async def handle_reset(self):
# Increment ack generation id to ignore unacked messages.
++self._ack_generation_id
self._ack_generation_id += 1
await self._ack_set_tracker.clear_and_commit()

async def read(self) -> Message:
Expand All @@ -123,6 +123,9 @@ async def read(self) -> Message:
raise e

async def _handle_ack(self, message: requests.AckRequest):
if message.ack_id not in self._messages_by_ack_id:
# Ignore duplicate acks.
return
await self._underlying.allow_flow(
FlowControlRequest(
allowed_messages=1,
Expand Down
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import json
from typing import Callable

from asynctest.mock import MagicMock, call
Expand Down Expand Up @@ -47,6 +48,10 @@ def mock_async_context_manager(cm):
return cm


def ack_id(generation, offset) -> str:
return json.dumps({"generation": generation, "offset": offset})


@pytest.fixture()
def underlying():
return mock_async_context_manager(MagicMock(spec=Subscriber))
Expand Down Expand Up @@ -140,6 +145,8 @@ async def test_ack(
await ack_called_queue.get()
await ack_result_queue.put(None)
ack_set_tracker.ack.assert_has_calls([call(2)])
# Note: verifies duplicate ack for read_2 ignored.
read_2.ack()
read_1.ack()
await ack_called_queue.get()
await ack_result_queue.put(None)
Expand Down Expand Up @@ -259,28 +266,21 @@ async def test_handle_reset(
read_1: Message = await subscriber.read()
ack_set_tracker.track.assert_has_calls([call(1)])
assert read_1.message_id == "1"
assert read_1.ack_id == ack_id(0, 1)

await subscriber.handle_reset()
ack_set_tracker.clear_and_commit.assert_called_once()

# After reset, flow control tokens of unacked messages are refilled,
# but offset not committed.
# Message ACKed after reset. Its flow control tokens are refilled
# but offset not committed (verified below after message 2).
read_1.ack()
await ack_called_queue.get()
await ack_result_queue.put(None)
underlying.allow_flow.assert_has_calls(
[
call(FlowControlRequest(allowed_messages=1000, allowed_bytes=1000,)),
call(FlowControlRequest(allowed_messages=1, allowed_bytes=5,)),
]
)
ack_set_tracker.ack.assert_has_calls([])

message_2 = SequencedMessage(cursor=Cursor(offset=2), size_bytes=10)
underlying.read.return_value = message_2
read_2: Message = await subscriber.read()
ack_set_tracker.track.assert_has_calls([call(1), call(2)])
assert read_2.message_id == "2"
assert read_2.ack_id == ack_id(1, 2)
read_2.ack()
await ack_called_queue.get()
await ack_result_queue.put(None)
Expand Down