/
simple_event.py
72 lines (59 loc) · 2.63 KB
/
simple_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message
from sentry.models.project import Project
from sentry.utils import metrics
from .processors import IngestMessage, Retriable, process_event
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def process_simple_event_message(
    raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
) -> None:
    """
    Process a single Kafka message containing a "simple" event payload.

    Steps performed here:

    - Record the size of the raw payload as a metric tagged with
      ``consumer_type``.
    - Decode the Kafka payload, which is in msgpack format and carries a bit
      of metadata such as ``type`` and ``project_id``. Any ``type`` other than
      ``"event"`` is rejected.
    - Fetch the corresponding Project from cache. If the project does not
      exist, the problem is logged and the message is dropped.
    - Hand the decoded message and project to ``process_event``. The remaining
      work happens there and is not visible in this function (per the earlier
      description of this consumer: decoding the JSON event payload, initial
      load shedding, storing it in the event processing store and scheduling
      follow-up tasks) -- NOTE(review): confirm against ``.processors``.

    Error handling:

    - Failures while decoding or validating the message envelope are treated
      as non-retriable and converted to ``InvalidMessage`` so that Arroyo can
      route the message to the dead letter queue.
    - Failures while fetching the project or running ``process_event`` are
      wrapped in ``Retriable`` and propagated unchanged.

    Raises:
        InvalidMessage: the message could not be decoded or has an
            unsupported type.
        Retriable: project lookup or event processing failed.
    """
    try:
        raw_payload = raw_message.payload.value
        metrics.distribution(
            "ingest_consumer.payload_size",
            len(raw_payload),
            tags={"consumer": consumer_type},
            unit="byte",
        )

        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
        message_type = message["type"]
        project_id = message["project_id"]

        if message_type != "event":
            raise ValueError(f"Unsupported message type: {message_type}")

        # Everything past this point operates on a well-formed envelope, so
        # failures are considered potentially transient and marked Retriable.
        try:
            try:
                with metrics.timer("ingest_consumer.fetch_project"):
                    project = Project.objects.get_from_cache(id=project_id)
            except Project.DoesNotExist:
                # Nothing to process for an unknown project: log and drop.
                logger.exception("Project for ingested event does not exist: %s", project_id)
                return

            return process_event(message, project, reprocess_only_stuck_events)
        except Exception as exc:
            # Chain explicitly so the original failure is reported as the cause.
            raise Retriable(exc) from exc
    except Retriable:
        # The consumer will crash on any potentially retriable exceptions.
        raise
    except Exception as exc:
        # Non retriable exceptions raise InvalidMessage, which Arroyo will DLQ.
        # A better approach would be to instead validate the schema here and only
        # DLQ messages that fail schema validation, but this topic has no schema
        # defined currently.
        raw_value = raw_message.value
        # Narrows the type so that partition/offset can be read below.
        assert isinstance(raw_value, BrokerValue)
        raise InvalidMessage(raw_value.partition, raw_value.offset) from exc