/
asyncsender.py
149 lines (121 loc) · 4.09 KB
/
asyncsender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import threading
from queue import Queue, Full, Empty
from fluent import sender
from fluent.sender import EventTime
# Public names exported by this module.
__all__ = ["EventTime", "FluentSender"]
# Default for FluentSender's ``queue_maxsize`` parameter.
DEFAULT_QUEUE_MAXSIZE = 100
# Default for FluentSender's ``queue_circular`` parameter.
DEFAULT_QUEUE_CIRCULAR = False
# Unique sentinel that FluentSender.close() puts on the queue; the send loop
# stops when it dequeues this exact object (compared with ``is``).
_TOMBSTONE = object()
# Module-wide sender; stays None until setup() or _set_global_sender() assigns it.
_global_sender = None
def _set_global_sender(sender):  # pragma: no cover
    """[For testing] Install ``sender`` as the module-wide sender.

    The object is stored as-is, without validation, and replaces whatever
    was stored before.

    :param sender: object to store in the module-level ``_global_sender``.
        Inside this function the parameter name shadows the imported
        ``fluent.sender`` module.
    """
    global _global_sender
    _global_sender = sender
def setup(tag, **kwargs):  # pragma: no cover
    """Create a ``FluentSender`` and install it as the module-wide sender.

    :param tag: passed as the first positional argument to ``FluentSender``.
    :param kwargs: forwarded unchanged to the ``FluentSender`` constructor.
    """
    global _global_sender
    new_sender = FluentSender(tag, **kwargs)
    _global_sender = new_sender
def get_global_sender():  # pragma: no cover
    """Return the module-wide sender.

    :return: the current value of the module-level ``_global_sender``
        (``None`` if nothing has been installed yet).
    """
    current = _global_sender
    return current
def close():  # pragma: no cover
    """Close the module-wide sender.

    Calls ``close()`` on whatever ``get_global_sender()`` returns; if no
    sender has been installed (the global is ``None``) this raises
    ``AttributeError``.
    """
    active_sender = get_global_sender()
    active_sender.close()
