diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 4f70c17fc3d3a..4eb92820e01ce 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -230,42 +230,34 @@ def ingest_events_options() -> list[click.Option]: "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "events"}, }, "transactions-subscription-results": { "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "transactions"}, }, "generic-metrics-subscription-results": { "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "generic_metrics"}, }, "sessions-subscription-results": { "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(), "static_args": { - "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, + "dataset": "events", }, }, "metrics-subscription-results": { "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), - "static_args": { - "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, - }, + "static_args": {"dataset": "metrics"}, }, "ingest-events": { "topic": settings.KAFKA_INGEST_EVENTS, diff --git a/src/sentry/snuba/query_subscriptions/constants.py b/src/sentry/snuba/query_subscriptions/constants.py index 9da183bcaa9d2..ceb49368ac767 100644 --- a/src/sentry/snuba/query_subscriptions/constants.py +++ b/src/sentry/snuba/query_subscriptions/constants.py @@ -1,14 +1,7 @@ -from django.conf import settings - +from sentry.conf.types.kafka_definition import Topic from sentry.snuba.dataset import Dataset +from sentry.utils.kafka_config import get_topic_definition -topic_to_dataset: dict[str, Dataset] = { - settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: Dataset.Events, - settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: Dataset.Transactions, - settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.PerformanceMetrics, - settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS: Dataset.Sessions, - settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics, -} dataset_to_logical_topic = { Dataset.Events: "events-subscription-results", Dataset.Transactions: "transactions-subscription-results", @@ -16,3 +9,8 @@ Dataset.Sessions: "sessions-subscription-results", Dataset.Metrics: "metrics-subscription-results", } + +topic_to_dataset = { + get_topic_definition(Topic(logical_topic))["real_topic_name"]: dataset + for (dataset, logical_topic) in dataset_to_logical_topic.items() +} diff --git a/src/sentry/snuba/query_subscriptions/run.py b/src/sentry/snuba/query_subscriptions/run.py index bc7a48da35726..f33a6307bd3aa 100644 --- a/src/sentry/snuba/query_subscriptions/run.py +++ b/src/sentry/snuba/query_subscriptions/run.py @@ -13,10 +13,12 @@ from arroyo.types import BrokerValue, Commit, Message, Partition from sentry_kafka_schemas import get_codec +from sentry.conf.types.kafka_definition import Topic from sentry.features.rollout import in_random_rollout from sentry.snuba.dataset import Dataset -from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset +from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing +from sentry.utils.kafka_config import get_topic_definition logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): def __init__( self, - topic: str, + dataset: str, max_batch_size: int, max_batch_time: int, num_processes: int, @@ -32,9 +34,9 @@ def __init__( output_block_size: int | None, multi_proc: bool = True, ): - self.topic = topic - self.dataset = topic_to_dataset[self.topic] + self.dataset = Dataset(dataset) self.logical_topic = dataset_to_logical_topic[self.dataset] + self.topic = get_topic_definition(Topic(self.logical_topic))["real_topic_name"] self.max_batch_size = max_batch_size self.max_batch_time = max_batch_time self.input_block_size = input_block_size diff --git a/tests/sentry/snuba/test_query_subscription_consumer.py b/tests/sentry/snuba/test_query_subscription_consumer.py index 3a8387c9806a0..ceeddb82165dd 100644 --- a/tests/sentry/snuba/test_query_subscription_consumer.py +++ b/tests/sentry/snuba/test_query_subscription_consumer.py @@ -31,6 +31,10 @@ @pytest.mark.snuba_ci class BaseQuerySubscriptionTest: + @cached_property + def dataset(self): + return Dataset.Metrics + @cached_property def topic(self): return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS @@ -97,7 +101,7 @@ def test_arroyo_consumer(self): commit = mock.Mock() partition = Partition(Topic("test"), 0) strategy = QuerySubscriptionStrategyFactory( - self.topic, + self.dataset.value, 1, 1, 1,