-
-
Notifications
You must be signed in to change notification settings - Fork 663
/
connection_handler.py
131 lines (119 loc) · 5.78 KB
/
connection_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
'''
Copyright (C) 2017-2024 Bryant Moscon - bmoscon@gmail.com
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
import asyncio
import logging
import random
from socket import error as socket_error
import time
from typing import Awaitable
import zlib
from websockets import ConnectionClosed
from websockets.exceptions import InvalidStatusCode
from cryptofeed.connection import AsyncConnection
from cryptofeed.exceptions import ExhaustedRetries
from cryptofeed.defines import HUOBI, HUOBI_DM, HUOBI_SWAP, OKCOIN, OKX
# Module-level logger: every log call in this file is emitted through the
# logger registered under the name 'feedhandler'.
LOG = logging.getLogger('feedhandler')
class ConnectionHandler:
    """
    Drive a single connection: connect, authenticate, subscribe, feed every
    received message to a handler coroutine and reconnect with exponential
    backoff when the connection fails.
    """

    def __init__(
        self,
        conn: AsyncConnection,
        subscribe: Awaitable,
        handler: Awaitable,
        authenticate: Awaitable,
        retries: int,
        timeout=120,
        timeout_interval=30,
        exceptions=None,
        log_on_error=False,
        start_delay=0,
    ):
        """
        conn: connection object; this class uses its connect(), close(),
            is_open, last_message and uuid members.
        subscribe: coroutine function awaited with the open connection, after
            authenticate.
        handler: coroutine function awaited as
            handler(message, connection, conn.last_message) for each message.
        authenticate: coroutine function awaited with the open connection,
            before subscribe.
        retries: number of consecutive failed attempts tolerated before
            ExhaustedRetries is raised; -1 retries forever.
        timeout: seconds without a message after which the watcher closes the
            connection; -1 disables the watcher.
        timeout_interval: seconds the watcher sleeps between checks.
        exceptions: optional collection of exception types that are re-raised
            instead of triggering a reconnect.
        log_on_error: when True, log the message being processed when the
            read/handle loop fails.
        start_delay: seconds to sleep before the first connection attempt.
        """
        self.conn = conn
        self.subscribe = subscribe
        self.handler = handler
        self.authenticate = authenticate
        self.retries = retries
        self.exceptions = exceptions
        self.log_on_error = log_on_error
        self.timeout = timeout
        self.timeout_interval = timeout_interval
        self.running = True
        self.start_delay = start_delay
        # Strong references to the tasks spawned by this handler. The event
        # loop only keeps weak references, so an unreferenced task may be
        # garbage collected before it finishes.
        self._tasks = set()

    def _spawn(self, loop: asyncio.AbstractEventLoop, coro):
        """Schedule coro on loop and keep a reference to the task until it is done."""
        task = loop.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def start(self, loop: asyncio.AbstractEventLoop):
        """Schedule the connect/reconnect loop as a task on the given event loop."""
        self._spawn(loop, self._create_connection())

    async def _watcher(self):
        """
        Poll the connection every timeout_interval seconds and close it when
        no message has been received for more than timeout seconds. Exits
        when the connection is no longer open or the handler stops running.
        """
        while self.conn.is_open and self.running:
            # last_message is falsy until it has been set; skip the staleness
            # check until then.
            if self.conn.last_message:
                if time.time() - self.conn.last_message > self.timeout:
                    LOG.warning("%s: received no messages within timeout, restarting connection", self.conn.uuid)
                    await self.conn.close()
                    break
            await asyncio.sleep(self.timeout_interval)

    def _should_propagate(self, exc: BaseException) -> bool:
        """
        Return True (after logging) when exc is an instance of one of the
        types in self.exceptions, i.e. it must be re-raised by the caller
        rather than handled with a reconnect.
        """
        if self.exceptions and isinstance(exc, tuple(self.exceptions)):
            LOG.warning("%s: encountered exception %s, which is on the ignore list. Raising", self.conn.uuid, str(exc))
            return True
        return False

    async def _create_connection(self):
        """
        Connect, authenticate, subscribe and process messages, reconnecting on
        failure.

        Ordinary failures wait `delay` seconds (doubling after every failure)
        and count against `retries`. HTTP 429 responses wait a randomized
        multiple of 60 seconds and do not count against `retries`. A
        successful connect resets all counters.

        Raises:
            ExhaustedRetries: when the retry budget is used up while the
                handler is still running.
            Exception: any exception matching self.exceptions is re-raised.
        """
        await asyncio.sleep(self.start_delay)
        retries = 0
        rate_limited = 1
        delay = 1
        while (retries <= self.retries or self.retries == -1) and self.running:
            try:
                async with self.conn.connect() as connection:
                    await self.authenticate(connection)
                    await self.subscribe(connection)
                    # connection was successful, reset retry count and delay.
                    # rate_limited goes back to its initial value of 1 (not 0):
                    # it is a multiplier, and 0 would make the first 429 after
                    # a good connection wait zero seconds.
                    retries = 0
                    rate_limited = 1
                    delay = 1
                    if self.timeout != -1:
                        self._spawn(asyncio.get_running_loop(), self._watcher())
                    await self._handler(connection, self.handler)
            except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError, socket_error) as e:
                if self._should_propagate(e):
                    raise
                LOG.warning("%s: encountered connection issue %s - reconnecting in %.1f seconds...", self.conn.uuid, str(e), delay, exc_info=True)
            except InvalidStatusCode as e:
                if self._should_propagate(e):
                    raise
                if e.status_code == 429:
                    wait = rate_limited * 60 * random.uniform(1.0, 3.0)
                    LOG.warning("%s: Rate Limited - waiting %d seconds to reconnect", self.conn.uuid, wait)
                    await asyncio.sleep(wait)
                    rate_limited += 1
                    # rate limiting has its own wait and does not consume a retry
                    continue
                LOG.warning("%s: encountered connection issue %s - reconnecting in %.1f seconds...", self.conn.uuid, str(e), delay, exc_info=True)
            except Exception as e:
                if self._should_propagate(e):
                    raise
                LOG.error("%s: encountered an exception, reconnecting in %.1f seconds", self.conn.uuid, delay, exc_info=True)
            else:
                # the read loop ended without an error: reconnect right away
                continue

            # shared exponential backoff for every non-rate-limit failure
            await asyncio.sleep(delay)
            retries += 1
            delay *= 2

        if not self.running:
            LOG.info('%s: terminate the connection handler because not running', self.conn.uuid)
        else:
            LOG.error('%s: failed to reconnect after %d retries - exiting', self.conn.uuid, retries)
            raise ExhaustedRetries()

    @staticmethod
    def _decompress_for_log(uuid, message):
        """
        Return message in a loggable form: gzip-decompressed for the Huobi
        feeds, raw-deflate-decompressed for OKCoin/OKX, unchanged otherwise.
        """
        try:
            if uuid in {HUOBI, HUOBI_DM, HUOBI_SWAP}:
                return zlib.decompress(message, 16 + zlib.MAX_WBITS)
            if uuid in {OKCOIN, OKX}:
                return zlib.decompress(message, -15)
        except (zlib.error, TypeError):
            # Best effort only: this runs while another exception is being
            # handled, and a payload that is not compressed (or not bytes)
            # must not replace that exception. Fall back to the raw message.
            pass
        return message

    async def _handler(self, connection, handler):
        """
        Read messages from connection and await handler for each one until
        the stream ends or the handler is stopped.

        Any exception is re-raised to the caller unless the handler has been
        stopped, in which case it is swallowed. With log_on_error set, the
        most recently read message is logged first.
        """
        # Pre-bind so the except block can tell "no message read yet" apart
        # from a real message instead of hitting an unbound local.
        message = None
        try:
            async for message in connection.read():
                if not self.running:
                    await connection.close()
                    return
                await handler(message, connection, self.conn.last_message)
        except Exception:
            if not self.running:
                return
            if self.log_on_error and message is not None:
                LOG.error("%s: error handling message %s", connection.uuid, self._decompress_for_log(connection.uuid, message))
            # exception will be logged with traceback when connection handler
            # retries the connection
            raise