From db28b55aaa83e263e2c57d1714bbb9a78e4c484b Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Tue, 5 Mar 2024 21:49:11 +0000 Subject: [PATCH] Revert "ref: Move more code to the new way of defining kafka topics and overrides (#66283)" This reverts commit f7ffe5a3d31a87ac334c191cc8eb9550d2e1ebce. Co-authored-by: ayirr7 <47572810+ayirr7@users.noreply.github.com> --- src/sentry/conf/server.py | 10 ++-------- src/sentry/conf/types/kafka_definition.py | 2 +- src/sentry/conf/types/topic_definition.py | 2 -- src/sentry/consumers/__init__.py | 9 ++++----- src/sentry/eventstream/kafka/backend.py | 18 ++++++++---------- src/sentry/issues/attributes.py | 7 +++---- src/sentry/issues/producer.py | 7 +++---- src/sentry/monitors/tasks.py | 14 ++++++-------- src/sentry/replays/lib/kafka.py | 5 +++-- .../replays/usecases/ingest/dom_index.py | 5 +++-- src/sentry/runner/commands/devserver.py | 7 ++++--- src/sentry/sentry_metrics/configuration.py | 8 +++----- .../consumers/indexer/multiprocess.py | 3 +-- src/sentry/usage_accountant/accountant.py | 4 ++-- src/sentry/utils/kafka_config.py | 12 ++++++------ src/sentry/utils/outcomes.py | 5 ++--- tests/sentry/utils/test_outcomes.py | 17 +++++++++++------ tests/snuba/incidents/test_tasks.py | 17 +++++++---------- 18 files changed, 69 insertions(+), 83 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 7162fd7e94dc0..63344a5753d3c 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -21,6 +21,7 @@ from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig +from sentry.conf.types.topic_definition import TopicDefinition from sentry.utils import json # NOQA (used in getsentry config) from sentry.utils.celery import crontab_with_minute_jitter from sentry.utils.types import Type, type_from_value @@ -3528,16 +3529,9 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] "shared-resources-usage": "default", } -from typing import TypedDict - - -class LegacyTopicDefinition(TypedDict): - cluster: str - - # Cluster configuration for each Kafka topic by name. # DEPRECATED -KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = { +KAFKA_TOPICS: Mapping[str, TopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, KAFKA_TRANSACTIONS: {"cluster": "default"}, diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 019850320ec94..f9f38687453b7 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -71,7 +71,7 @@ class ConsumerDefinition(TypedDict, total=False): synchronize_commit_group_default: str synchronize_commit_log_topic_default: str - dlq_topic: Topic + dlq_topic: str dlq_max_invalid_ratio: float | None dlq_max_consecutive_count: int | None diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index bc5aaa44ddef8..41992b74d9ad7 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -5,5 +5,3 @@ class TopicDefinition(TypedDict): cluster: str - # The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES - real_topic_name: str diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 68aa34db767c4..a74b106078414 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -298,7 +298,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "release-health", }, - "dlq_topic": Topic.INGEST_METRICS_DLQ, + "dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -309,7 +309,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "performance", }, - "dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ, + "dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -517,8 +517,7 @@ def build_consumer_config(group_id: str): f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it" ) from e try: - dlq_topic_defn = get_topic_definition(dlq_topic) - cluster_setting = dlq_topic_defn["cluster"] + cluster_setting = get_topic_definition(dlq_topic)["cluster"] except ValueError as e: raise click.BadParameter( f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment" @@ -528,7 +527,7 @@ def build_consumer_config(group_id: str): dlq_producer = KafkaProducer(producer_config) dlq_policy = DlqPolicy( - KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])), + KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), DlqLimit( max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 97fe9b98de843..4ec2aa728aa37 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -7,9 +7,9 @@ from confluent_kafka import KafkaError from confluent_kafka import Message as KafkaMessage from confluent_kafka import Producer +from django.conf import settings from sentry import options -from sentry.conf.types.kafka_definition import Topic from sentry.eventstream.base import EventStreamEventType, GroupStates from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.killswitches import killswitch_matches_context @@ -24,15 +24,15 @@ class KafkaEventStream(SnubaProtocolEventStream): def __init__(self, **options: Any) -> None: - self.topic = Topic.EVENTS - self.transactions_topic = Topic.TRANSACTIONS - self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC - self.__producers: MutableMapping[Topic, Producer] = {} + self.topic = settings.KAFKA_EVENTS + self.transactions_topic = settings.KAFKA_TRANSACTIONS + self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC + self.__producers: MutableMapping[str, Producer] = {} - def get_transactions_topic(self, project_id: int) -> Topic: + def get_transactions_topic(self, project_id: int) -> str: return self.transactions_topic - def get_producer(self, topic: Topic) -> Producer: + def get_producer(self, topic: str) -> Producer: if topic not in self.__producers: cluster_name = get_topic_definition(topic)["cluster"] cluster_options = get_kafka_producer_cluster_options(cluster_name) @@ -202,11 +202,9 @@ def _send( assert isinstance(extra_data, tuple) - real_topic = get_topic_definition(topic)["real_topic_name"] - try: producer.produce( - topic=real_topic, + topic=topic, key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None, value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data), on_delivery=self.delivery_callback, diff --git a/src/sentry/issues/attributes.py b/src/sentry/issues/attributes.py index 9b15f83155856..5e0e699662f77 100644 --- a/src/sentry/issues/attributes.py +++ b/src/sentry/issues/attributes.py @@ -6,7 +6,7 @@ import requests import urllib3 -from arroyo import Topic as ArroyoTopic +from arroyo import Topic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from django.conf import settings from django.db.models import F, Window @@ -16,7 +16,6 @@ from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot from sentry import options -from sentry.conf.types.kafka_definition import Topic from sentry.models.group import Group from sentry.models.groupassignee import GroupAssignee from sentry.models.groupowner import GroupOwner, GroupOwnerType @@ -45,7 +44,7 @@ class GroupValues: def _get_attribute_snapshot_producer() -> KafkaProducer: - cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"] + cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -123,7 +122,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None: raise snuba.SnubaError(err) else: payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), []) - _attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload) + _attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload) def _retrieve_group_values(group_id: int) -> GroupValues: diff --git a/src/sentry/issues/producer.py b/src/sentry/issues/producer.py index 8cd499bbecae7..5acfef85adcf1 100644 --- a/src/sentry/issues/producer.py +++ b/src/sentry/issues/producer.py @@ -4,12 +4,11 @@ from collections.abc import MutableMapping from typing import Any, cast -from arroyo import Topic as ArroyoTopic +from arroyo import Topic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from arroyo.types import Message, Value from django.conf import settings -from sentry.conf.types.kafka_definition import Topic from sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.run import process_message from sentry.issues.status_change_message import StatusChangeMessage @@ -34,7 +33,7 @@ class PayloadType(ValueEqualityEnum): def _get_occurrence_producer() -> KafkaProducer: - cluster_name = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"] + cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -69,7 +68,7 @@ def produce_occurrence_to_kafka( process_message(Message(Value(payload=payload, committable={}))) return - _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload) + _occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload) def _prepare_occurrence_message( diff --git a/src/sentry/monitors/tasks.py b/src/sentry/monitors/tasks.py index 82fd558235d5a..79f86b62a7f40 100644 --- a/src/sentry/monitors/tasks.py +++ b/src/sentry/monitors/tasks.py @@ -7,13 +7,11 @@ import msgpack import sentry_sdk -from arroyo import Partition -from arroyo import Topic as ArroyoTopic +from arroyo import Partition, Topic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from confluent_kafka.admin import AdminClient, PartitionMetadata from django.conf import settings -from sentry.conf.types.kafka_definition import Topic from sentry.constants import ObjectStatus from sentry.monitors.logic.mark_failed import mark_failed from sentry.monitors.schedule import get_prev_schedule @@ -52,7 +50,7 @@ def _get_producer() -> KafkaProducer: - cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"] + cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -64,10 +62,10 @@ def _get_producer() -> KafkaProducer: @lru_cache(maxsize=None) def _get_partitions() -> Mapping[int, PartitionMetadata]: - topic_defn = get_topic_definition(Topic.INGEST_MONITORS) - topic = topic_defn["real_topic_name"] + topic = settings.KAFKA_INGEST_MONITORS + cluster_name = get_topic_definition(topic)["cluster"] - conf = get_kafka_admin_cluster_options(topic_defn["cluster"]) + conf = get_kafka_admin_cluster_options(cluster_name) admin_client = AdminClient(conf) result = admin_client.list_topics(topic) topic_metadata = result.topics.get(topic) @@ -205,7 +203,7 @@ def clock_pulse(current_datetime=None): # topic. This is a requirement to ensure that none of the partitions stall, # since the global clock is tied to the slowest partition. for partition in _get_partitions().values(): - dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id) + dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id) _checkin_producer.produce(dest, payload) diff --git a/src/sentry/replays/lib/kafka.py b/src/sentry/replays/lib/kafka.py index 2bde967b5faf0..26ab2368e649c 100644 --- a/src/sentry/replays/lib/kafka.py +++ b/src/sentry/replays/lib/kafka.py @@ -1,4 +1,5 @@ -from sentry.conf.types.kafka_definition import Topic +from django.conf import settings + from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition from sentry.utils.pubsub import KafkaPublisher @@ -9,7 +10,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS) + config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( get_kafka_producer_cluster_options(config["cluster"]), asynchronous=is_async, diff --git a/src/sentry/replays/usecases/ingest/dom_index.py b/src/sentry/replays/usecases/ingest/dom_index.py index 763162f521e3f..80b626bb20c98 100644 --- a/src/sentry/replays/usecases/ingest/dom_index.py +++ b/src/sentry/replays/usecases/ingest/dom_index.py @@ -8,8 +8,9 @@ from hashlib import md5 from typing import Any, Literal, TypedDict, cast +from django.conf import settings + from sentry import features -from sentry.conf.types.kafka_definition import Topic from sentry.models.project import Project from sentry.replays.usecases.ingest.events import SentryEvent from sentry.replays.usecases.ingest.issue_creation import ( @@ -218,7 +219,7 @@ def _initialize_publisher() -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS) + config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( kafka_config.get_kafka_producer_cluster_options(config["cluster"]) ) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 6e00c6890af29..038a66bfa7223 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -366,11 +366,12 @@ def devserver( from sentry.conf.types.kafka_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics - from sentry.utils.kafka_config import get_topic_definition for topic in Topic: - topic_defn = get_topic_definition(topic) - create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]]) + default_name = topic.value + physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name) + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name] + create_topics(cluster_name, [physical_name]) if dev_consumer: daemons.append( diff --git a/src/sentry/sentry_metrics/configuration.py b/src/sentry/sentry_metrics/configuration.py index a885712f379d6..eddebed13a322 100644 --- a/src/sentry/sentry_metrics/configuration.py +++ b/src/sentry/sentry_metrics/configuration.py @@ -10,8 +10,6 @@ import sentry_sdk -from sentry.conf.types.kafka_definition import Topic - # The maximum length of a column that is indexed in postgres. It is important to keep this in # sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py MAX_INDEXED_COLUMN_LENGTH = 200 @@ -48,7 +46,7 @@ class MetricsIngestConfiguration: db_backend: IndexerStorage db_backend_options: Mapping[str, Any] input_topic: str - output_topic: Topic + output_topic: str use_case_id: UseCaseKey internal_metrics_tag: str | None writes_limiter_cluster_options: Mapping[str, Any] @@ -81,7 +79,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_METRICS, - output_topic=Topic.SNUBA_METRICS, + output_topic=settings.KAFKA_SNUBA_METRICS, use_case_id=UseCaseKey.RELEASE_HEALTH, internal_metrics_tag="release-health", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS, @@ -98,7 +96,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS, - output_topic=Topic.SNUBA_GENERIC_METRICS, + output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS, use_case_id=UseCaseKey.PERFORMANCE, internal_metrics_tag="perf", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE, diff --git a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py index 8cb2fdd5639b2..dd56520a20f52 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py +++ b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py @@ -10,7 +10,6 @@ from arroyo.types import Commit, FilteredPayload, Message, Partition from confluent_kafka import Producer -from sentry.conf.types.kafka_definition import Topic from sentry.utils import kafka_config, metrics logger = logging.getLogger(__name__) @@ -19,7 +18,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]): def __init__( self, - output_topic: Topic, + output_topic: str, commit_function: Commit, producer: AbstractProducer[KafkaPayload] | None = None, ) -> None: diff --git a/src/sentry/usage_accountant/accountant.py b/src/sentry/usage_accountant/accountant.py index ee1e98a8c9cc8..2ecf3c49f75c0 100644 --- a/src/sentry/usage_accountant/accountant.py +++ b/src/sentry/usage_accountant/accountant.py @@ -12,9 +12,9 @@ from arroyo.backends.abstract import Producer from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration +from django.conf import settings from usageaccountant import UsageAccumulator, UsageUnit -from sentry.conf.types.kafka_definition import Topic from sentry.options import get from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -71,7 +71,7 @@ def record( if _accountant_backend is None: cluster_name = get_topic_definition( - Topic.SHARED_RESOURCES_USAGE, + settings.KAFKA_SHARED_RESOURCES_USAGE, )["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer = KafkaProducer( diff --git a/src/sentry/utils/kafka_config.py b/src/sentry/utils/kafka_config.py index 93e3c4fc87a12..2ca53a67bf3a4 100644 --- a/src/sentry/utils/kafka_config.py +++ b/src/sentry/utils/kafka_config.py @@ -3,7 +3,6 @@ from django.conf import settings -from sentry.conf.types.kafka_definition import Topic from sentry.conf.types.topic_definition import TopicDefinition SUPPORTED_KAFKA_CONFIGURATION = ( @@ -97,8 +96,9 @@ def get_kafka_admin_cluster_options( ) -def get_topic_definition(topic: Topic) -> TopicDefinition: - return { - "cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value], - "real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value), - } +def get_topic_definition(topic: str) -> TopicDefinition: + defn = settings.KAFKA_TOPICS.get(topic) + if defn is not None: + return defn + else: + raise ValueError(f"Unknown {topic=}") diff --git a/src/sentry/utils/outcomes.py b/src/sentry/utils/outcomes.py index 19774c0a294a0..4aa2951b4bc0a 100644 --- a/src/sentry/utils/outcomes.py +++ b/src/sentry/utils/outcomes.py @@ -6,7 +6,6 @@ from django.conf import settings -from sentry.conf.types.kafka_definition import Topic from sentry.constants import DataCategory from sentry.utils import json, kafka_config, metrics from sentry.utils.dates import to_datetime @@ -73,8 +72,8 @@ def track_outcome( assert isinstance(category, (type(None), DataCategory)) assert isinstance(quantity, int) - outcomes_config = kafka_config.get_topic_definition(Topic.OUTCOMES) - billing_config = kafka_config.get_topic_definition(Topic.OUTCOMES_BILLING) + outcomes_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES) + billing_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES_BILLING) use_billing = outcome.is_billing() diff --git a/tests/sentry/utils/test_outcomes.py b/tests/sentry/utils/test_outcomes.py index c7f6a479c47f0..65a476143d05e 100644 --- a/tests/sentry/utils/test_outcomes.py +++ b/tests/sentry/utils/test_outcomes.py @@ -4,7 +4,6 @@ import pytest from django.conf import settings -from sentry.conf.types.kafka_definition import Topic from sentry.utils import json, kafka_config, outcomes from sentry.utils.outcomes import Outcome, track_outcome @@ -80,7 +79,9 @@ def test_track_outcome_default(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) + assert cluster_args == ( + kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], + ) assert outcomes.outcomes_publisher (topic_name, payload), _ = setup.mock_publisher.return_value.publish.call_args @@ -116,7 +117,7 @@ def test_track_outcome_billing(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) + assert cluster_args == (kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args @@ -135,7 +136,7 @@ def test_track_outcome_billing_topic(setup): settings.KAFKA_TOPICS, { settings.KAFKA_OUTCOMES_BILLING: { - "cluster": kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"], + "cluster": kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], } }, ): @@ -147,7 +148,9 @@ def test_track_outcome_billing_topic(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) + assert cluster_args == ( + kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], + ) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args @@ -161,7 +164,9 @@ def test_track_outcome_billing_cluster(settings, setup): Checks that outcomes are routed to the dedicated cluster and topic. """ - with mock.patch.dict(settings.KAFKA_TOPIC_TO_CLUSTER, {"outcomes-billing": "different"}): + with mock.patch.dict( + settings.KAFKA_TOPICS, {settings.KAFKA_OUTCOMES_BILLING: {"cluster": "different"}} + ): track_outcome( org_id=1, project_id=1, diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index 3bb5185f07275..9354d9bb79c0e 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -8,7 +8,6 @@ from django.conf import settings from django.core import mail -from sentry.conf.types.kafka_definition import Topic from sentry.incidents.action_handlers import ( EmailActionHandler, generate_incident_trigger_email_context, @@ -41,7 +40,7 @@ class HandleSnubaQueryUpdateTest(TestCase): def setUp(self): super().setUp() - self.topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS + self.topic = "metrics-subscription-results" self.orig_registry = deepcopy(subscriber_registry) cluster_options = kafka_config.get_kafka_admin_cluster_options( @@ -49,18 +48,15 @@ def setUp(self): ) self.admin_client = AdminClient(cluster_options) - topic_defn = kafka_config.get_topic_definition(self.topic) - self.real_topic = topic_defn["real_topic_name"] - self.cluster = topic_defn["cluster"] - - create_topics(self.cluster, [self.real_topic]) + kafka_cluster = kafka_config.get_topic_definition(self.topic)["cluster"] + create_topics(kafka_cluster, [self.topic]) def tearDown(self): super().tearDown() subscriber_registry.clear() subscriber_registry.update(self.orig_registry) - self.admin_client.delete_topics([self.real_topic]) + self.admin_client.delete_topics([self.topic]) metrics._metrics_backend = None @cached_property @@ -97,8 +93,9 @@ def action(self): @cached_property def producer(self): + cluster_name = kafka_config.get_topic_definition(self.topic)["cluster"] conf = { - "bootstrap.servers": settings.KAFKA_CLUSTERS[self.cluster]["common"][ + "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][ "bootstrap.servers" ], "session.timeout.ms": 6000, @@ -132,7 +129,7 @@ def run_test(self, consumer): "timestamp": "2020-01-01T01:23:45.1234", }, } - self.producer.produce(self.real_topic, json.dumps(message)) + self.producer.produce(self.topic, json.dumps(message)) self.producer.flush() def active_incident():