-
Notifications
You must be signed in to change notification settings - Fork 12
/
subscriber_impl_test.py
93 lines (68 loc) · 2.88 KB
/
subscriber_impl_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import concurrent
from concurrent.futures.thread import ThreadPoolExecutor
from queue import Queue
from asynctest.mock import MagicMock
import pytest
from google.api_core.exceptions import FailedPrecondition
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub.internal.streaming_pull_manager import CloseCallback
from google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl import SubscriberImpl
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber, MessageCallback
from google.cloud.pubsublite.testing.test_utils import Box
@pytest.fixture()
def async_subscriber():
    """Provide a mock built from the ``AsyncSubscriber`` spec.

    The mock's ``__aenter__`` is configured to return the mock itself.
    """
    mock_subscriber = MagicMock(spec=AsyncSubscriber)
    mock_subscriber.__aenter__.return_value = mock_subscriber
    return mock_subscriber
@pytest.fixture()
def message_callback():
    """Provide a mock built from the ``MessageCallback`` spec."""
    callback_mock = MagicMock(spec=MessageCallback)
    return callback_mock
@pytest.fixture()
def close_callback():
    """Provide a mock built from the ``CloseCallback`` spec."""
    callback_mock = MagicMock(spec=CloseCallback)
    return callback_mock
@pytest.fixture()
def subscriber(async_subscriber, message_callback, close_callback):
    """Yield a ``SubscriberImpl`` built from the mocks and a one-worker executor.

    The executor is created by this fixture, so this fixture also releases it
    once the test has finished instead of leaving it to garbage collection.

    ``close_callback`` is not used in the body; it is kept so the fixture's
    signature stays the same.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    yield SubscriberImpl(async_subscriber, message_callback, executor)
    # wait=False keeps teardown from blocking on work that may still be queued.
    # shutdown() may be called more than once, so this is safe even if the
    # executor was already shut down elsewhere.
    executor.shutdown(wait=False)
async def sleep_forever(*args, **kwargs):
    """Suspend the awaiting coroutine on an infinite sleep.

    All positional and keyword arguments are accepted and ignored, so the
    function can be installed as a mock ``side_effect`` or awaited directly.
    """
    unbounded_delay = float("inf")
    await asyncio.sleep(unbounded_delay)
def test_init(subscriber: SubscriberImpl, async_subscriber, close_callback):
    """Enter and close the subscriber while reads never complete.

    Checks that the mocked underlying subscriber is entered once on
    ``__enter__``, exited once on ``close()``, and that the close callback is
    called once with the subscriber and ``None`` as the error.
    """
    # Every read on the mock suspends forever, so no message is ever produced.
    async_subscriber.read.side_effect = sleep_forever
    subscriber.add_close_callback(close_callback)

    subscriber.__enter__()
    underlying_enter = async_subscriber.__aenter__
    underlying_enter.assert_called_once()

    subscriber.close()
    underlying_exit = async_subscriber.__aexit__
    underlying_exit.assert_called_once()

    no_error = None
    close_callback.assert_called_once_with(subscriber, no_error)
def test_failed(subscriber: SubscriberImpl, async_subscriber, close_callback):
    """Check that a failing read is reported through the close callback.

    ``read`` on the mocked underlying subscriber raises ``FailedPrecondition``.
    The test waits for the close callback to run, then checks that the
    underlying subscriber was entered and exited exactly once and that the
    callback was called once with the subscriber and that same error.
    """
    error = FailedPrecondition("bad read")
    async_subscriber.read.side_effect = error

    # The test blocks on this future until the close callback has run.
    close_called = concurrent.futures.Future()
    close_callback.side_effect = lambda manager, err: close_called.set_result(None)
    subscriber.add_close_callback(close_callback)

    subscriber.__enter__()
    async_subscriber.__aenter__.assert_called_once()

    # Bounded wait: if the callback never fires, fail with a timeout error
    # instead of hanging the whole test run.
    close_called.result(timeout=30)

    async_subscriber.__aexit__.assert_called_once()
    close_callback.assert_called_once_with(subscriber, error)
def test_messages_received(
    subscriber: SubscriberImpl, async_subscriber, message_callback, close_callback
):
    """Check that messages returned by ``read`` reach the message callback in order.

    The mocked ``read`` returns two messages and then suspends forever. The
    message callback records each ``message_id`` in a queue, and the test
    asserts the ids arrive as "1" then "2" before closing the subscriber.
    """
    message1 = Message(PubsubMessage(message_id="1")._pb, "", 0, None)
    message2 = Message(PubsubMessage(message_id="2")._pb, "", 0, None)
    counter = Box[int]()
    counter.val = 0

    async def on_read() -> Message:
        # First call yields message1, second yields message2; every later
        # call never completes.
        counter.val += 1
        if counter.val == 1:
            return message1
        if counter.val == 2:
            return message2
        await sleep_forever()

    async_subscriber.read.side_effect = on_read
    results = Queue()
    message_callback.side_effect = lambda m: results.put(m.message_id)
    subscriber.add_close_callback(close_callback)
    subscriber.__enter__()
    try:
        # Bounded waits: a message that is never delivered fails the test
        # with queue.Empty instead of blocking forever.
        assert results.get(timeout=30) == "1"
        assert results.get(timeout=30) == "2"
    finally:
        # Close even when an assertion above fails, so the entered
        # subscriber is not left open after the test.
        subscriber.close()