From 7e29cd4f0dd59702ce33a6120bedd90fb945276f Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Wed, 6 Mar 2024 15:54:00 -0800
Subject: [PATCH] ref: Remove all references to settings.KAFKA_TOPICS so it
 can be removed (#66288)

Depends on https://github.com/getsentry/sentry/pull/66283 and
https://github.com/getsentry/sentry/pull/66343

---------

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
---
 src/sentry/consumers/__init__.py          | 14 ++++----------
 src/sentry/sentry_metrics/client/kafka.py | 16 +++++++---------
 .../test_post_process_forwarder.py        |  8 ++++++--
 3 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index bcce33693885c..c44f394517f1d 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -377,8 +377,6 @@ def get_stream_processor(
     validate_schema: bool = False,
     group_instance_id: str | None = None,
 ) -> StreamProcessor:
-    from django.conf import settings
-
     from sentry.utils import kafka_config
 
     try:
@@ -398,8 +396,10 @@ def get_stream_processor(
     strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
 
     consumer_topic = consumer_definition["topic"]
-    default_topic = consumer_topic.value
-    real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)
+
+    topic_defn = get_topic_definition(consumer_topic)
+    real_topic = topic_defn["real_topic_name"]
+    cluster = topic_defn["cluster"]
 
     if topic is None:
         topic = real_topic
@@ -412,12 +412,6 @@ def get_stream_processor(
         strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {}
     )
 
-    topic_def = settings.KAFKA_TOPICS[real_topic]
-    assert topic_def is not None
-
-    if cluster is None:
-        cluster = topic_def["cluster"]
-
     def build_consumer_config(group_id: str):
         assert cluster is not None
 
diff --git a/src/sentry/sentry_metrics/client/kafka.py b/src/sentry/sentry_metrics/client/kafka.py
index a4c0425cdd5cb..27f9be70dbceb 100644
--- a/src/sentry/sentry_metrics/client/kafka.py
+++ b/src/sentry/sentry_metrics/client/kafka.py
@@ -5,17 +5,17 @@
 from typing import Any
 
 import sentry_kafka_schemas
-from arroyo import Topic
+from arroyo import Topic as ArroyoTopic
 from arroyo.backends.abstract import Producer
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
-from django.conf import settings
 from django.core.cache import cache
 
 from sentry import quotas
+from sentry.conf.types.kafka_definition import Topic
 from sentry.sentry_metrics.client.base import GenericMetricsBackend
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.utils import json
-from sentry.utils.kafka_config import get_kafka_producer_cluster_options
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 ingest_codec: sentry_kafka_schemas.codecs.Codec[Any] = sentry_kafka_schemas.get_codec(
     "ingest-metrics"
@@ -50,12 +50,10 @@ def get_retention_from_org_id(org_id: int) -> int:
 
 class KafkaMetricsBackend(GenericMetricsBackend):
     def __init__(self) -> None:
-        kafka_topic_name = settings.KAFKA_INGEST_PERFORMANCE_METRICS
-        self.kafka_topic = Topic(kafka_topic_name)
-
-        kafka_topic_dict = settings.KAFKA_TOPICS[kafka_topic_name]
-        assert kafka_topic_dict is not None
-        cluster_name = kafka_topic_dict["cluster"]
+        logical_topic = Topic.INGEST_PERFORMANCE_METRICS
+        topic_defn = get_topic_definition(logical_topic)
+        self.kafka_topic = ArroyoTopic(topic_defn["real_topic_name"])
+        cluster_name = topic_defn["cluster"]
         producer_config = get_kafka_producer_cluster_options(cluster_name)
         self.producer: Producer = KafkaProducer(
             build_kafka_configuration(default_config=producer_config)
diff --git a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py
index c1b57fbbe9f2c..19754b4948d3a 100644
--- a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py
+++ b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py
@@ -5,6 +5,7 @@
 from typing import Any
 from unittest.mock import patch
 
+from arroyo.backends.kafka import KafkaPayload
 from arroyo.processing import StreamProcessor
 from arroyo.utils import metrics
 from confluent_kafka import Producer
@@ -62,7 +63,10 @@ def setUp(self) -> None:
             KAFKA_TOPICS={
                 self.events_topic: {"cluster": "default"},
             },
-            KAFKA_TOPIC_OVERRIDES={"events": self.events_topic, "transactions": self.events_topic},
+            KAFKA_TOPIC_OVERRIDES={
+                "events": self.events_topic,
+                "transactions": self.events_topic,
+            },
         )
         self.override_settings_cm.__enter__()
 
@@ -81,7 +85,7 @@ def tearDown(self) -> None:
 
     def get_test_stream_processor(
         self, mode: str, consumer_group: str, synchronize_commit_group: str
-    ) -> StreamProcessor:
+    ) -> StreamProcessor[KafkaPayload]:
         return get_stream_processor(
             consumer_name="post-process-forwarder-errors",
             consumer_args=[f"--mode={mode}"],
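
Reviewer note (illustrative sketch, not part of the patch): the snippet below shows the
topic-lookup pattern this change standardizes on, using only the names that already appear
in the hunks above (the Topic enum, get_topic_definition, get_kafka_producer_cluster_options,
and arroyo's Topic aliased as ArroyoTopic). Variable names here are ours, for illustration.

    from arroyo import Topic as ArroyoTopic

    from sentry.conf.types.kafka_definition import Topic
    from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

    # Resolve a logical topic to its physical topic name and cluster, instead of
    # reading settings.KAFKA_TOPICS / settings.KAFKA_TOPIC_OVERRIDES directly.
    topic_defn = get_topic_definition(Topic.INGEST_PERFORMANCE_METRICS)
    kafka_topic = ArroyoTopic(topic_defn["real_topic_name"])
    producer_config = get_kafka_producer_cluster_options(topic_defn["cluster"])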