class FluentSender(sender.FluentSender):
    """Fluent sender that hands serialized events to a background thread.

    ``_send`` only places the already-serialized payload on a queue created
    with ``queue_maxsize``; a daemon thread started in ``__init__`` takes
    payloads off that queue and passes them to the base class's
    ``_send_internal``.

    NOTE(review): ``self.lock``, ``self._close`` and ``_send_internal`` are
    not defined in this class; they are presumably provided by
    ``fluent.sender.FluentSender`` -- confirm against the base class.
    """

    def __init__(
        self,
        tag,
        host="localhost",
        port=24224,
        bufmax=1 * 1024 * 1024,
        timeout=3.0,
        verbose=False,
        buffer_overflow_handler=None,
        nanosecond_precision=False,
        msgpack_kwargs=None,
        queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
        queue_circular=DEFAULT_QUEUE_CIRCULAR,
        queue_overflow_handler=None,
        **kwargs,
    ):
        """Set up the base sender, the queue and the background send thread.

        :param tag: forwarded to the base class constructor.
        :param host: forwarded to the base class constructor.
        :param port: forwarded to the base class constructor.
        :param bufmax: forwarded to the base class constructor.
        :param timeout: forwarded to the base class constructor.
        :param verbose: forwarded to the base class constructor.
        :param buffer_overflow_handler: forwarded to the base class
            constructor.
        :param nanosecond_precision: forwarded to the base class constructor.
        :param msgpack_kwargs: forwarded to the base class constructor.
        :param queue_maxsize: ``maxsize`` of the internal ``queue.Queue``.
        :param queue_circular: when true, a full queue drops its oldest entry
            to make room instead of blocking the caller of ``_send``.
        :param queue_overflow_handler: callable invoked with each payload
            dropped from a full circular queue. Only honoured when
            ``queue_circular`` is true; otherwise a no-op handler is used.
        :param kwargs: This kwargs argument is not used in __init__ itself
            (it is only passed through to the base class). This will be
            removed in the next major version.
        """
        super().__init__(
            tag=tag,
            host=host,
            port=port,
            bufmax=bufmax,
            timeout=timeout,
            verbose=verbose,
            buffer_overflow_handler=buffer_overflow_handler,
            nanosecond_precision=nanosecond_precision,
            msgpack_kwargs=msgpack_kwargs,
            **kwargs,
        )

        self._queue_maxsize = queue_maxsize
        self._queue_circular = queue_circular

        # A user-supplied handler is only installed for circular queues;
        # in every other case the no-op default is used.
        self._queue_overflow_handler = (
            queue_overflow_handler
            if queue_circular and queue_overflow_handler
            else self._queue_overflow_handler_default
        )

        # This ensures visibility across all variables
        self._thread_guard = threading.Event()

        # All state read by the worker is assigned before the thread starts.
        self._closed = False
        self._queue = Queue(maxsize=queue_maxsize)
        self._send_thread = threading.Thread(
            target=self._send_loop,
            name=f"AsyncFluentSender {id(self)}",
            daemon=True,
        )
        self._send_thread.start()

    def close(self, flush=True):
        """Stop the background thread and wait for it to finish.

        Calling this on an already closed sender returns immediately.

        :param flush: when true (the default) payloads still queued are left
            in place, so the send thread handles them before it reaches the
            stop sentinel. When false the queue is emptied first and those
            payloads are dropped without being sent.
        """
        with self.lock:
            if self._closed:
                return
            self._closed = True

            if not flush:
                # Throw away everything still waiting to be sent.
                try:
                    while True:
                        self._queue.get(block=False)
                except Empty:
                    pass

            # The sentinel tells the send loop to exit; then wait for it.
            self._queue.put(_TOMBSTONE)
            self._send_thread.join()

    @property
    def queue_maxsize(self):
        """The ``queue_maxsize`` value given to the constructor."""
        return self._queue_maxsize

    @property
    def queue_blocking(self):
        """True when the queue is not circular."""
        return not self._queue_circular

    @property
    def queue_circular(self):
        """The ``queue_circular`` value given to the constructor."""
        return self._queue_circular

    def _send(self, bytes_):
        """Queue ``bytes_`` for the background thread.

        :param bytes_: payload to enqueue; it is later passed unchanged to
            the base class's ``_send_internal`` by the send loop.
        :return: ``False`` if the sender is closed (or the non-blocking put
            reports a full queue), ``True`` once the payload is queued.
        """
        with self.lock:
            if self._closed:
                return False

            if self._queue_circular and self._queue.full():
                # discard oldest
                try:
                    oldest = self._queue.get(block=False)
                except Empty:  # pragma: no cover
                    # The send thread emptied the queue in the meantime.
                    pass
                else:
                    self._queue_overflow_handler(oldest)

            # Circular queues never block the caller; non-circular ones wait
            # for a free slot.
            wait_for_slot = not self._queue_circular
            try:
                self._queue.put(bytes_, block=wait_for_slot)
            except Full:  # pragma: no cover
                return False  # this actually can't happen

            return True

    def _send_loop(self):
        """Body of the background thread.

        Takes payloads off the queue one at a time and passes each to the
        base class's ``_send_internal`` until the stop sentinel is dequeued.
        ``self._close()`` is always called on the way out, including when
        the loop ends because of an exception.
        """
        deliver = super()._send_internal

        try:
            while True:
                payload = self._queue.get(block=True)
                if payload is _TOMBSTONE:
                    break
                deliver(payload)
        finally:
            self._close()

    def _queue_overflow_handler_default(self, discarded_bytes):
        """Default overflow handler: ignore the discarded payload."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the sender (flushing queued payloads) on leaving a ``with`` block."""
        self.close()