Skip to content

Commit

Permalink
ref: Remove all references to settings.KAFKA_TOPICS so it can be removed (#66288)
Browse files Browse the repository at this point in the history

Depends on #66283 and
#66343

---------

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
  • Loading branch information
lynnagara and getsantry[bot] committed Mar 6, 2024
1 parent ba72cff commit 7e29cd4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 21 deletions.
14 changes: 4 additions & 10 deletions src/sentry/consumers/__init__.py
Expand Up @@ -377,8 +377,6 @@ def get_stream_processor(
validate_schema: bool = False,
group_instance_id: str | None = None,
) -> StreamProcessor:
from django.conf import settings

from sentry.utils import kafka_config

try:
Expand All @@ -398,8 +396,10 @@ def get_stream_processor(

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
consumer_topic = consumer_definition["topic"]
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)

topic_defn = get_topic_definition(consumer_topic)
real_topic = topic_defn["real_topic_name"]
cluster = topic_defn["cluster"]

if topic is None:
topic = real_topic
Expand All @@ -412,12 +412,6 @@ def get_stream_processor(
strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {}
)

topic_def = settings.KAFKA_TOPICS[real_topic]
assert topic_def is not None

if cluster is None:
cluster = topic_def["cluster"]

def build_consumer_config(group_id: str):
assert cluster is not None

Expand Down
16 changes: 7 additions & 9 deletions src/sentry/sentry_metrics/client/kafka.py
Expand Up @@ -5,17 +5,17 @@
from typing import Any

import sentry_kafka_schemas
from arroyo import Topic
from arroyo import Topic as ArroyoTopic
from arroyo.backends.abstract import Producer
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.core.cache import cache

from sentry import quotas
from sentry.conf.types.kafka_definition import Topic
from sentry.sentry_metrics.client.base import GenericMetricsBackend
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

ingest_codec: sentry_kafka_schemas.codecs.Codec[Any] = sentry_kafka_schemas.get_codec(
"ingest-metrics"
Expand Down Expand Up @@ -50,12 +50,10 @@ def get_retention_from_org_id(org_id: int) -> int:

class KafkaMetricsBackend(GenericMetricsBackend):
def __init__(self) -> None:
kafka_topic_name = settings.KAFKA_INGEST_PERFORMANCE_METRICS
self.kafka_topic = Topic(kafka_topic_name)

kafka_topic_dict = settings.KAFKA_TOPICS[kafka_topic_name]
assert kafka_topic_dict is not None
cluster_name = kafka_topic_dict["cluster"]
logical_topic = Topic.INGEST_PERFORMANCE_METRICS
topic_defn = get_topic_definition(logical_topic)
self.kafka_topic = ArroyoTopic(topic_defn["real_topic_name"])
cluster_name = topic_defn["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
self.producer: Producer = KafkaProducer(
build_kafka_configuration(default_config=producer_config)
Expand Down
Expand Up @@ -5,6 +5,7 @@
from typing import Any
from unittest.mock import patch

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing import StreamProcessor
from arroyo.utils import metrics
from confluent_kafka import Producer
Expand Down Expand Up @@ -62,7 +63,10 @@ def setUp(self) -> None:
KAFKA_TOPICS={
self.events_topic: {"cluster": "default"},
},
KAFKA_TOPIC_OVERRIDES={"events": self.events_topic, "transactions": self.events_topic},
KAFKA_TOPIC_OVERRIDES={
"events": self.events_topic,
"transactions": self.events_topic,
},
)

self.override_settings_cm.__enter__()
Expand All @@ -81,7 +85,7 @@ def tearDown(self) -> None:

def get_test_stream_processor(
self, mode: str, consumer_group: str, synchronize_commit_group: str
) -> StreamProcessor:
) -> StreamProcessor[KafkaPayload]:
return get_stream_processor(
consumer_name="post-process-forwarder-errors",
consumer_args=[f"--mode={mode}"],
Expand Down

0 comments on commit 7e29cd4

Please sign in to comment.