# producer.py
from __future__ import annotations
import logging
from collections.abc import MutableMapping
from typing import Any, cast
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from arroyo.types import Message, Value
from django.conf import settings
from sentry.conf.types.kafka_definition import Topic
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.run import process_message
from sentry.issues.status_change_message import StatusChangeMessage
from sentry.services.hybrid_cloud import ValueEqualityEnum
from sentry.utils import json
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
logger = logging.getLogger(__name__)
class PayloadType(ValueEqualityEnum):
    """
    Defines the type of payload that is being sent to Kafka.
    Messages without PayloadTypes default to OCCURRENCE.
    When adding new types, existing tests must pass without modifying the
    payload_type or the payload for backwards compatibility.
    """

    # A new issue occurrence (optionally bundled with its event data).
    OCCURRENCE = "occurrence"
    # A status transition for an existing issue (e.g. resolve/unresolve).
    # NOTE(review): exact transition semantics live in StatusChangeMessage — confirm there.
    STATUS_CHANGE = "status_change"
def _get_occurrence_producer() -> KafkaProducer:
    """Construct the Kafka producer used for issue-occurrence messages.

    Resolves the cluster for the INGEST_OCCURRENCES topic and builds a
    producer from that cluster's options.
    """
    cluster = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"]
    config = get_kafka_producer_cluster_options(cluster)
    # These two options are stripped before handing the config to arroyo —
    # presumably they are not accepted by build_kafka_configuration; confirm
    # before re-adding them.
    for dropped_option in ("compression.type", "message.max.bytes"):
        config.pop(dropped_option, None)
    return KafkaProducer(build_kafka_configuration(default_config=config))
# Module-wide shared producer. SingletonProducer is given the factory above,
# so the underlying KafkaProducer is presumably created on first use — confirm
# in SingletonProducer. max_futures bounds in-flight produce requests.
_occurrence_producer = SingletonProducer(
    _get_occurrence_producer, max_futures=settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT
)
def produce_occurrence_to_kafka(
    payload_type: PayloadType = PayloadType.OCCURRENCE,
    occurrence: IssueOccurrence | None = None,
    status_change: StatusChangeMessage | None = None,
    event_data: dict[str, Any] | None = None,
) -> None:
    """Serialize an occurrence or status-change payload and send it to Kafka.

    When the configured eventstream is not the Kafka one (i.e. local dev),
    the message is handled synchronously via process_message instead of
    being produced.

    Raises NotImplementedError for an unrecognized payload_type; the
    preparer helpers raise ValueError on missing/inconsistent arguments.
    """
    if payload_type == PayloadType.STATUS_CHANGE:
        prepared = _prepare_status_change_message(status_change)
    elif payload_type == PayloadType.OCCURRENCE:
        prepared = _prepare_occurrence_message(occurrence, event_data)
    else:
        raise NotImplementedError(f"Unknown payload type: {payload_type}")

    if prepared is None:
        return

    kafka_payload = KafkaPayload(None, json.dumps(prepared).encode("utf-8"), [])

    if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
        # If we're not running Kafka then we're just in dev.
        # Skip producing to Kafka and just process the message directly.
        process_message(Message(Value(payload=kafka_payload, committable={})))
        return

    _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), kafka_payload)
def _prepare_occurrence_message(
    occurrence: IssueOccurrence | None, event_data: dict[str, Any] | None
) -> MutableMapping[str, Any] | None:
    """Turn an IssueOccurrence (plus optional event data) into a serializable dict.

    Tags the dict with payload_type=OCCURRENCE and, when event_data is
    provided, embeds it under the "event" key.

    Raises ValueError when occurrence is missing or when the event ids on
    occurrence and event_data disagree.
    """
    if not occurrence:
        raise ValueError("occurrence must be provided")
    if event_data and event_data["event_id"] != occurrence.event_id:
        raise ValueError("Event id on occurrence and event_data must be the same")

    message = cast(MutableMapping[str, Any], occurrence.to_dict())
    message["payload_type"] = PayloadType.OCCURRENCE.value
    if event_data:
        message["event"] = event_data
    return message
def _prepare_status_change_message(
    status_change: StatusChangeMessage | None,
) -> MutableMapping[str, Any] | None:
    """Turn a StatusChangeMessage into a serializable dict.

    Tags the dict with payload_type=STATUS_CHANGE.

    Raises ValueError when status_change is missing.
    """
    if not status_change:
        raise ValueError("status_change must be provided")

    message = cast(MutableMapping[str, Any], status_change.to_dict())
    message["payload_type"] = PayloadType.STATUS_CHANGE.value
    return message