-
Notifications
You must be signed in to change notification settings - Fork 12
/
flow_control_batcher_test.py
28 lines (24 loc) · 1.11 KB
/
flow_control_batcher_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from google.cloud.pubsublite.internal.wire.flow_control_batcher import FlowControlBatcher
from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage
def test_restart_clears_send():
batcher = FlowControlBatcher()
batcher.add(FlowControlRequest(allowed_bytes=10, allowed_messages=3))
assert batcher.should_expedite()
to_send = batcher.release_pending_request()
assert to_send.allowed_bytes == 10
assert to_send.allowed_messages == 3
restart_1 = batcher.request_for_restart()
assert restart_1.allowed_bytes == 10
assert restart_1.allowed_messages == 3
assert not batcher.should_expedite()
assert batcher.release_pending_request() is None
def test_add_remove():
batcher = FlowControlBatcher()
batcher.add(FlowControlRequest(allowed_bytes=10, allowed_messages=3))
restart_1 = batcher.request_for_restart()
assert restart_1.allowed_bytes == 10
assert restart_1.allowed_messages == 3
batcher.on_messages([SequencedMessage(size_bytes=2), SequencedMessage(size_bytes=3)])
restart_2 = batcher.request_for_restart()
assert restart_2.allowed_bytes == 5
assert restart_2.allowed_messages == 1