/
backend.py
224 lines (194 loc) · 8.68 KB
/
backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from __future__ import annotations
import logging
from collections.abc import Mapping, MutableMapping, Sequence
from typing import TYPE_CHECKING, Any
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.eventstream.base import EventStreamEventType, GroupStates
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
from sentry.killswitches import killswitch_matches_context
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from sentry.eventstore.models import Event, GroupEvent
class KafkaEventStream(SnubaProtocolEventStream):
    """Snuba-protocol eventstream that publishes messages through Kafka."""

    def __init__(self, **options: Any) -> None:
        # Logical topic names; resolved to real topic/cluster names in _send.
        self.topic = Topic.EVENTS
        self.transactions_topic = Topic.TRANSACTIONS
        self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
        # One cached Producer per topic, created lazily by get_producer.
        self.__producers: MutableMapping[Topic, Producer] = {}
    def get_transactions_topic(self, project_id: int) -> Topic:
        """Return the topic used for transaction events.

        ``project_id`` is currently ignored — all projects share one
        transactions topic — but the parameter keeps the hook available
        for per-project routing.
        """
        return self.transactions_topic
def get_producer(self, topic: Topic) -> Producer:
if topic not in self.__producers:
cluster_name = get_topic_definition(topic)["cluster"]
cluster_options = get_kafka_producer_cluster_options(cluster_name)
self.__producers[topic] = Producer(cluster_options)
return self.__producers[topic]
def delivery_callback(self, error: KafkaError | None, message: KafkaMessage) -> None:
if error is not None:
logger.warning("Could not publish message (error: %s): %r", error, message)
def _get_headers_for_insert(
self,
event: Event | GroupEvent,
is_new: bool,
is_regression: bool,
is_new_group_environment: bool,
primary_hash: str | None,
received_timestamp: float,
skip_consume: bool,
group_states: GroupStates | None = None,
) -> MutableMapping[str, str]:
# HACK: We are putting all this extra information that is required by the
# post process forwarder into the headers so we can skip parsing entire json
# messages. The post process forwarder is currently bound to a single core.
# Once we are able to parallelize the JSON parsing and other transformation
# steps being done there we may want to remove this hack.
def encode_bool(value: bool | None) -> str:
if value is None:
value = False
return str(int(value))
def encode_list(value: Sequence[Any]) -> str:
return json.dumps(value)
# we strip `None` values here so later in the pipeline they can be
# cleanly encoded without nullability checks
def strip_none_values(value: Mapping[str, str | None]) -> MutableMapping[str, str]:
return {key: value for key, value in value.items() if value is not None}
send_new_headers = options.get("eventstream:kafka-headers")
if send_new_headers is True:
return strip_none_values(
{
"Received-Timestamp": str(received_timestamp),
"event_id": str(event.event_id),
"project_id": str(event.project_id),
"occurrence_id": self._get_occurrence_data(event).get("id"),
"group_id": str(event.group_id) if event.group_id is not None else None,
"primary_hash": str(primary_hash) if primary_hash is not None else None,
"is_new": encode_bool(is_new),
"is_new_group_environment": encode_bool(is_new_group_environment),
"is_regression": encode_bool(is_regression),
"skip_consume": encode_bool(skip_consume),
"group_states": encode_list(group_states) if group_states is not None else None,
"queue": self._get_queue_for_post_process(event),
}
)
else:
return {
**super()._get_headers_for_insert(
event,
is_new,
is_regression,
is_new_group_environment,
primary_hash,
received_timestamp,
skip_consume,
),
}
    def insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: str | None,
        received_timestamp: float,
        skip_consume: bool = False,
        group_states: GroupStates | None = None,
        **kwargs: Any,
    ) -> None:
        """Publish an event-insert message onto the eventstream.

        Decides partitioning strategy per event type, then delegates the
        actual serialization/publish to ``SnubaProtocolEventStream.insert``,
        which eventually calls ``_send`` below.
        """
        event_type = self._get_event_type(event)
        # Events tagged "sample_event" get extra breadcrumb logging at each
        # stage of the pipeline for debugging.
        if event.get_tag("sample_event"):
            logger.info(
                "insert: inserting event in KafkaEventStream",
                extra={
                    "event.id": event.event_id,
                    "project_id": event.project_id,
                    "sample_event": True,
                    "event_type": event_type.value,
                },
            )
        # Transactions and generic (issue platform) events are always randomly
        # partitioned; error events only when the killswitch matches this
        # project/message type.
        assign_partitions_randomly = (
            (event_type == EventStreamEventType.Generic)
            or (event_type == EventStreamEventType.Transaction)
            or killswitch_matches_context(
                "kafka.send-project-events-to-random-partitions",
                {"project_id": event.project_id, "message_type": event_type.value},
            )
        )
        if assign_partitions_randomly:
            kwargs[KW_SKIP_SEMANTIC_PARTITIONING] = True
        if event.get_tag("sample_event"):
            logger.info(
                "insert: inserting event in SnubaProtocolEventStream",
                extra={
                    "event.id": event.event_id,
                    "project_id": event.project_id,
                    "sample_event": True,
                },
            )
            # Sample events are published synchronously so delivery failures
            # surface immediately while debugging (see `asynchronous` in _send).
            kwargs["asynchronous"] = False
        super().insert(
            event,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            received_timestamp,
            skip_consume,
            group_states,
            **kwargs,
        )
    def _send(
        self,
        project_id: int,
        _type: str,
        extra_data: tuple[Any, ...] = (),
        asynchronous: bool = True,
        headers: MutableMapping[str, str] | None = None,
        skip_semantic_partitioning: bool = False,
        event_type: EventStreamEventType = EventStreamEventType.Error,
    ) -> None:
        """Serialize one eventstream message and publish it to Kafka.

        The payload is a JSON-encoded tuple of
        ``(EVENT_PROTOCOL_VERSION, _type, *extra_data)``; the operation type
        and protocol version are duplicated into the message headers. When
        ``asynchronous`` is False the producer is flushed before returning.
        """
        if headers is None:
            headers = {}
        # NOTE(review): this mutates the caller-supplied mapping when one is
        # passed in — confirm no caller reuses its headers dict across sends.
        headers["operation"] = _type
        headers["version"] = str(self.EVENT_PROTOCOL_VERSION)
        # Route to the topic matching the message's event type.
        if event_type == EventStreamEventType.Transaction:
            topic = self.get_transactions_topic(project_id)
        elif event_type == EventStreamEventType.Generic:
            topic = self.issue_platform_topic
        else:
            topic = self.topic
        producer = self.get_producer(topic)
        # Polling the producer is required to ensure callbacks are fired. This
        # means that the latency between a message being delivered (or failing
        # to be delivered) and the corresponding callback being fired is
        # roughly the same as the duration of time that passes between publish
        # calls. If this ends up being too high, the publisher should be moved
        # into a background thread that can poll more frequently without
        # interfering with request handling. (This `poll` does not act as
        # a heartbeat for the purposes of any sort of session expiration.)
        # Note that this call to poll() is *only* dealing with earlier
        # asynchronous produce() calls from the same process.
        producer.poll(0.0)
        assert isinstance(extra_data, tuple)
        real_topic = get_topic_definition(topic)["real_topic_name"]
        try:
            producer.produce(
                topic=real_topic,
                # Keying on project id gives semantic (per-project)
                # partitioning; a None key lets Kafka pick a random partition.
                key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
                value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
                on_delivery=self.delivery_callback,
                headers=[(k, v.encode("utf-8")) for k, v in headers.items()],
            )
        except Exception as error:
            # Best-effort publish: log and drop rather than fail the caller.
            logger.exception("Could not publish message: %s", error)
            return
        if not asynchronous:
            # flush() is a convenience method that calls poll() until len() is zero
            producer.flush()
    def requires_post_process_forwarder(self) -> bool:
        """The Kafka-backed eventstream always relies on the post process forwarder."""
        return True