Skip to content

Commit

Permalink
fix: Backlog never zero despite messages received (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Aug 3, 2021
1 parent a0822f4 commit b93a0bf
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Expand Up @@ -54,7 +54,7 @@ async def ack(self, offset: int):
if receipt == ack:
prefix_acked_offset = receipt
continue
self._receipts.append(receipt)
self._receipts.appendleft(receipt)
self._acks.put(ack)
break
if prefix_acked_offset is None:
Expand Down
Expand Up @@ -51,15 +51,17 @@ async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
committer.commit.assert_has_calls([])
await tracker.ack(offset=3)
committer.commit.assert_has_calls([])
await tracker.ack(offset=5)
committer.commit.assert_has_calls([])
await tracker.ack(offset=1)
committer.commit.assert_has_calls([call(Cursor(offset=6))])
committer.commit.assert_has_calls([call(Cursor(offset=4))])
await tracker.ack(offset=5)
committer.commit.assert_has_calls(
[call(Cursor(offset=4)), call(Cursor(offset=6))]
)

tracker.track(offset=8)
await tracker.ack(offset=7)
committer.commit.assert_has_calls(
[call(Cursor(offset=6)), call(Cursor(offset=8))]
[call(Cursor(offset=4)), call(Cursor(offset=6)), call(Cursor(offset=8))]
)
committer.__aexit__.assert_called_once()

Expand Down

0 comments on commit b93a0bf

Please sign in to comment.