Skip to content

Commit

Permalink
fix: Increment ack generation id (#203)
Browse files Browse the repository at this point in the history
* fix: Increment ack generation id

Also handle duplicate acks.

* Remove duplicate acks change
  • Loading branch information
tmdiep committed Aug 4, 2021
1 parent b93a0bf commit 644163d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
Expand Up @@ -96,7 +96,7 @@ def __init__(

async def handle_reset(self):
# Increment ack generation id to ignore unacked messages.
++self._ack_generation_id
self._ack_generation_id += 1
await self._ack_set_tracker.clear_and_commit()

async def read(self) -> Message:
Expand Down
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import json
from typing import Callable

from asynctest.mock import MagicMock, call
Expand Down Expand Up @@ -47,6 +48,10 @@ def mock_async_context_manager(cm):
return cm


def ack_id(generation, offset) -> str:
return json.dumps({"generation": generation, "offset": offset})


@pytest.fixture()
def underlying():
return mock_async_context_manager(MagicMock(spec=Subscriber))
Expand Down Expand Up @@ -259,28 +264,21 @@ async def test_handle_reset(
read_1: Message = await subscriber.read()
ack_set_tracker.track.assert_has_calls([call(1)])
assert read_1.message_id == "1"
assert read_1.ack_id == ack_id(0, 1)

await subscriber.handle_reset()
ack_set_tracker.clear_and_commit.assert_called_once()

# After reset, flow control tokens of unacked messages are refilled,
# but offset not committed.
# Message ACKed after reset. Its flow control tokens are refilled
# but offset not committed (verified below after message 2).
read_1.ack()
await ack_called_queue.get()
await ack_result_queue.put(None)
underlying.allow_flow.assert_has_calls(
[
call(FlowControlRequest(allowed_messages=1000, allowed_bytes=1000,)),
call(FlowControlRequest(allowed_messages=1, allowed_bytes=5,)),
]
)
ack_set_tracker.ack.assert_has_calls([])

message_2 = SequencedMessage(cursor=Cursor(offset=2), size_bytes=10)
underlying.read.return_value = message_2
read_2: Message = await subscriber.read()
ack_set_tracker.track.assert_has_calls([call(1), call(2)])
assert read_2.message_id == "2"
assert read_2.ack_id == ack_id(1, 2)
read_2.ack()
await ack_called_queue.get()
await ack_result_queue.put(None)
Expand Down

0 comments on commit 644163d

Please sign in to comment.