/
assigning_subscriber.py
91 lines (74 loc) · 3.55 KB
/
assigning_subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from asyncio import Future, Queue, ensure_future
from typing import Callable, NamedTuple, Dict, Set, Optional
from google.cloud.pubsub_v1.subscriber.message import Message
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.types import Partition
# Callable that takes a Partition and returns the AsyncSingleSubscriber to use for it.
PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber]
class _RunningSubscriber(NamedTuple):
    """Pairs a per-partition subscriber with the future that polls it."""

    # The subscriber created for one partition.
    subscriber: AsyncSingleSubscriber
    # Future running the background read loop for `subscriber`.
    poller: Future
class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable):
    """Subscriber that merges messages from one sub-subscriber per assigned partition.

    An assigner is polled for the current set of partitions. A subscriber is
    started for every newly assigned partition and stopped for every partition
    that is no longer assigned. Messages read from all running subscribers are
    placed on a single queue, which ``read()`` drains.

    Intended to be used as an async context manager: ``__aenter__`` creates the
    queue and the assigner and starts polling; ``__aexit__`` tears everything
    down.
    """

    _assigner_factory: Callable[[], Assigner]
    _subscriber_factory: PartitionSubscriberFactory

    # Currently running subscribers, keyed by the partition they serve.
    _subscribers: Dict[Partition, _RunningSubscriber]

    # Lazily initialized to ensure they are initialized on the thread where __aenter__ is called.
    _assigner: Optional[Assigner]
    _messages: Optional["Queue[Message]"]
    # Only set by __aenter__.
    _assign_poller: Future

    def __init__(
        self,
        assigner_factory: Callable[[], Assigner],
        subscriber_factory: PartitionSubscriberFactory,
    ):
        """
        Accepts a factory for an Assigner instead of an Assigner because GRPC asyncio uses the current thread's event
        loop.

        Args:
            assigner_factory: Zero-argument callable producing the Assigner; invoked in ``__aenter__``.
            subscriber_factory: Callable producing the subscriber for a given partition.
        """
        super().__init__()
        self._assigner_factory = assigner_factory
        self._assigner = None
        self._subscriber_factory = subscriber_factory
        self._subscribers = {}
        self._messages = None

    async def read(self) -> Message:
        """Return the next message queued by any running partition subscriber.

        Must be called after ``__aenter__``; the queue does not exist before then.
        The wait is routed through the inherited ``await_unless_failed`` helper
        (presumably aborts the wait if this object has failed -- confirm against
        PermanentFailable).
        """
        return await self.await_unless_failed(self._messages.get())

    async def _subscribe_action(self, subscriber: AsyncSingleSubscriber):
        """Move a single message from ``subscriber`` onto the shared queue."""
        message = await subscriber.read()
        await self._messages.put(message)

    async def _start_subscriber(self, partition: Partition):
        """Create, enter and begin polling the subscriber for ``partition``."""
        new_subscriber = self._subscriber_factory(partition)
        await new_subscriber.__aenter__()
        # run_poller is inherited; presumably it invokes the action repeatedly -- confirm.
        poller = ensure_future(
            self.run_poller(lambda: self._subscribe_action(new_subscriber))
        )
        # Registered only once fully started, so the map never holds a subscriber that was not entered.
        self._subscribers[partition] = _RunningSubscriber(new_subscriber, poller)

    async def _stop_subscriber(self, running: _RunningSubscriber):
        """Cancel the poller of ``running``, wait for it to finish, then exit its subscriber."""
        running.poller.cancel()
        await wait_ignore_cancelled(running.poller)
        await running.subscriber.__aexit__(None, None, None)

    async def _assign_action(self):
        """Fetch one assignment and reconcile the running subscribers against it."""
        assignment: Set[Partition] = await self._assigner.get_assignment()
        added_partitions = assignment - self._subscribers.keys()
        removed_partitions = self._subscribers.keys() - assignment
        for partition in added_partitions:
            await self._start_subscriber(partition)
        for partition in removed_partitions:
            # Unregister before awaiting the shutdown: if the stop is interrupted
            # (error or cancellation), the half-stopped subscriber must not remain
            # in the map where __aexit__ would try to stop it a second time.
            running = self._subscribers.pop(partition)
            await self._stop_subscriber(running)

    async def __aenter__(self):
        """Create the queue and assigner, enter the assigner and start the assignment poller."""
        self._messages = Queue()
        self._assigner = self._assigner_factory()
        await self._assigner.__aenter__()
        self._assign_poller = ensure_future(self.run_poller(self._assign_action))
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Stop assignment polling, exit the assigner and stop every running subscriber."""
        # Stop reacting to assignment changes first so the subscriber map is no
        # longer mutated while it is being torn down below.
        self._assign_poller.cancel()
        await wait_ignore_cancelled(self._assign_poller)
        try:
            await self._assigner.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Runs even if exiting the assigner raised; otherwise every partition
            # subscriber and its poller would be leaked.
            for partition in list(self._subscribers):
                # Pop as we go so no reference to a stopped subscriber is retained.
                await self._stop_subscriber(self._subscribers.pop(partition))