-
Notifications
You must be signed in to change notification settings - Fork 12
/
retrying_connection.py
150 lines (132 loc) · 5.87 KB
/
retrying_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from asyncio import Future
from typing import Optional
from google.api_core.exceptions import GoogleAPICallError, Cancelled
from google.cloud.pubsublite.internal.status_codes import is_retryable
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors
from google.cloud.pubsublite.internal.wire.connection_reinitializer import (
ConnectionReinitializer,
)
from google.cloud.pubsublite.internal.wire.connection import (
Connection,
Request,
Response,
ConnectionFactory,
)
from google.cloud.pubsublite.internal.wire.work_item import WorkItem
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
# Delay (seconds) before the first reconnection attempt after a retryable
# error; RetryingConnection doubles it for each further consecutive failure
# (the counter resets once a new stream reinitializes successfully).
_MIN_BACKOFF_SECS = 0.01
# Upper bound (seconds) on the delay between reconnection attempts.
_MAX_BACKOFF_SECS = 10
class RetryingConnection(Connection[Request, Response], PermanentFailable):
    """A connection which performs retries on an underlying stream when experiencing retryable errors.

    Entering the context starts a background task that repeatedly obtains a
    new underlying connection from ``connection_factory``, reinitializes it via
    ``reinitializer`` and then pumps requests (from ``write``) and responses
    (to ``read``) until that stream raises. Retryable ``GoogleAPICallError``s
    cause a reconnect after an exponential backoff; a non-retryable error, or
    any unexpected exception, permanently fails this connection.
    """

    _connection_factory: ConnectionFactory[Request, Response]
    _reinitializer: ConnectionReinitializer[Request, Response]
    # Set once the first underlying stream has been reinitialized.
    _initialized_once: asyncio.Event
    # Background task running _run_loop; created in __aenter__.
    _loop_task: asyncio.Future
    _write_queue: "asyncio.Queue[WorkItem[Request, None]]"
    _read_queue: "asyncio.Queue[Response]"

    def __init__(
        self,
        connection_factory: ConnectionFactory[Request, Response],
        reinitializer: ConnectionReinitializer[Request, Response],
    ):
        """
        Args:
            connection_factory: Creates a fresh underlying connection for each attempt.
            reinitializer: Re-establishes stream state on each new underlying connection.
        """
        super().__init__()
        self._connection_factory = connection_factory
        self._reinitializer = reinitializer
        self._initialized_once = asyncio.Event()
        # maxsize=1: at most one request / response is buffered between the
        # caller and the background loop.
        self._write_queue = asyncio.Queue(maxsize=1)
        self._read_queue = asyncio.Queue(maxsize=1)

    async def __aenter__(self):
        """Start the background loop and wait until the first stream is initialized.

        Raises:
            GoogleAPICallError: presumably the permanent failure, if this
                connection fails before the first initialization completes
                (delegated to ``await_unless_failed``).
        """
        self._loop_task = asyncio.ensure_future(self._run_loop())
        await self.await_unless_failed(self._initialized_once.wait())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Permanently fail this connection and stop the background loop.

        Failing first is intended to release callers blocked in ``read`` /
        ``write`` (they wait via ``await_unless_failed``). The loop task is
        then cancelled and awaited so it closes the underlying stream instead
        of continuing to run and reconnect in the background.
        """
        self.fail(Cancelled("Connection shutting down."))
        self._loop_task.cancel()
        await wait_ignore_errors(self._loop_task)

    async def write(self, request: Request) -> None:
        """Queue ``request`` and wait until it has been written to the current stream.

        Raises:
            GoogleAPICallError: the error the write failed with, the error that
                triggered a reconnect while the request was still queued, or
                the permanent failure of this connection.
        """
        item = WorkItem(request)
        await self.await_unless_failed(self._write_queue.put(item))
        return await self.await_unless_failed(item.response_future)

    async def read(self) -> Response:
        """Return the next response received from the underlying stream.

        Raises:
            GoogleAPICallError: presumably the permanent failure of this
                connection (delegated to ``await_unless_failed``).
        """
        return await self.await_unless_failed(self._read_queue.get())

    async def _run_loop(self):
        """
        Processes actions on this connection and handles retries until cancelled.

        Each attempt opens a fresh underlying connection, fails writes still
        queued from the previous attempt, reinitializes the stream and then
        pumps reads/writes until the stream raises. Retryable errors are
        retried after a capped exponential backoff; a non-retryable
        ``GoogleAPICallError`` or any other exception permanently fails this
        connection.
        """
        last_failure: Optional[GoogleAPICallError] = None
        try:
            bad_retries = 0
            while True:
                try:
                    conn_fut = self._connection_factory.new()
                    async with (await conn_fut) as connection:
                        # Needs to happen prior to reinitialization to clear outstanding waiters.
                        if last_failure is not None:
                            while not self._write_queue.empty():
                                pending = self._write_queue.get_nowait().response_future
                                # A waiter that already gave up (future done)
                                # must not crash the loop with InvalidStateError.
                                if not pending.done():
                                    pending.set_exception(last_failure)
                        # NOTE(review): callers already suspended in get()/put()
                        # on the old queue objects stay bound to those objects
                        # and are not moved to the new queues.
                        self._read_queue = asyncio.Queue(maxsize=1)
                        self._write_queue = asyncio.Queue(maxsize=1)
                        await self._reinitializer.reinitialize(connection, last_failure)
                        self._initialized_once.set()
                        bad_retries = 0
                        await self._loop_connection(connection)
                except GoogleAPICallError as e:
                    last_failure = e
                    if not is_retryable(e):
                        self.fail(e)
                        return
                    delay = min(
                        _MAX_BACKOFF_SECS, _MIN_BACKOFF_SECS * (2 ** bad_retries)
                    )
                    await asyncio.sleep(delay)
                    # Stop growing the exponent once the delay is capped:
                    # otherwise 2 ** bad_retries eventually exceeds the float
                    # range and the multiplication raises OverflowError.
                    if delay < _MAX_BACKOFF_SECS:
                        bad_retries += 1
        except asyncio.CancelledError:
            return
        except Exception as e:  # Top-level boundary of the background task.
            import logging

            from google.api_core.exceptions import Unknown

            logging.getLogger(__name__).exception(
                "RetryingConnection loop terminated by an unexpected error."
            )
            # Fail the connection so callers blocked in read/write/__aenter__
            # are released instead of hanging on a dead loop.
            failure = Unknown(
                "RetryingConnection loop terminated unexpectedly: {}".format(e)
            )
            failure.__cause__ = e
            self.fail(failure)

    async def _loop_connection(self, connection: Connection[Request, Response]):
        """Pump requests and responses between the local queues and ``connection``.

        Runs until a read or write on ``connection`` raises, or until
        cancelled; the outstanding read and queue-get tasks are always
        cancelled and awaited on exit.
        """
        read_task: "Future[Response]" = asyncio.ensure_future(connection.read())
        write_task: "Future[WorkItem[Request, None]]" = asyncio.ensure_future(
            self._write_queue.get()
        )
        try:
            while True:
                done, _ = await asyncio.wait(
                    [write_task, read_task], return_when=asyncio.FIRST_COMPLETED
                )
                # When both completed in the same wakeup, the write is handled first.
                if write_task in done:
                    await self._handle_write(connection, await write_task)
                    write_task = asyncio.ensure_future(self._write_queue.get())
                if read_task in done:
                    # Suspends while the previous response is still unread
                    # (the read queue holds at most one item).
                    await self._read_queue.put(await read_task)
                    read_task = asyncio.ensure_future(connection.read())
        finally:
            read_task.cancel()
            write_task.cancel()
            await wait_ignore_errors(read_task)
            await wait_ignore_errors(write_task)

    @staticmethod
    async def _handle_write(
        connection: Connection[Request, Response], to_write: WorkItem[Request, None]
    ):
        """Write one queued request to ``connection`` and resolve its future.

        Raises:
            GoogleAPICallError: re-raised after being recorded on the item's
                future, so the caller tears down and retries the stream.
        """
        future = to_write.response_future
        try:
            await connection.write(to_write.request)
        except GoogleAPICallError as e:
            if not future.done():
                future.set_exception(e)
            raise
        # Guard against a waiter that already gave up: resolving a done future
        # raises InvalidStateError, which would tear down the loop.
        if not future.done():
            future.set_result(None)