ref: Move more code to the new way of defining kafka topics and overrides #66283

Merged
merged 5 commits on Mar 5, 2024
10 changes: 8 additions & 2 deletions src/sentry/conf/server.py
@@ -21,7 +21,6 @@
from sentry.conf.types.logging_config import LoggingConfig
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
from sentry.conf.types.topic_definition import TopicDefinition
from sentry.utils import json # NOQA (used in getsentry config)
from sentry.utils.celery import crontab_with_minute_jitter
from sentry.utils.types import Type, type_from_value
@@ -3531,9 +3530,16 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"shared-resources-usage": "default",
}

from typing import TypedDict


class LegacyTopicDefinition(TypedDict):
cluster: str


# Cluster configuration for each Kafka topic by name.
# DEPRECATED
KAFKA_TOPICS: Mapping[str, TopicDefinition] = {
KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = {
KAFKA_EVENTS: {"cluster": "default"},
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
KAFKA_TRANSACTIONS: {"cluster": "default"},
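In the new scheme, server.py no longer needs a rich entry per topic in KAFKA_TOPICS; instead, two settings (referenced later in this diff by devserver.py and kafka_config.py) carry the cluster routing and optional name overrides. A rough sketch with illustrative values, not the actual defaults:

```python
# Maps each default (logical) topic name to the Kafka cluster it lives on.
KAFKA_TOPIC_TO_CLUSTER: dict[str, str] = {
    "events": "default",
    "transactions": "default",
    "shared-resources-usage": "default",
    # ... one entry per Topic enum value
}

# Optional remapping from the default topic name to the physical topic name
# deployed in a given environment; unlisted topics keep their default names.
KAFKA_TOPIC_OVERRIDES: dict[str, str] = {
    # "ingest-metrics": "ingest-metrics-prod",  # hypothetical override
}
```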
2 changes: 1 addition & 1 deletion src/sentry/conf/types/kafka_definition.py
@@ -70,7 +70,7 @@ class ConsumerDefinition(TypedDict, total=False):
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str

dlq_topic: str
dlq_topic: Topic
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None

2 changes: 2 additions & 0 deletions src/sentry/conf/types/topic_definition.py
@@ -5,3 +5,5 @@

class TopicDefinition(TypedDict):
cluster: str
# The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES
real_topic_name: str
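Callers that previously received only a cluster now get both fields from get_topic_definition. A tiny illustration of the resolved shape, assuming a hypothetical override for the ingest-replay-events topic:

```python
# Hypothetical resolved value; real cluster names and overrides depend on deployment.
defn: TopicDefinition = {
    "cluster": "default",
    "real_topic_name": "ingest-replay-events-prod",  # via KAFKA_TOPIC_OVERRIDES; otherwise the default name
}
```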
9 changes: 5 additions & 4 deletions src/sentry/consumers/__init__.py
@@ -298,7 +298,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"ingest_profile": "release-health",
},
"dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ,
"dlq_topic": Topic.INGEST_METRICS_DLQ,
"dlq_max_invalid_ratio": 0.01,
"dlq_max_consecutive_count": 1000,
},
@@ -309,7 +309,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"ingest_profile": "performance",
},
"dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ,
"dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ,
"dlq_max_invalid_ratio": 0.01,
"dlq_max_consecutive_count": 1000,
},
@@ -517,7 +517,8 @@ def build_consumer_config(group_id: str):
f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
) from e
try:
cluster_setting = get_topic_definition(dlq_topic)["cluster"]
dlq_topic_defn = get_topic_definition(dlq_topic)
cluster_setting = dlq_topic_defn["cluster"]
except ValueError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
@@ -527,7 +528,7 @@
dlq_producer = KafkaProducer(producer_config)

dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)),
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
DlqLimit(
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
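Pieced together from the hunks above, the DLQ wiring now takes the Topic enum from the consumer definition, resolves it once, and uses the resulting definition for both the cluster lookup and the physical topic passed to the producer. A condensed, self-contained sketch, omitting the surrounding error handling that build_consumer_config keeps:

```python
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaProducer
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition


def build_dlq_policy(dlq_topic: Topic) -> DlqPolicy:
    # Resolve the enum once; reuse the definition for the cluster and the physical name.
    dlq_topic_defn = get_topic_definition(dlq_topic)
    producer_config = get_kafka_producer_cluster_options(dlq_topic_defn["cluster"])
    dlq_producer = KafkaProducer(producer_config)
    return DlqPolicy(
        KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
        DlqLimit(max_invalid_ratio=0.01, max_consecutive_count=1000),
        None,
    )
```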
18 changes: 10 additions & 8 deletions src/sentry/eventstream/kafka/backend.py
@@ -7,9 +7,9 @@
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from django.conf import settings

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.eventstream.base import EventStreamEventType, GroupStates
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
from sentry.killswitches import killswitch_matches_context
@@ -24,15 +24,15 @@

class KafkaEventStream(SnubaProtocolEventStream):
def __init__(self, **options: Any) -> None:
self.topic = settings.KAFKA_EVENTS
self.transactions_topic = settings.KAFKA_TRANSACTIONS
self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC
self.__producers: MutableMapping[str, Producer] = {}
self.topic = Topic.EVENTS
self.transactions_topic = Topic.TRANSACTIONS
self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
self.__producers: MutableMapping[Topic, Producer] = {}

def get_transactions_topic(self, project_id: int) -> str:
def get_transactions_topic(self, project_id: int) -> Topic:
return self.transactions_topic

def get_producer(self, topic: str) -> Producer:
def get_producer(self, topic: Topic) -> Producer:
if topic not in self.__producers:
cluster_name = get_topic_definition(topic)["cluster"]
cluster_options = get_kafka_producer_cluster_options(cluster_name)
@@ -202,9 +202,11 @@ def _send(

assert isinstance(extra_data, tuple)

real_topic = get_topic_definition(topic)["real_topic_name"]

try:
producer.produce(
topic=topic,
topic=real_topic,
key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
on_delivery=self.delivery_callback,
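The producer cache in this file is now keyed by the Topic enum rather than by raw topic strings, while the produce() call itself uses the resolved physical name. A condensed sketch of that pattern, with names mirroring the diff but simplified for illustration:

```python
from collections.abc import MutableMapping

from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition


class ProducerCache:
    """Illustrative stand-in for the per-topic producer cache in KafkaEventStream."""

    def __init__(self) -> None:
        self.__producers: MutableMapping[Topic, Producer] = {}

    def get_producer(self, topic: Topic) -> Producer:
        # Resolve the cluster from the logical topic and cache one producer per Topic.
        if topic not in self.__producers:
            cluster_name = get_topic_definition(topic)["cluster"]
            cluster_options = get_kafka_producer_cluster_options(cluster_name)
            self.__producers[topic] = Producer(cluster_options)
        return self.__producers[topic]

    def send(self, topic: Topic, key: bytes | None, value: bytes) -> None:
        # Produce to the physical topic name, which may differ via KAFKA_TOPIC_OVERRIDES.
        real_topic = get_topic_definition(topic)["real_topic_name"]
        self.get_producer(topic).produce(topic=real_topic, key=key, value=value)
```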
7 changes: 4 additions & 3 deletions src/sentry/issues/attributes.py
@@ -6,7 +6,7 @@

import requests
import urllib3
from arroyo import Topic
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.db.models import F, Window
@@ -16,6 +16,7 @@
from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.models.group import Group
from sentry.models.groupassignee import GroupAssignee
from sentry.models.groupowner import GroupOwner, GroupOwnerType
@@ -44,7 +45,7 @@ class GroupValues:


def _get_attribute_snapshot_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"]
cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -122,7 +123,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None:
raise snuba.SnubaError(err)
else:
payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), [])
_attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload)
_attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload)


def _retrieve_group_values(group_id: int) -> GroupValues:
7 changes: 4 additions & 3 deletions src/sentry/issues/producer.py
@@ -4,11 +4,12 @@
from collections.abc import MutableMapping
from typing import Any, cast

from arroyo import Topic
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from arroyo.types import Message, Value
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.run import process_message
from sentry.issues.status_change_message import StatusChangeMessage
@@ -33,7 +34,7 @@ class PayloadType(ValueEqualityEnum):


def _get_occurrence_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"]
cluster_name = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -68,7 +69,7 @@ def produce_occurrence_to_kafka(
process_message(Message(Value(payload=payload, committable={})))
return

_occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload)
_occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload)


def _prepare_occurrence_message(
14 changes: 8 additions & 6 deletions src/sentry/monitors/tasks.py
@@ -7,11 +7,13 @@

import msgpack
import sentry_sdk
from arroyo import Partition, Topic
from arroyo import Partition
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from confluent_kafka.admin import AdminClient, PartitionMetadata
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.constants import ObjectStatus
from sentry.monitors.logic.mark_failed import mark_failed
from sentry.monitors.schedule import get_prev_schedule
@@ -50,7 +52,7 @@


def _get_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -62,10 +64,10 @@ def _get_producer() -> KafkaProducer:

@lru_cache(maxsize=None)
def _get_partitions() -> Mapping[int, PartitionMetadata]:
topic = settings.KAFKA_INGEST_MONITORS
cluster_name = get_topic_definition(topic)["cluster"]
topic_defn = get_topic_definition(Topic.INGEST_MONITORS)
topic = topic_defn["real_topic_name"]

conf = get_kafka_admin_cluster_options(cluster_name)
conf = get_kafka_admin_cluster_options(topic_defn["cluster"])
admin_client = AdminClient(conf)
result = admin_client.list_topics(topic)
topic_metadata = result.topics.get(topic)
@@ -203,7 +205,7 @@ def clock_pulse(current_datetime=None):
# topic. This is a requirement to ensure that none of the partitions stall,
# since the global clock is tied to the slowest partition.
for partition in _get_partitions().values():
dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id)
dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id)
_checkin_producer.produce(dest, payload)


5 changes: 2 additions & 3 deletions src/sentry/replays/lib/kafka.py
@@ -1,5 +1,4 @@
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.pubsub import KafkaPublisher

@@ -10,7 +9,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher:
global replay_publisher

if replay_publisher is None:
config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
replay_publisher = KafkaPublisher(
get_kafka_producer_cluster_options(config["cluster"]),
asynchronous=is_async,
5 changes: 2 additions & 3 deletions src/sentry/replays/usecases/ingest/dom_index.py
@@ -8,9 +8,8 @@
from hashlib import md5
from typing import Any, Literal, TypedDict, cast

from django.conf import settings

from sentry import features
from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.replays.usecases.ingest.events import SentryEvent
from sentry.replays.usecases.ingest.issue_creation import (
@@ -219,7 +218,7 @@ def _initialize_publisher() -> KafkaPublisher:
global replay_publisher

if replay_publisher is None:
config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
replay_publisher = KafkaPublisher(
kafka_config.get_kafka_producer_cluster_options(config["cluster"])
)
7 changes: 3 additions & 4 deletions src/sentry/runner/commands/devserver.py
@@ -366,12 +366,11 @@ def devserver(

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.batching_kafka_consumer import create_topics
from sentry.utils.kafka_config import get_topic_definition

for topic in Topic:
default_name = topic.value
physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name)
cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name]
create_topics(cluster_name, [physical_name])
topic_defn = get_topic_definition(topic)
create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]])

if dev_consumer:
daemons.append(
8 changes: 5 additions & 3 deletions src/sentry/sentry_metrics/configuration.py
@@ -10,6 +10,8 @@

import sentry_sdk

from sentry.conf.types.kafka_definition import Topic

# The maximum length of a column that is indexed in postgres. It is important to keep this in
# sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py
MAX_INDEXED_COLUMN_LENGTH = 200
@@ -46,7 +48,7 @@ class MetricsIngestConfiguration:
db_backend: IndexerStorage
db_backend_options: Mapping[str, Any]
input_topic: str
output_topic: str
output_topic: Topic
use_case_id: UseCaseKey
internal_metrics_tag: str | None
writes_limiter_cluster_options: Mapping[str, Any]
@@ -79,7 +81,7 @@ def get_ingest_config(
db_backend=IndexerStorage.POSTGRES,
db_backend_options={},
input_topic=settings.KAFKA_INGEST_METRICS,
output_topic=settings.KAFKA_SNUBA_METRICS,
output_topic=Topic.SNUBA_METRICS,
use_case_id=UseCaseKey.RELEASE_HEALTH,
internal_metrics_tag="release-health",
writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
@@ -96,7 +98,7 @@ def get_ingest_config(
db_backend=IndexerStorage.POSTGRES,
db_backend_options={},
input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS,
output_topic=Topic.SNUBA_GENERIC_METRICS,
use_case_id=UseCaseKey.PERFORMANCE,
internal_metrics_tag="perf",
writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
@@ -10,6 +10,7 @@
from arroyo.types import Commit, FilteredPayload, Message, Partition
from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils import kafka_config, metrics

logger = logging.getLogger(__name__)
@@ -18,7 +19,7 @@
class SimpleProduceStep(ProcessingStep[KafkaPayload]):
def __init__(
self,
output_topic: str,
output_topic: Topic,
commit_function: Commit,
producer: AbstractProducer[KafkaPayload] | None = None,
) -> None:
4 changes: 2 additions & 2 deletions src/sentry/usage_accountant/accountant.py
@@ -12,9 +12,9 @@

from arroyo.backends.abstract import Producer
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from usageaccountant import UsageAccumulator, UsageUnit

from sentry.conf.types.kafka_definition import Topic
from sentry.options import get
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

@@ -71,7 +71,7 @@ def record(

if _accountant_backend is None:
cluster_name = get_topic_definition(
settings.KAFKA_SHARED_RESOURCES_USAGE,
Topic.SHARED_RESOURCES_USAGE,
)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer = KafkaProducer(
12 changes: 6 additions & 6 deletions src/sentry/utils/kafka_config.py
@@ -3,6 +3,7 @@

from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.conf.types.topic_definition import TopicDefinition

SUPPORTED_KAFKA_CONFIGURATION = (
@@ -96,9 +97,8 @@ def get_kafka_admin_cluster_options(
)


def get_topic_definition(topic: str) -> TopicDefinition:
defn = settings.KAFKA_TOPICS.get(topic)
if defn is not None:
return defn
else:
raise ValueError(f"Unknown {topic=}")
def get_topic_definition(topic: Topic) -> TopicDefinition:
return {
"cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value],
"real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value),
}
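With this helper in place, producer call sites across the PR follow the same pattern: resolve the Topic enum once, use the cluster for producer configuration and real_topic_name as the destination. A minimal sketch of such a call site, assuming a hypothetical helper named _make_producer_for (not part of this PR):

```python
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaProducer, build_kafka_configuration

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition


def _make_producer_for(topic: Topic) -> tuple[KafkaProducer, ArroyoTopic]:
    # Resolve the logical topic to its cluster and (possibly overridden) physical name.
    topic_defn = get_topic_definition(topic)
    producer_config = get_kafka_producer_cluster_options(topic_defn["cluster"])
    producer = KafkaProducer(build_kafka_configuration(default_config=producer_config))
    # Produce to the resolved name so KAFKA_TOPIC_OVERRIDES is honored.
    return producer, ArroyoTopic(topic_defn["real_topic_name"])


# Usage: producer, destination = _make_producer_for(Topic.SHARED_RESOURCES_USAGE)
```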