From 2143a593fd6b0952007d48f5c923c93d1a9aa704 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 5 Mar 2024 15:52:56 -0800 Subject: [PATCH 1/2] Revert "Revert "ref: Move more code to the new way of defining kafka topics and overrides (#66283)"" This reverts commit 622827e76320cbfe541b320f109bd04209a8d00d. --- src/sentry/conf/server.py | 10 ++++++++-- src/sentry/conf/types/kafka_definition.py | 2 +- src/sentry/conf/types/topic_definition.py | 2 ++ src/sentry/consumers/__init__.py | 9 +++++---- src/sentry/eventstream/kafka/backend.py | 18 ++++++++++-------- src/sentry/issues/attributes.py | 7 ++++--- src/sentry/issues/producer.py | 7 ++++--- src/sentry/monitors/tasks.py | 14 ++++++++------ src/sentry/replays/lib/kafka.py | 5 ++--- .../replays/usecases/ingest/dom_index.py | 5 ++--- src/sentry/runner/commands/devserver.py | 7 +++---- src/sentry/sentry_metrics/configuration.py | 8 +++++--- .../consumers/indexer/multiprocess.py | 3 ++- src/sentry/usage_accountant/accountant.py | 4 ++-- src/sentry/utils/kafka_config.py | 12 ++++++------ src/sentry/utils/outcomes.py | 5 +++-- tests/sentry/utils/test_outcomes.py | 17 ++++++----------- tests/snuba/incidents/test_tasks.py | 17 ++++++++++------- 18 files changed, 83 insertions(+), 69 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index a86d00344dbf1..e82f1a73b6fe0 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -21,7 +21,6 @@ from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig -from sentry.conf.types.topic_definition import TopicDefinition from sentry.utils import json # NOQA (used in getsentry config) from sentry.utils.celery import crontab_with_minute_jitter from sentry.utils.types import Type, type_from_value @@ -3529,9 +3528,16 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] "shared-resources-usage": "default", } +from typing import TypedDict + + +class LegacyTopicDefinition(TypedDict): + cluster: str + + # Cluster configuration for each Kafka topic by name. 
# DEPRECATED -KAFKA_TOPICS: Mapping[str, TopicDefinition] = { +KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, KAFKA_TRANSACTIONS: {"cluster": "default"}, diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 61820572647de..773272aec17fd 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -70,7 +70,7 @@ class ConsumerDefinition(TypedDict, total=False): synchronize_commit_group_default: str synchronize_commit_log_topic_default: str - dlq_topic: str + dlq_topic: Topic dlq_max_invalid_ratio: float | None dlq_max_consecutive_count: int | None diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index 41992b74d9ad7..bc5aaa44ddef8 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -5,3 +5,5 @@ class TopicDefinition(TypedDict): cluster: str + # The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES + real_topic_name: str diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index a0d72d4b8760a..4f70c17fc3d3a 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -298,7 +298,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "release-health", }, - "dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ, + "dlq_topic": Topic.INGEST_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -309,7 +309,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "performance", }, - "dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ, + "dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -517,7 +517,8 @@ def build_consumer_config(group_id: str): f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it" ) from e try: - cluster_setting = get_topic_definition(dlq_topic)["cluster"] + dlq_topic_defn = get_topic_definition(dlq_topic) + cluster_setting = dlq_topic_defn["cluster"] except ValueError as e: raise click.BadParameter( f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment" @@ -527,7 +528,7 @@ def build_consumer_config(group_id: str): dlq_producer = KafkaProducer(producer_config) dlq_policy = DlqPolicy( - KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), + KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])), DlqLimit( max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 4ec2aa728aa37..97fe9b98de843 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -7,9 +7,9 @@ from confluent_kafka import KafkaError from confluent_kafka import Message as KafkaMessage from confluent_kafka import Producer -from django.conf import settings from sentry import options +from sentry.conf.types.kafka_definition import Topic from sentry.eventstream.base import EventStreamEventType, GroupStates from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.killswitches import killswitch_matches_context @@ 
-24,15 +24,15 @@ class KafkaEventStream(SnubaProtocolEventStream): def __init__(self, **options: Any) -> None: - self.topic = settings.KAFKA_EVENTS - self.transactions_topic = settings.KAFKA_TRANSACTIONS - self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC - self.__producers: MutableMapping[str, Producer] = {} + self.topic = Topic.EVENTS + self.transactions_topic = Topic.TRANSACTIONS + self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC + self.__producers: MutableMapping[Topic, Producer] = {} - def get_transactions_topic(self, project_id: int) -> str: + def get_transactions_topic(self, project_id: int) -> Topic: return self.transactions_topic - def get_producer(self, topic: str) -> Producer: + def get_producer(self, topic: Topic) -> Producer: if topic not in self.__producers: cluster_name = get_topic_definition(topic)["cluster"] cluster_options = get_kafka_producer_cluster_options(cluster_name) @@ -202,9 +202,11 @@ def _send( assert isinstance(extra_data, tuple) + real_topic = get_topic_definition(topic)["real_topic_name"] + try: producer.produce( - topic=topic, + topic=real_topic, key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None, value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data), on_delivery=self.delivery_callback, diff --git a/src/sentry/issues/attributes.py b/src/sentry/issues/attributes.py index 5e0e699662f77..9b15f83155856 100644 --- a/src/sentry/issues/attributes.py +++ b/src/sentry/issues/attributes.py @@ -6,7 +6,7 @@ import requests import urllib3 -from arroyo import Topic +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from django.conf import settings from django.db.models import F, Window @@ -16,6 +16,7 @@ from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot from sentry import options +from sentry.conf.types.kafka_definition import Topic from sentry.models.group import Group from sentry.models.groupassignee import GroupAssignee from sentry.models.groupowner import GroupOwner, GroupOwnerType @@ -44,7 +45,7 @@ class GroupValues: def _get_attribute_snapshot_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"] + cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -122,7 +123,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None: raise snuba.SnubaError(err) else: payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), []) - _attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload) + _attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload) def _retrieve_group_values(group_id: int) -> GroupValues: diff --git a/src/sentry/issues/producer.py b/src/sentry/issues/producer.py index 5acfef85adcf1..8cd499bbecae7 100644 --- a/src/sentry/issues/producer.py +++ b/src/sentry/issues/producer.py @@ -4,11 +4,12 @@ from collections.abc import MutableMapping from typing import Any, cast -from arroyo import Topic +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from arroyo.types import Message, Value from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from 
sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.run import process_message from sentry.issues.status_change_message import StatusChangeMessage @@ -33,7 +34,7 @@ class PayloadType(ValueEqualityEnum): def _get_occurrence_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"] + cluster_name = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -68,7 +69,7 @@ def produce_occurrence_to_kafka( process_message(Message(Value(payload=payload, committable={}))) return - _occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload) + _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload) def _prepare_occurrence_message( diff --git a/src/sentry/monitors/tasks.py b/src/sentry/monitors/tasks.py index 79f86b62a7f40..82fd558235d5a 100644 --- a/src/sentry/monitors/tasks.py +++ b/src/sentry/monitors/tasks.py @@ -7,11 +7,13 @@ import msgpack import sentry_sdk -from arroyo import Partition, Topic +from arroyo import Partition +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from confluent_kafka.admin import AdminClient, PartitionMetadata from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.constants import ObjectStatus from sentry.monitors.logic.mark_failed import mark_failed from sentry.monitors.schedule import get_prev_schedule @@ -50,7 +52,7 @@ def _get_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"] + cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -62,10 +64,10 @@ def _get_producer() -> KafkaProducer: @lru_cache(maxsize=None) def _get_partitions() -> Mapping[int, PartitionMetadata]: - topic = settings.KAFKA_INGEST_MONITORS - cluster_name = get_topic_definition(topic)["cluster"] + topic_defn = get_topic_definition(Topic.INGEST_MONITORS) + topic = topic_defn["real_topic_name"] - conf = get_kafka_admin_cluster_options(cluster_name) + conf = get_kafka_admin_cluster_options(topic_defn["cluster"]) admin_client = AdminClient(conf) result = admin_client.list_topics(topic) topic_metadata = result.topics.get(topic) @@ -203,7 +205,7 @@ def clock_pulse(current_datetime=None): # topic. This is a requirement to ensure that none of the partitions stall, # since the global clock is tied to the slowest partition. 
for partition in _get_partitions().values(): - dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id) + dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id) _checkin_producer.produce(dest, payload) diff --git a/src/sentry/replays/lib/kafka.py b/src/sentry/replays/lib/kafka.py index 26ab2368e649c..2bde967b5faf0 100644 --- a/src/sentry/replays/lib/kafka.py +++ b/src/sentry/replays/lib/kafka.py @@ -1,5 +1,4 @@ -from django.conf import settings - +from sentry.conf.types.kafka_definition import Topic from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition from sentry.utils.pubsub import KafkaPublisher @@ -10,7 +9,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) + config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( get_kafka_producer_cluster_options(config["cluster"]), asynchronous=is_async, diff --git a/src/sentry/replays/usecases/ingest/dom_index.py b/src/sentry/replays/usecases/ingest/dom_index.py index 80b626bb20c98..763162f521e3f 100644 --- a/src/sentry/replays/usecases/ingest/dom_index.py +++ b/src/sentry/replays/usecases/ingest/dom_index.py @@ -8,9 +8,8 @@ from hashlib import md5 from typing import Any, Literal, TypedDict, cast -from django.conf import settings - from sentry import features +from sentry.conf.types.kafka_definition import Topic from sentry.models.project import Project from sentry.replays.usecases.ingest.events import SentryEvent from sentry.replays.usecases.ingest.issue_creation import ( @@ -219,7 +218,7 @@ def _initialize_publisher() -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) + config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( kafka_config.get_kafka_producer_cluster_options(config["cluster"]) ) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 038a66bfa7223..6e00c6890af29 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -366,12 +366,11 @@ def devserver( from sentry.conf.types.kafka_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics + from sentry.utils.kafka_config import get_topic_definition for topic in Topic: - default_name = topic.value - physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name) - cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name] - create_topics(cluster_name, [physical_name]) + topic_defn = get_topic_definition(topic) + create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]]) if dev_consumer: daemons.append( diff --git a/src/sentry/sentry_metrics/configuration.py b/src/sentry/sentry_metrics/configuration.py index eddebed13a322..a885712f379d6 100644 --- a/src/sentry/sentry_metrics/configuration.py +++ b/src/sentry/sentry_metrics/configuration.py @@ -10,6 +10,8 @@ import sentry_sdk +from sentry.conf.types.kafka_definition import Topic + # The maximum length of a column that is indexed in postgres. 
It is important to keep this in # sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py MAX_INDEXED_COLUMN_LENGTH = 200 @@ -46,7 +48,7 @@ class MetricsIngestConfiguration: db_backend: IndexerStorage db_backend_options: Mapping[str, Any] input_topic: str - output_topic: str + output_topic: Topic use_case_id: UseCaseKey internal_metrics_tag: str | None writes_limiter_cluster_options: Mapping[str, Any] @@ -79,7 +81,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_METRICS, - output_topic=settings.KAFKA_SNUBA_METRICS, + output_topic=Topic.SNUBA_METRICS, use_case_id=UseCaseKey.RELEASE_HEALTH, internal_metrics_tag="release-health", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS, @@ -96,7 +98,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS, - output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS, + output_topic=Topic.SNUBA_GENERIC_METRICS, use_case_id=UseCaseKey.PERFORMANCE, internal_metrics_tag="perf", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE, diff --git a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py index dd56520a20f52..8cb2fdd5639b2 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py +++ b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py @@ -10,6 +10,7 @@ from arroyo.types import Commit, FilteredPayload, Message, Partition from confluent_kafka import Producer +from sentry.conf.types.kafka_definition import Topic from sentry.utils import kafka_config, metrics logger = logging.getLogger(__name__) @@ -18,7 +19,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]): def __init__( self, - output_topic: str, + output_topic: Topic, commit_function: Commit, producer: AbstractProducer[KafkaPayload] | None = None, ) -> None: diff --git a/src/sentry/usage_accountant/accountant.py b/src/sentry/usage_accountant/accountant.py index 2ecf3c49f75c0..ee1e98a8c9cc8 100644 --- a/src/sentry/usage_accountant/accountant.py +++ b/src/sentry/usage_accountant/accountant.py @@ -12,9 +12,9 @@ from arroyo.backends.abstract import Producer from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration -from django.conf import settings from usageaccountant import UsageAccumulator, UsageUnit +from sentry.conf.types.kafka_definition import Topic from sentry.options import get from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -71,7 +71,7 @@ def record( if _accountant_backend is None: cluster_name = get_topic_definition( - settings.KAFKA_SHARED_RESOURCES_USAGE, + Topic.SHARED_RESOURCES_USAGE, )["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer = KafkaProducer( diff --git a/src/sentry/utils/kafka_config.py b/src/sentry/utils/kafka_config.py index 2ca53a67bf3a4..93e3c4fc87a12 100644 --- a/src/sentry/utils/kafka_config.py +++ b/src/sentry/utils/kafka_config.py @@ -3,6 +3,7 @@ from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.conf.types.topic_definition import TopicDefinition SUPPORTED_KAFKA_CONFIGURATION = ( @@ -96,9 +97,8 @@ def get_kafka_admin_cluster_options( ) -def get_topic_definition(topic: str) -> TopicDefinition: - defn = settings.KAFKA_TOPICS.get(topic) - if defn is 
not None: - return defn - else: - raise ValueError(f"Unknown {topic=}") +def get_topic_definition(topic: Topic) -> TopicDefinition: + return { + "cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value], + "real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value), + } diff --git a/src/sentry/utils/outcomes.py b/src/sentry/utils/outcomes.py index 4aa2951b4bc0a..19774c0a294a0 100644 --- a/src/sentry/utils/outcomes.py +++ b/src/sentry/utils/outcomes.py @@ -6,6 +6,7 @@ from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.constants import DataCategory from sentry.utils import json, kafka_config, metrics from sentry.utils.dates import to_datetime @@ -72,8 +73,8 @@ def track_outcome( assert isinstance(category, (type(None), DataCategory)) assert isinstance(quantity, int) - outcomes_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES) - billing_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES_BILLING) + outcomes_config = kafka_config.get_topic_definition(Topic.OUTCOMES) + billing_config = kafka_config.get_topic_definition(Topic.OUTCOMES_BILLING) use_billing = outcome.is_billing() diff --git a/tests/sentry/utils/test_outcomes.py b/tests/sentry/utils/test_outcomes.py index 65a476143d05e..c7f6a479c47f0 100644 --- a/tests/sentry/utils/test_outcomes.py +++ b/tests/sentry/utils/test_outcomes.py @@ -4,6 +4,7 @@ import pytest from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.utils import json, kafka_config, outcomes from sentry.utils.outcomes import Outcome, track_outcome @@ -79,9 +80,7 @@ def test_track_outcome_default(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == ( - kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], - ) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, payload), _ = setup.mock_publisher.return_value.publish.call_args @@ -117,7 +116,7 @@ def test_track_outcome_billing(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == (kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"],) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args @@ -136,7 +135,7 @@ def test_track_outcome_billing_topic(setup): settings.KAFKA_TOPICS, { settings.KAFKA_OUTCOMES_BILLING: { - "cluster": kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], + "cluster": kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"], } }, ): @@ -148,9 +147,7 @@ def test_track_outcome_billing_topic(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == ( - kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], - ) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args @@ -164,9 +161,7 @@ def test_track_outcome_billing_cluster(settings, setup): Checks that outcomes are routed to the dedicated cluster and topic. 
""" - with mock.patch.dict( - settings.KAFKA_TOPICS, {settings.KAFKA_OUTCOMES_BILLING: {"cluster": "different"}} - ): + with mock.patch.dict(settings.KAFKA_TOPIC_TO_CLUSTER, {"outcomes-billing": "different"}): track_outcome( org_id=1, project_id=1, diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index 9354d9bb79c0e..3bb5185f07275 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -8,6 +8,7 @@ from django.conf import settings from django.core import mail +from sentry.conf.types.kafka_definition import Topic from sentry.incidents.action_handlers import ( EmailActionHandler, generate_incident_trigger_email_context, @@ -40,7 +41,7 @@ class HandleSnubaQueryUpdateTest(TestCase): def setUp(self): super().setUp() - self.topic = "metrics-subscription-results" + self.topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS self.orig_registry = deepcopy(subscriber_registry) cluster_options = kafka_config.get_kafka_admin_cluster_options( @@ -48,15 +49,18 @@ def setUp(self): ) self.admin_client = AdminClient(cluster_options) - kafka_cluster = kafka_config.get_topic_definition(self.topic)["cluster"] - create_topics(kafka_cluster, [self.topic]) + topic_defn = kafka_config.get_topic_definition(self.topic) + self.real_topic = topic_defn["real_topic_name"] + self.cluster = topic_defn["cluster"] + + create_topics(self.cluster, [self.real_topic]) def tearDown(self): super().tearDown() subscriber_registry.clear() subscriber_registry.update(self.orig_registry) - self.admin_client.delete_topics([self.topic]) + self.admin_client.delete_topics([self.real_topic]) metrics._metrics_backend = None @cached_property @@ -93,9 +97,8 @@ def action(self): @cached_property def producer(self): - cluster_name = kafka_config.get_topic_definition(self.topic)["cluster"] conf = { - "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][ + "bootstrap.servers": settings.KAFKA_CLUSTERS[self.cluster]["common"][ "bootstrap.servers" ], "session.timeout.ms": 6000, @@ -129,7 +132,7 @@ def run_test(self, consumer): "timestamp": "2020-01-01T01:23:45.1234", }, } - self.producer.produce(self.topic, json.dumps(message)) + self.producer.produce(self.real_topic, json.dumps(message)) self.producer.flush() def active_incident(): From 43d7276c73e42f9a15dd712144c424d025152108 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 5 Mar 2024 15:53:25 -0800 Subject: [PATCH 2/2] fix indexer strategy --- src/sentry/sentry_metrics/consumers/indexer/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py index 8cb2fdd5639b2..4dbd6a27f54d0 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py +++ b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py @@ -27,7 +27,7 @@ def __init__( self.__producer = Producer( kafka_config.get_kafka_producer_cluster_options(snuba_metrics["cluster"]), ) - self.__producer_topic = output_topic + self.__producer_topic = snuba_metrics["real_topic_name"] self.__commit_function = commit_function self.__closed = False