-
Notifications
You must be signed in to change notification settings - Fork 12
/
subscriber_impl.py
75 lines (64 loc) · 2.62 KB
/
subscriber_impl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import concurrent.futures
import threading
from asyncio import CancelledError
from concurrent.futures.thread import ThreadPoolExecutor
from typing import ContextManager, Optional
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ManagedEventLoop
from google.cloud.pubsublite.cloudpubsub.internal.streaming_pull_manager import StreamingPullManager, CloseCallback
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber, MessageCallback
class SubscriberImpl(ContextManager, StreamingPullManager):
_underlying: AsyncSubscriber
_callback: MessageCallback
_executor: ThreadPoolExecutor
_event_loop: ManagedEventLoop
_poller_future: concurrent.futures.Future
_close_lock: threading.Lock
_failure: Optional[GoogleAPICallError]
_close_callback: Optional[CloseCallback]
_closed: bool
def __init__(self, underlying: AsyncSubscriber, callback: MessageCallback, executor: ThreadPoolExecutor):
self._underlying = underlying
self._callback = callback
self._executor = executor
self._event_loop = ManagedEventLoop()
self._close_lock = threading.Lock()
self._failure = None
self._close_callback = None
self._closed = False
def add_close_callback(self, close_callback: CloseCallback):
with self._close_lock:
assert self._close_callback is None
self._close_callback = close_callback
def close(self):
with self._close_lock:
if not self._closed:
self._closed = True
self.__exit__(None, None, None)
def _fail(self, error: GoogleAPICallError):
self._failure = error
self.close()
async def _poller(self):
try:
while True:
message = await self._underlying.read()
self._executor.submit(self._callback, message)
except GoogleAPICallError as e:
self._executor.submit(lambda: self._fail(e))
def __enter__(self):
assert self._close_callback is not None
self._event_loop.__enter__()
self._event_loop.submit(self._underlying.__aenter__()).result()
self._poller_future = self._event_loop.submit(self._poller())
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
self._poller_future.cancel()
self._poller_future.result()
except CancelledError:
pass
self._event_loop.submit(self._underlying.__aexit__(exc_type, exc_value, traceback)).result()
self._event_loop.__exit__(exc_type, exc_value, traceback)
assert self._close_callback is not None
self._executor.shutdown(wait=False) # __exit__ may be called from the executor.
self._close_callback(self, self._failure)