Skip to content

Commit

Permalink
Revert "ref: Move query subscriptions over to new-style kafka config (#…
Browse files Browse the repository at this point in the history
…66287)"

This reverts commit f9c2ffd.

Co-authored-by: IanWoodard <17186604+IanWoodard@users.noreply.github.com>
  • Loading branch information
getsentry-bot and IanWoodard committed Mar 5, 2024
1 parent 622827e commit c94ad9e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 23 deletions.
18 changes: 13 additions & 5 deletions src/sentry/consumers/__init__.py
Expand Up @@ -230,34 +230,42 @@ def ingest_events_options() -> list[click.Option]:
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "events"},
"static_args": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
},
},
"transactions-subscription-results": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "transactions"},
"static_args": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
},
},
"generic-metrics-subscription-results": {
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "generic_metrics"},
"static_args": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
},
},
"sessions-subscription-results": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"dataset": "events",
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
},
},
"metrics-subscription-results": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "metrics"},
"static_args": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
},
},
"ingest-events": {
"topic": settings.KAFKA_INGEST_EVENTS,
Expand Down
16 changes: 9 additions & 7 deletions src/sentry/snuba/query_subscriptions/constants.py
@@ -1,16 +1,18 @@
from sentry.conf.types.kafka_definition import Topic
from django.conf import settings

from sentry.snuba.dataset import Dataset
from sentry.utils.kafka_config import get_topic_definition

topic_to_dataset: dict[str, Dataset] = {
settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: Dataset.Events,
settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: Dataset.Transactions,
settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.PerformanceMetrics,
settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS: Dataset.Sessions,
settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics,
}
dataset_to_logical_topic = {
Dataset.Events: "events-subscription-results",
Dataset.Transactions: "transactions-subscription-results",
Dataset.PerformanceMetrics: "generic-metrics-subscription-results",
Dataset.Sessions: "sessions-subscription-results",
Dataset.Metrics: "metrics-subscription-results",
}

topic_to_dataset = {
get_topic_definition(Topic(logical_topic))["real_topic_name"]: dataset
for (dataset, logical_topic) in dataset_to_logical_topic.items()
}
10 changes: 4 additions & 6 deletions src/sentry/snuba/query_subscriptions/run.py
Expand Up @@ -13,30 +13,28 @@
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec

from sentry.conf.types.kafka_definition import Topic
from sentry.features.rollout import in_random_rollout
from sentry.snuba.dataset import Dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset
from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
from sentry.utils.kafka_config import get_topic_definition

logger = logging.getLogger(__name__)


class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(
self,
dataset: str,
topic: str,
max_batch_size: int,
max_batch_time: int,
num_processes: int,
input_block_size: int | None,
output_block_size: int | None,
multi_proc: bool = True,
):
self.dataset = Dataset(dataset)
self.topic = topic
self.dataset = topic_to_dataset[self.topic]
self.logical_topic = dataset_to_logical_topic[self.dataset]
self.topic = get_topic_definition(Topic(self.logical_topic))["real_topic_name"]
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.input_block_size = input_block_size
Expand Down
6 changes: 1 addition & 5 deletions tests/sentry/snuba/test_query_subscription_consumer.py
Expand Up @@ -31,10 +31,6 @@

@pytest.mark.snuba_ci
class BaseQuerySubscriptionTest:
@cached_property
def dataset(self):
return Dataset.Metrics

@cached_property
def topic(self):
return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS
Expand Down Expand Up @@ -101,7 +97,7 @@ def test_arroyo_consumer(self):
commit = mock.Mock()
partition = Partition(Topic("test"), 0)
strategy = QuerySubscriptionStrategyFactory(
self.dataset.value,
self.topic,
1,
1,
1,
Expand Down

0 comments on commit c94ad9e

Please sign in to comment.