/
simple_event.py
81 lines (63 loc) · 2.73 KB
/
simple_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import logging
import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message
from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.utils import metrics
from .processors import IngestMessage, Retriable, process_event
logger = logging.getLogger(__name__)
consumer_type_to_default_topic = {
"events": Topic.INGEST_EVENTS,
"transactions": Topic.INGEST_TRANSACTIONS,
"attachments": Topic.INGEST_ATTACHMENTS,
"ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
}
def process_simple_event_message(
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
) -> None:
"""
Processes a single Kafka Message containing a "simple" Event payload.
This does:
- Decode the Kafka payload which is in msgpack format and has a bit of
metadata like `type` and `project_id`.
- Fetch the corresponding Project from cache.
- Decode the actual event payload which is in JSON format and perform some
initial loadshedding on it.
- Store the JSON payload in the event processing store, and pass it on to
`preprocess_event`, which will schedule a followup task such as
`symbolicate_event` or `process_event`.
"""
raw_payload = raw_message.payload.value
metrics.distribution(
"ingest_consumer.payload_size",
len(raw_payload),
tags={"consumer": consumer_type},
unit="byte",
)
try:
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
message_type = message["type"]
project_id = message["project_id"]
if message_type != "event":
raise ValueError(f"Unsupported message type: {message_type}")
try:
with metrics.timer("ingest_consumer.fetch_project"):
project = Project.objects.get_from_cache(id=project_id)
except Project.DoesNotExist:
logger.exception("Project for ingested event does not exist: %s", project_id)
return
return process_event(message, project, reprocess_only_stuck_events)
except Exception as exc:
# If the retriable exception was raised, we should not DLQ
if isinstance(exc, Retriable):
raise
# TODO: Remove this line once all topics (transactions, attachments,
# user feedback) also have DLQs
default_topic = consumer_type_to_default_topic[consumer_type].value
if default_topic != "ingest-events":
raise
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc