From 1221c853c926af252744780d02792e3803c4a5de Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 4 Mar 2024 17:17:16 -0800 Subject: [PATCH 1/5] ref: Move more code to the new way of defining kafka topics and overrides This moves the `dlq_topic` consumer definitions and anything that uses the `get_topic_definition` helper function over to the new style kafka topic config. --- src/sentry/conf/server.py | 10 ++++++++-- src/sentry/conf/types/kafka_definition.py | 2 +- src/sentry/conf/types/topic_definition.py | 2 ++ src/sentry/consumers/__init__.py | 9 +++++---- src/sentry/eventstream/kafka/backend.py | 14 +++++++------- src/sentry/issues/attributes.py | 7 ++++--- src/sentry/issues/producer.py | 7 ++++--- src/sentry/monitors/tasks.py | 14 ++++++++------ src/sentry/replays/lib/kafka.py | 5 ++--- src/sentry/replays/usecases/ingest/dom_index.py | 5 ++--- src/sentry/runner/commands/devserver.py | 7 +++---- src/sentry/sentry_metrics/configuration.py | 8 +++++--- .../consumers/indexer/multiprocess.py | 3 ++- src/sentry/usage_accountant/accountant.py | 4 ++-- src/sentry/utils/kafka_config.py | 12 ++++++------ src/sentry/utils/outcomes.py | 5 +++-- tests/sentry/utils/test_outcomes.py | 13 +++++-------- tests/snuba/incidents/test_tasks.py | 7 ++++--- 18 files changed, 73 insertions(+), 61 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 8ca7d3f38e492..08ff865e4ca61 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -21,7 +21,6 @@ from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig -from sentry.conf.types.topic_definition import TopicDefinition from sentry.utils import json # NOQA (used in getsentry config) from sentry.utils.celery import crontab_with_minute_jitter from sentry.utils.types import Type, type_from_value @@ -3531,9 +3530,16 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] "shared-resources-usage": "default", } +from typing import TypedDict + + +class LegacyTopicDefinition(TypedDict): + cluster: str + + # Cluster configuration for each Kafka topic by name. 
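A minimal sketch of the lookup pattern this series migrates toward, assuming the `Topic` enum and the reworked `get_topic_definition` helper introduced later in this patch (the legacy access shown in the comment is the pattern being deprecated here):

    from sentry.conf.types.kafka_definition import Topic
    from sentry.utils.kafka_config import get_topic_definition

    # Legacy style (deprecated): the cluster came from the KAFKA_TOPICS
    # mapping, keyed by the string topic setting:
    #   cluster = settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"]

    # New style: a single enum-keyed lookup yields both the cluster and the
    # physical topic name, with KAFKA_TOPIC_OVERRIDES already applied.
    defn = get_topic_definition(Topic.OUTCOMES)
    cluster_name = defn["cluster"]        # from settings.KAFKA_TOPIC_TO_CLUSTER
    real_topic = defn["real_topic_name"]  # override-aware name to produce to
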
# DEPRECATED -KAFKA_TOPICS: Mapping[str, TopicDefinition] = { +KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, KAFKA_TRANSACTIONS: {"cluster": "default"}, diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 61820572647de..773272aec17fd 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -70,7 +70,7 @@ class ConsumerDefinition(TypedDict, total=False): synchronize_commit_group_default: str synchronize_commit_log_topic_default: str - dlq_topic: str + dlq_topic: Topic dlq_max_invalid_ratio: float | None dlq_max_consecutive_count: int | None diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index 41992b74d9ad7..bc5aaa44ddef8 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -5,3 +5,5 @@ class TopicDefinition(TypedDict): cluster: str + # The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES + real_topic_name: str diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index a0d72d4b8760a..4f70c17fc3d3a 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -298,7 +298,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "release-health", }, - "dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ, + "dlq_topic": Topic.INGEST_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -309,7 +309,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "ingest_profile": "performance", }, - "dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ, + "dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ, "dlq_max_invalid_ratio": 0.01, "dlq_max_consecutive_count": 1000, }, @@ -517,7 +517,8 @@ def build_consumer_config(group_id: str): f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it" ) from e try: - cluster_setting = get_topic_definition(dlq_topic)["cluster"] + dlq_topic_defn = get_topic_definition(dlq_topic) + cluster_setting = dlq_topic_defn["cluster"] except ValueError as e: raise click.BadParameter( f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment" @@ -527,7 +528,7 @@ def build_consumer_config(group_id: str): dlq_producer = KafkaProducer(producer_config) dlq_policy = DlqPolicy( - KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), + KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])), DlqLimit( max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 4ec2aa728aa37..b5bff45bb4c7e 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -7,9 +7,9 @@ from confluent_kafka import KafkaError from confluent_kafka import Message as KafkaMessage from confluent_kafka import Producer -from django.conf import settings from sentry import options +from sentry.conf.types.kafka_definition import Topic from sentry.eventstream.base import EventStreamEventType, GroupStates from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.killswitches import killswitch_matches_context @@ 
-24,15 +24,15 @@ class KafkaEventStream(SnubaProtocolEventStream): def __init__(self, **options: Any) -> None: - self.topic = settings.KAFKA_EVENTS - self.transactions_topic = settings.KAFKA_TRANSACTIONS - self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC - self.__producers: MutableMapping[str, Producer] = {} + self.topic = Topic.EVENTS + self.transactions_topic = Topic.TRANSACTIONS + self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC + self.__producers: MutableMapping[Topic, Producer] = {} - def get_transactions_topic(self, project_id: int) -> str: + def get_transactions_topic(self, project_id: int) -> Topic: return self.transactions_topic - def get_producer(self, topic: str) -> Producer: + def get_producer(self, topic: Topic) -> Producer: if topic not in self.__producers: cluster_name = get_topic_definition(topic)["cluster"] cluster_options = get_kafka_producer_cluster_options(cluster_name) diff --git a/src/sentry/issues/attributes.py b/src/sentry/issues/attributes.py index 5e0e699662f77..9b15f83155856 100644 --- a/src/sentry/issues/attributes.py +++ b/src/sentry/issues/attributes.py @@ -6,7 +6,7 @@ import requests import urllib3 -from arroyo import Topic +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from django.conf import settings from django.db.models import F, Window @@ -16,6 +16,7 @@ from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot from sentry import options +from sentry.conf.types.kafka_definition import Topic from sentry.models.group import Group from sentry.models.groupassignee import GroupAssignee from sentry.models.groupowner import GroupOwner, GroupOwnerType @@ -44,7 +45,7 @@ class GroupValues: def _get_attribute_snapshot_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"] + cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -122,7 +123,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None: raise snuba.SnubaError(err) else: payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), []) - _attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload) + _attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload) def _retrieve_group_values(group_id: int) -> GroupValues: diff --git a/src/sentry/issues/producer.py b/src/sentry/issues/producer.py index 5acfef85adcf1..8cd499bbecae7 100644 --- a/src/sentry/issues/producer.py +++ b/src/sentry/issues/producer.py @@ -4,11 +4,12 @@ from collections.abc import MutableMapping from typing import Any, cast -from arroyo import Topic +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from arroyo.types import Message, Value from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.run import process_message from sentry.issues.status_change_message import StatusChangeMessage @@ -33,7 +34,7 @@ class PayloadType(ValueEqualityEnum): def _get_occurrence_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"] + cluster_name = 
get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -68,7 +69,7 @@ def produce_occurrence_to_kafka( process_message(Message(Value(payload=payload, committable={}))) return - _occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload) + _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload) def _prepare_occurrence_message( diff --git a/src/sentry/monitors/tasks.py b/src/sentry/monitors/tasks.py index 79f86b62a7f40..82fd558235d5a 100644 --- a/src/sentry/monitors/tasks.py +++ b/src/sentry/monitors/tasks.py @@ -7,11 +7,13 @@ import msgpack import sentry_sdk -from arroyo import Partition, Topic +from arroyo import Partition +from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from confluent_kafka.admin import AdminClient, PartitionMetadata from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.constants import ObjectStatus from sentry.monitors.logic.mark_failed import mark_failed from sentry.monitors.schedule import get_prev_schedule @@ -50,7 +52,7 @@ def _get_producer() -> KafkaProducer: - cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"] + cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer_config.pop("compression.type", None) producer_config.pop("message.max.bytes", None) @@ -62,10 +64,10 @@ def _get_producer() -> KafkaProducer: @lru_cache(maxsize=None) def _get_partitions() -> Mapping[int, PartitionMetadata]: - topic = settings.KAFKA_INGEST_MONITORS - cluster_name = get_topic_definition(topic)["cluster"] + topic_defn = get_topic_definition(Topic.INGEST_MONITORS) + topic = topic_defn["real_topic_name"] - conf = get_kafka_admin_cluster_options(cluster_name) + conf = get_kafka_admin_cluster_options(topic_defn["cluster"]) admin_client = AdminClient(conf) result = admin_client.list_topics(topic) topic_metadata = result.topics.get(topic) @@ -203,7 +205,7 @@ def clock_pulse(current_datetime=None): # topic. This is a requirement to ensure that none of the partitions stall, # since the global clock is tied to the slowest partition. 
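A condensed sketch, under the same assumptions, of the resolution pattern the `_get_partitions` hunk above adopts: resolve the cluster and physical topic name once via `get_topic_definition`, then reuse both for the admin metadata lookup and the produce destinations (producing to the override-aware name is what the later "produce to real topic" patch in this series does for the eventstream):

    from arroyo import Partition
    from arroyo import Topic as ArroyoTopic
    from confluent_kafka.admin import AdminClient

    from sentry.conf.types.kafka_definition import Topic
    from sentry.utils.kafka_config import (
        get_kafka_admin_cluster_options,
        get_topic_definition,
    )

    topic_defn = get_topic_definition(Topic.INGEST_MONITORS)
    real_topic = topic_defn["real_topic_name"]

    # Both the metadata lookup and the produce destinations use the physical
    # (possibly overridden) topic name, never the logical enum value.
    admin_client = AdminClient(get_kafka_admin_cluster_options(topic_defn["cluster"]))
    topic_metadata = admin_client.list_topics(real_topic).topics[real_topic]
    destinations = [
        Partition(ArroyoTopic(real_topic), p.id)
        for p in topic_metadata.partitions.values()
    ]
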
for partition in _get_partitions().values(): - dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id) + dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id) _checkin_producer.produce(dest, payload) diff --git a/src/sentry/replays/lib/kafka.py b/src/sentry/replays/lib/kafka.py index 26ab2368e649c..2bde967b5faf0 100644 --- a/src/sentry/replays/lib/kafka.py +++ b/src/sentry/replays/lib/kafka.py @@ -1,5 +1,4 @@ -from django.conf import settings - +from sentry.conf.types.kafka_definition import Topic from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition from sentry.utils.pubsub import KafkaPublisher @@ -10,7 +9,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) + config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( get_kafka_producer_cluster_options(config["cluster"]), asynchronous=is_async, diff --git a/src/sentry/replays/usecases/ingest/dom_index.py b/src/sentry/replays/usecases/ingest/dom_index.py index 80b626bb20c98..763162f521e3f 100644 --- a/src/sentry/replays/usecases/ingest/dom_index.py +++ b/src/sentry/replays/usecases/ingest/dom_index.py @@ -8,9 +8,8 @@ from hashlib import md5 from typing import Any, Literal, TypedDict, cast -from django.conf import settings - from sentry import features +from sentry.conf.types.kafka_definition import Topic from sentry.models.project import Project from sentry.replays.usecases.ingest.events import SentryEvent from sentry.replays.usecases.ingest.issue_creation import ( @@ -219,7 +218,7 @@ def _initialize_publisher() -> KafkaPublisher: global replay_publisher if replay_publisher is None: - config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS) + config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS) replay_publisher = KafkaPublisher( kafka_config.get_kafka_producer_cluster_options(config["cluster"]) ) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 038a66bfa7223..6e00c6890af29 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -366,12 +366,11 @@ def devserver( from sentry.conf.types.kafka_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics + from sentry.utils.kafka_config import get_topic_definition for topic in Topic: - default_name = topic.value - physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name) - cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name] - create_topics(cluster_name, [physical_name]) + topic_defn = get_topic_definition(topic) + create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]]) if dev_consumer: daemons.append( diff --git a/src/sentry/sentry_metrics/configuration.py b/src/sentry/sentry_metrics/configuration.py index eddebed13a322..a885712f379d6 100644 --- a/src/sentry/sentry_metrics/configuration.py +++ b/src/sentry/sentry_metrics/configuration.py @@ -10,6 +10,8 @@ import sentry_sdk +from sentry.conf.types.kafka_definition import Topic + # The maximum length of a column that is indexed in postgres. 
It is important to keep this in # sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py MAX_INDEXED_COLUMN_LENGTH = 200 @@ -46,7 +48,7 @@ class MetricsIngestConfiguration: db_backend: IndexerStorage db_backend_options: Mapping[str, Any] input_topic: str - output_topic: str + output_topic: Topic use_case_id: UseCaseKey internal_metrics_tag: str | None writes_limiter_cluster_options: Mapping[str, Any] @@ -79,7 +81,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_METRICS, - output_topic=settings.KAFKA_SNUBA_METRICS, + output_topic=Topic.SNUBA_METRICS, use_case_id=UseCaseKey.RELEASE_HEALTH, internal_metrics_tag="release-health", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS, @@ -96,7 +98,7 @@ def get_ingest_config( db_backend=IndexerStorage.POSTGRES, db_backend_options={}, input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS, - output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS, + output_topic=Topic.SNUBA_GENERIC_METRICS, use_case_id=UseCaseKey.PERFORMANCE, internal_metrics_tag="perf", writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE, diff --git a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py index dd56520a20f52..8cb2fdd5639b2 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py +++ b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py @@ -10,6 +10,7 @@ from arroyo.types import Commit, FilteredPayload, Message, Partition from confluent_kafka import Producer +from sentry.conf.types.kafka_definition import Topic from sentry.utils import kafka_config, metrics logger = logging.getLogger(__name__) @@ -18,7 +19,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]): def __init__( self, - output_topic: str, + output_topic: Topic, commit_function: Commit, producer: AbstractProducer[KafkaPayload] | None = None, ) -> None: diff --git a/src/sentry/usage_accountant/accountant.py b/src/sentry/usage_accountant/accountant.py index 2ecf3c49f75c0..ee1e98a8c9cc8 100644 --- a/src/sentry/usage_accountant/accountant.py +++ b/src/sentry/usage_accountant/accountant.py @@ -12,9 +12,9 @@ from arroyo.backends.abstract import Producer from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration -from django.conf import settings from usageaccountant import UsageAccumulator, UsageUnit +from sentry.conf.types.kafka_definition import Topic from sentry.options import get from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -71,7 +71,7 @@ def record( if _accountant_backend is None: cluster_name = get_topic_definition( - settings.KAFKA_SHARED_RESOURCES_USAGE, + Topic.SHARED_RESOURCES_USAGE, )["cluster"] producer_config = get_kafka_producer_cluster_options(cluster_name) producer = KafkaProducer( diff --git a/src/sentry/utils/kafka_config.py b/src/sentry/utils/kafka_config.py index 2ca53a67bf3a4..93e3c4fc87a12 100644 --- a/src/sentry/utils/kafka_config.py +++ b/src/sentry/utils/kafka_config.py @@ -3,6 +3,7 @@ from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.conf.types.topic_definition import TopicDefinition SUPPORTED_KAFKA_CONFIGURATION = ( @@ -96,9 +97,8 @@ def get_kafka_admin_cluster_options( ) -def get_topic_definition(topic: str) -> TopicDefinition: - defn = settings.KAFKA_TOPICS.get(topic) - if defn is 
not None: - return defn - else: - raise ValueError(f"Unknown {topic=}") +def get_topic_definition(topic: Topic) -> TopicDefinition: + return { + "cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value], + "real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value), + } diff --git a/src/sentry/utils/outcomes.py b/src/sentry/utils/outcomes.py index 4aa2951b4bc0a..19774c0a294a0 100644 --- a/src/sentry/utils/outcomes.py +++ b/src/sentry/utils/outcomes.py @@ -6,6 +6,7 @@ from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.constants import DataCategory from sentry.utils import json, kafka_config, metrics from sentry.utils.dates import to_datetime @@ -72,8 +73,8 @@ def track_outcome( assert isinstance(category, (type(None), DataCategory)) assert isinstance(quantity, int) - outcomes_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES) - billing_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES_BILLING) + outcomes_config = kafka_config.get_topic_definition(Topic.OUTCOMES) + billing_config = kafka_config.get_topic_definition(Topic.OUTCOMES_BILLING) use_billing = outcome.is_billing() diff --git a/tests/sentry/utils/test_outcomes.py b/tests/sentry/utils/test_outcomes.py index 65a476143d05e..b5dc3a1bf8aaa 100644 --- a/tests/sentry/utils/test_outcomes.py +++ b/tests/sentry/utils/test_outcomes.py @@ -4,6 +4,7 @@ import pytest from django.conf import settings +from sentry.conf.types.kafka_definition import Topic from sentry.utils import json, kafka_config, outcomes from sentry.utils.outcomes import Outcome, track_outcome @@ -79,9 +80,7 @@ def test_track_outcome_default(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == ( - kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], - ) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, payload), _ = setup.mock_publisher.return_value.publish.call_args @@ -117,7 +116,7 @@ def test_track_outcome_billing(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == (kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"],) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args @@ -136,7 +135,7 @@ def test_track_outcome_billing_topic(setup): settings.KAFKA_TOPICS, { settings.KAFKA_OUTCOMES_BILLING: { - "cluster": kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], + "cluster": kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"], } }, ): @@ -148,9 +147,7 @@ def test_track_outcome_billing_topic(setup): ) cluster_args, _ = setup.mock_get_kafka_producer_cluster_options.call_args - assert cluster_args == ( - kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"], - ) + assert cluster_args == (kafka_config.get_topic_definition(Topic.OUTCOMES)["cluster"],) assert outcomes.outcomes_publisher (topic_name, _), _ = setup.mock_publisher.return_value.publish.call_args diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index 9354d9bb79c0e..8b451f6e7ddf5 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -8,6 +8,7 @@ from django.conf import settings from django.core import mail +from 
sentry.conf.types.kafka_definition import Topic from sentry.incidents.action_handlers import ( EmailActionHandler, generate_incident_trigger_email_context, @@ -40,7 +41,7 @@ class HandleSnubaQueryUpdateTest(TestCase): def setUp(self): super().setUp() - self.topic = "metrics-subscription-results" + topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS self.orig_registry = deepcopy(subscriber_registry) cluster_options = kafka_config.get_kafka_admin_cluster_options( @@ -48,8 +49,8 @@ def setUp(self): ) self.admin_client = AdminClient(cluster_options) - kafka_cluster = kafka_config.get_topic_definition(self.topic)["cluster"] - create_topics(kafka_cluster, [self.topic]) + topic_defn = kafka_config.get_topic_definition(topic) + create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]]) def tearDown(self): super().tearDown() From 795c058ad2a498e5dc1c8c3062d5c818a6492310 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 4 Mar 2024 17:58:15 -0800 Subject: [PATCH 2/5] ref: Move query subscriptions over to new-style kafka config --- src/sentry/consumers/__init__.py | 18 +++++------------- .../snuba/query_subscriptions/constants.py | 16 +++++++--------- src/sentry/snuba/query_subscriptions/run.py | 10 ++++++---- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 4f70c17fc3d3a..4eb92820e01ce 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -230,42 +230,34 @@ def ingest_events_options() -> list[click.Option]: "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "events"}, }, "transactions-subscription-results": { "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "transactions"}, }, "generic-metrics-subscription-results": { "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "generic_metrics"}, }, "sessions-subscription-results": { "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(), "static_args": { - "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, + "dataset": "events", }, }, "metrics-subscription-results": { "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "metrics"}, }, "ingest-events": { "topic": settings.KAFKA_INGEST_EVENTS, diff --git a/src/sentry/snuba/query_subscriptions/constants.py 
b/src/sentry/snuba/query_subscriptions/constants.py index 9da183bcaa9d2..ceb49368ac767 100644 --- a/src/sentry/snuba/query_subscriptions/constants.py +++ b/src/sentry/snuba/query_subscriptions/constants.py @@ -1,14 +1,7 @@ -from django.conf import settings - +from sentry.conf.types.kafka_definition import Topic from sentry.snuba.dataset import Dataset +from sentry.utils.kafka_config import get_topic_definition -topic_to_dataset: dict[str, Dataset] = { - settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: Dataset.Events, - settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: Dataset.Transactions, - settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.PerformanceMetrics, - settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS: Dataset.Sessions, - settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics, -} dataset_to_logical_topic = { Dataset.Events: "events-subscription-results", Dataset.Transactions: "transactions-subscription-results", @@ -16,3 +9,8 @@ Dataset.Sessions: "sessions-subscription-results", Dataset.Metrics: "metrics-subscription-results", } + +topic_to_dataset = { + get_topic_definition(Topic(logical_topic))["real_topic_name"]: dataset + for (dataset, logical_topic) in dataset_to_logical_topic.items() +} diff --git a/src/sentry/snuba/query_subscriptions/run.py b/src/sentry/snuba/query_subscriptions/run.py index bc7a48da35726..f33a6307bd3aa 100644 --- a/src/sentry/snuba/query_subscriptions/run.py +++ b/src/sentry/snuba/query_subscriptions/run.py @@ -13,10 +13,12 @@ from arroyo.types import BrokerValue, Commit, Message, Partition from sentry_kafka_schemas import get_codec +from sentry.conf.types.kafka_definition import Topic from sentry.features.rollout import in_random_rollout from sentry.snuba.dataset import Dataset -from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset +from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing +from sentry.utils.kafka_config import get_topic_definition logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): def __init__( self, - topic: str, + dataset: str, max_batch_size: int, max_batch_time: int, num_processes: int, @@ -32,9 +34,9 @@ def __init__( output_block_size: int | None, multi_proc: bool = True, ): - self.topic = topic - self.dataset = topic_to_dataset[self.topic] + self.dataset = Dataset(dataset) self.logical_topic = dataset_to_logical_topic[self.dataset] + self.topic = get_topic_definition(Topic(self.logical_topic))["real_topic_name"] self.max_batch_size = max_batch_size self.max_batch_time = max_batch_time self.input_block_size = input_block_size From 3d56b37a55a541de68b05a56921866d3833fd8d3 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 4 Mar 2024 18:17:46 -0800 Subject: [PATCH 3/5] fix test --- tests/snuba/incidents/test_tasks.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index 8b451f6e7ddf5..3bb5185f07275 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -41,7 +41,7 @@ class HandleSnubaQueryUpdateTest(TestCase): def setUp(self): super().setUp() - topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS + self.topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS self.orig_registry = deepcopy(subscriber_registry) cluster_options = 
kafka_config.get_kafka_admin_cluster_options(
@@ -49,15 +49,18 @@ def setUp(self):
         )
         self.admin_client = AdminClient(cluster_options)
 
-        topic_defn = kafka_config.get_topic_definition(topic)
-        create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]])
+        topic_defn = kafka_config.get_topic_definition(self.topic)
+        self.real_topic = topic_defn["real_topic_name"]
+        self.cluster = topic_defn["cluster"]
+
+        create_topics(self.cluster, [self.real_topic])
 
     def tearDown(self):
         super().tearDown()
         subscriber_registry.clear()
         subscriber_registry.update(self.orig_registry)
 
-        self.admin_client.delete_topics([self.topic])
+        self.admin_client.delete_topics([self.real_topic])
         metrics._metrics_backend = None
 
     @cached_property
@@ -94,9 +97,8 @@ def action(self):
 
     @cached_property
     def producer(self):
-        cluster_name = kafka_config.get_topic_definition(self.topic)["cluster"]
         conf = {
-            "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][
+            "bootstrap.servers": settings.KAFKA_CLUSTERS[self.cluster]["common"][
                 "bootstrap.servers"
             ],
             "session.timeout.ms": 6000,
@@ -130,7 +132,7 @@ def run_test(self, consumer):
                 "timestamp": "2020-01-01T01:23:45.1234",
             },
         }
-        self.producer.produce(self.topic, json.dumps(message))
+        self.producer.produce(self.real_topic, json.dumps(message))
         self.producer.flush()
 
         def active_incident():

From 05df7ee1caa6165eb807a1af1599519aea671adc Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Mon, 4 Mar 2024 18:21:15 -0800
Subject: [PATCH 4/5] produce to real topic

---
 src/sentry/eventstream/kafka/backend.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py
index b5bff45bb4c7e..97fe9b98de843 100644
--- a/src/sentry/eventstream/kafka/backend.py
+++ b/src/sentry/eventstream/kafka/backend.py
@@ -202,9 +202,11 @@ def _send(
 
         assert isinstance(extra_data, tuple)
 
+        real_topic = get_topic_definition(topic)["real_topic_name"]
+
         try:
             producer.produce(
-                topic=topic,
+                topic=real_topic,
                 key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
                 value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
                 on_delivery=self.delivery_callback,

From ac9c00955fd5917f374ad8a527c4e8a285d0779a Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Tue, 5 Mar 2024 10:33:50 -0800
Subject: [PATCH 5/5] update test

---
 tests/sentry/snuba/test_query_subscription_consumer.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/sentry/snuba/test_query_subscription_consumer.py b/tests/sentry/snuba/test_query_subscription_consumer.py
index 3a8387c9806a0..ceeddb82165dd 100644
--- a/tests/sentry/snuba/test_query_subscription_consumer.py
+++ b/tests/sentry/snuba/test_query_subscription_consumer.py
@@ -31,6 +31,10 @@
 
 @pytest.mark.snuba_ci
 class BaseQuerySubscriptionTest:
+    @cached_property
+    def dataset(self):
+        return Dataset.Metrics
+
     @cached_property
     def topic(self):
         return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS
@@ -97,7 +101,7 @@ def test_arroyo_consumer(self):
         commit = mock.Mock()
         partition = Partition(Topic("test"), 0)
         strategy = QuerySubscriptionStrategyFactory(
-            self.topic,
+            self.dataset.value,
             1,
             1,
             1,