-
Notifications
You must be signed in to change notification settings - Fork 12
/
partition_count_watcher_impl_test.py
116 lines (94 loc) · 3.69 KB
/
partition_count_watcher_impl_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import concurrent
import queue
import threading
from asynctest.mock import MagicMock, CoroutineMock
import pytest
from google.cloud.pubsublite import AdminClientInterface
from google.cloud.pubsublite.internal.wire.partition_count_watcher import (
PartitionCountWatcher,
)
from google.cloud.pubsublite.internal.wire.partition_count_watcher_impl import (
PartitionCountWatcherImpl,
)
from google.cloud.pubsublite.internal.wire.partition_count_watching_publisher import (
PartitionCountWatchingPublisher,
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
from google.cloud.pubsublite.testing.test_utils import Box, wire_queues
from google.cloud.pubsublite.types import Partition, TopicPath, CloudZone, CloudRegion
from google.cloud.pubsublite_v1 import PubSubMessage
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import GoogleAPICallError
# Apply the pytest-asyncio marker to every test in this module so the
# ``async def`` tests below are run as coroutines.
pytestmark = pytest.mark.asyncio
@pytest.fixture()
def mock_publishers():
    """Provide a dict of Publisher-spec'd mocks keyed by Partition 0 through 9."""
    publishers = {}
    for index in range(10):
        partition = Partition(index)
        publishers[partition] = MagicMock(spec=Publisher)
    return publishers
@pytest.fixture()
def topic():
    """Provide a fixed TopicPath: project 1, region "a" / zone "a", name "c"."""
    region = CloudRegion("a")
    location = CloudZone(region=region, zone_id="a")
    return TopicPath(project_number=1, location=location, name="c")
@pytest.fixture()
def mock_admin():
    """Provide a MagicMock restricted to the AdminClientInterface spec."""
    return MagicMock(spec=AdminClientInterface)
@pytest.fixture()
def watcher(mock_admin, topic):
    """Provide a PartitionCountWatcherImpl that was constructed on a worker thread.

    The watcher is built from ``mock_admin`` and ``topic`` with 0.001 as its
    third constructor argument (presumably the polling period -- confirm
    against PartitionCountWatcherImpl).
    """
    holder = Box()

    def construct_watcher():
        holder.val = PartitionCountWatcherImpl(mock_admin, topic, 0.001)

    # Initialize the watcher on another thread (intended to be one with a
    # different event loop than the test's), and wait for it to finish.
    worker = threading.Thread(target=construct_watcher)
    worker.start()
    worker.join()
    return holder.val
async def test_init(watcher, mock_admin, topic):
    """Entering and leaving the watcher context completes without raising."""
    partition_count_call = mock_admin.get_topic_partition_count
    partition_count_call.return_value = 2
    async with watcher:
        pass
async def test_get_count_first_failure(watcher, mock_admin, topic):
    """A failing admin lookup makes using the watcher raise GoogleAPICallError.

    The admin mock raises on every call; the error is expected to escape
    either from entering the watcher or from ``get_partition_count()``.
    """
    api_error = GoogleAPICallError("error")
    mock_admin.get_topic_partition_count.side_effect = api_error
    with pytest.raises(GoogleAPICallError):
        async with watcher:
            await watcher.get_partition_count()
async def test_get_multiple_counts(watcher, mock_admin, topic):
    """Counts fed to the admin mock are delivered to the waiters one at a time.

    The admin call blocks on a queue that the test fills manually, so each
    ``get_partition_count()`` task can only finish after its value is queued:
    the first task yields 3 while the second is still pending, then the
    second yields 4.
    """
    counts = queue.Queue()
    # Drop the argument the mock is called with. Using ``counts.get`` itself
    # as the side effect would bind that argument to ``Queue.get``'s ``block``
    # flag, so blocking would silently depend on the argument being truthy.
    mock_admin.get_topic_partition_count.side_effect = lambda _topic: counts.get()
    async with watcher:
        first_waiter = asyncio.ensure_future(watcher.get_partition_count())
        second_waiter = asyncio.ensure_future(watcher.get_partition_count())
        assert not first_waiter.done()
        assert not second_waiter.done()
        counts.put(3)
        assert await first_waiter == 3
        assert not second_waiter.done()
        counts.put(4)
        assert await second_waiter == 4
async def test_subsequent_failures_ignored(watcher, mock_admin, topic):
    """After one successful lookup, a later failing lookup is not surfaced.

    The admin mock is driven by a queue of outcomes: 3, then an error, then 4.
    The two ``get_partition_count()`` calls are expected to return 3 and 4.
    """
    outcomes = queue.Queue()

    def next_outcome(_topic):
        # Block until the test queues an outcome; exceptions are raised and
        # anything else is returned as the partition count.
        outcome = outcomes.get()
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome

    mock_admin.get_topic_partition_count.side_effect = next_outcome
    async with watcher:
        outcomes.put(3)
        assert await watcher.get_partition_count() == 3
        outcomes.put(GoogleAPICallError("error"))
        outcomes.put(4)
        assert await watcher.get_partition_count() == 4