-
Notifications
You must be signed in to change notification settings - Fork 12
/
retrying_connection.py
136 lines (119 loc) · 5.29 KB
/
retrying_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import asyncio
from asyncio import Future
from typing import Optional
from google.api_core.exceptions import GoogleAPICallError, Cancelled
from google.cloud.pubsublite.internal.status_codes import is_retryable
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors
from google.cloud.pubsublite.internal.wire.connection_reinitializer import (
ConnectionReinitializer,
)
from google.cloud.pubsublite.internal.wire.connection import (
Connection,
Request,
Response,
ConnectionFactory,
)
from google.cloud.pubsublite.internal.wire.work_item import WorkItem
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
_MIN_BACKOFF_SECS = 0.01
_MAX_BACKOFF_SECS = 10
class RetryingConnection(Connection[Request, Response], PermanentFailable):
"""A connection which performs retries on an underlying stream when experiencing retryable errors."""
_connection_factory: ConnectionFactory[Request, Response]
_reinitializer: ConnectionReinitializer[Request, Response]
_initialized_once: asyncio.Event
_loop_task: asyncio.Future
_write_queue: "asyncio.Queue[WorkItem[Request, None]]"
_read_queue: "asyncio.Queue[Response]"
def __init__(
self,
connection_factory: ConnectionFactory[Request, Response],
reinitializer: ConnectionReinitializer[Request, Response],
):
super().__init__()
self._connection_factory = connection_factory
self._reinitializer = reinitializer
self._initialized_once = asyncio.Event()
self._write_queue = asyncio.Queue(maxsize=1)
self._read_queue = asyncio.Queue(maxsize=1)
async def __aenter__(self):
self._loop_task = asyncio.ensure_future(self._run_loop())
await self.await_unless_failed(self._initialized_once.wait())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.fail(Cancelled("Connection shutting down."))
async def write(self, request: Request) -> None:
item = WorkItem(request)
await self.await_unless_failed(self._write_queue.put(item))
return await self.await_unless_failed(item.response_future)
async def read(self) -> Response:
return await self.await_unless_failed(self._read_queue.get())
async def _run_loop(self):
"""
Processes actions on this connection and handles retries until cancelled.
"""
last_failure: Optional[GoogleAPICallError] = None
try:
bad_retries = 0
while True:
try:
conn_fut = self._connection_factory.new()
async with (await conn_fut) as connection:
# Needs to happen prior to reinitialization to clear outstanding waiters.
if last_failure is not None:
while not self._write_queue.empty():
self._write_queue.get_nowait().response_future.set_exception(
last_failure
)
self._read_queue = asyncio.Queue(maxsize=1)
self._write_queue = asyncio.Queue(maxsize=1)
await self._reinitializer.reinitialize(connection)
self._initialized_once.set()
bad_retries = 0
await self._loop_connection(connection)
except GoogleAPICallError as e:
last_failure = e
if not is_retryable(e):
self.fail(e)
return
await asyncio.sleep(
min(_MAX_BACKOFF_SECS, _MIN_BACKOFF_SECS * (2 ** bad_retries))
)
bad_retries += 1
except asyncio.CancelledError:
return
except Exception as e:
import traceback
traceback.print_exc()
print(e)
async def _loop_connection(self, connection: Connection[Request, Response]):
read_task: "Future[Response]" = asyncio.ensure_future(connection.read())
write_task: "Future[WorkItem[Request]]" = asyncio.ensure_future(
self._write_queue.get()
)
try:
while True:
done, _ = await asyncio.wait(
[write_task, read_task], return_when=asyncio.FIRST_COMPLETED
)
if write_task in done:
await self._handle_write(connection, await write_task)
write_task = asyncio.ensure_future(self._write_queue.get())
if read_task in done:
await self._read_queue.put(await read_task)
read_task = asyncio.ensure_future(connection.read())
finally:
read_task.cancel()
write_task.cancel()
await wait_ignore_errors(read_task)
await wait_ignore_errors(write_task)
@staticmethod
async def _handle_write(
connection: Connection[Request, Response], to_write: WorkItem[Request, Response]
):
try:
await connection.write(to_write.request)
to_write.response_future.set_result(None)
except GoogleAPICallError as e:
to_write.response_future.set_exception(e)
raise e