From 644163d2adc067372c379c617d570497b3f9354e Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 4 Aug 2021 23:30:07 +1000 Subject: [PATCH] fix: Increment ack generation id (#203) * fix: Increment ack generation id Also handle duplicate acks. * Remove duplicate acks change --- .../internal/single_partition_subscriber.py | 2 +- .../single_partition_subscriber_test.py | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py index 1e244aa4..e0df2bfb 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py @@ -96,7 +96,7 @@ def __init__( async def handle_reset(self): # Increment ack generation id to ignore unacked messages. - ++self._ack_generation_id + self._ack_generation_id += 1 await self._ack_set_tracker.clear_and_commit() async def read(self) -> Message: diff --git a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py index f2a43928..4a151484 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import json from typing import Callable from asynctest.mock import MagicMock, call @@ -47,6 +48,10 @@ def mock_async_context_manager(cm): return cm +def ack_id(generation, offset) -> str: + return json.dumps({"generation": generation, "offset": offset}) + + @pytest.fixture() def underlying(): return mock_async_context_manager(MagicMock(spec=Subscriber)) @@ -259,28 +264,21 @@ async def test_handle_reset( read_1: Message = await subscriber.read() ack_set_tracker.track.assert_has_calls([call(1)]) assert read_1.message_id == "1" + assert read_1.ack_id == ack_id(0, 1) await subscriber.handle_reset() ack_set_tracker.clear_and_commit.assert_called_once() - # After reset, flow control tokens of unacked messages are refilled, - # but offset not committed. + # Message ACKed after reset. Its flow control tokens are refilled + # but offset not committed (verified below after message 2). read_1.ack() - await ack_called_queue.get() - await ack_result_queue.put(None) - underlying.allow_flow.assert_has_calls( - [ - call(FlowControlRequest(allowed_messages=1000, allowed_bytes=1000,)), - call(FlowControlRequest(allowed_messages=1, allowed_bytes=5,)), - ] - ) - ack_set_tracker.ack.assert_has_calls([]) message_2 = SequencedMessage(cursor=Cursor(offset=2), size_bytes=10) underlying.read.return_value = message_2 read_2: Message = await subscriber.read() ack_set_tracker.track.assert_has_calls([call(1), call(2)]) assert read_2.message_id == "2" + assert read_2.ack_id == ack_id(1, 2) read_2.ack() await ack_called_queue.get() await ack_result_queue.put(None)