Skip to content

Commit

Permalink
ref: Unrevert #66287 (#66383)
Browse files Browse the repository at this point in the history
This brings back #66287. It was
reverted before as it depended on an earlier commit which was also
reverted. Nothing has changed though.

Depends on #66381
  • Loading branch information
lynnagara committed Mar 7, 2024
1 parent 866a951 commit 86291d0
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 27 deletions.
18 changes: 5 additions & 13 deletions src/sentry/consumers/__init__.py
Expand Up @@ -223,42 +223,34 @@ def ingest_events_options() -> list[click.Option]:
"topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "events"},
},
"transactions-subscription-results": {
"topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "transactions"},
},
"generic-metrics-subscription-results": {
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "generic_metrics"},
},
"sessions-subscription-results": {
"topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"dataset": "events",
},
},
"metrics-subscription-results": {
"topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "metrics"},
},
"ingest-events": {
"topic": Topic.INGEST_EVENTS,
Expand Down
16 changes: 7 additions & 9 deletions src/sentry/snuba/query_subscriptions/constants.py
@@ -1,18 +1,16 @@
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.snuba.dataset import Dataset
from sentry.utils.kafka_config import get_topic_definition

topic_to_dataset: dict[str, Dataset] = {
settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: Dataset.Events,
settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: Dataset.Transactions,
settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.PerformanceMetrics,
settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS: Dataset.Sessions,
settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics,
}
dataset_to_logical_topic = {
Dataset.Events: "events-subscription-results",
Dataset.Transactions: "transactions-subscription-results",
Dataset.PerformanceMetrics: "generic-metrics-subscription-results",
Dataset.Sessions: "sessions-subscription-results",
Dataset.Metrics: "metrics-subscription-results",
}

topic_to_dataset = {
get_topic_definition(Topic(logical_topic))["real_topic_name"]: dataset
for (dataset, logical_topic) in dataset_to_logical_topic.items()
}
10 changes: 6 additions & 4 deletions src/sentry/snuba/query_subscriptions/run.py
Expand Up @@ -13,28 +13,30 @@
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec

from sentry.conf.types.kafka_definition import Topic
from sentry.features.rollout import in_random_rollout
from sentry.snuba.dataset import Dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic
from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
from sentry.utils.kafka_config import get_topic_definition

logger = logging.getLogger(__name__)


class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(
self,
topic: str,
dataset: str,
max_batch_size: int,
max_batch_time: int,
num_processes: int,
input_block_size: int | None,
output_block_size: int | None,
multi_proc: bool = True,
):
self.topic = topic
self.dataset = topic_to_dataset[self.topic]
self.dataset = Dataset(dataset)
self.logical_topic = dataset_to_logical_topic[self.dataset]
self.topic = get_topic_definition(Topic(self.logical_topic))["real_topic_name"]
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.input_block_size = input_block_size
Expand Down
6 changes: 5 additions & 1 deletion tests/sentry/snuba/test_query_subscription_consumer.py
Expand Up @@ -31,6 +31,10 @@

@pytest.mark.snuba_ci
class BaseQuerySubscriptionTest:
@cached_property
def dataset(self):
return Dataset.Metrics

@cached_property
def topic(self):
return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS
Expand Down Expand Up @@ -97,7 +101,7 @@ def test_arroyo_consumer(self):
commit = mock.Mock()
partition = Partition(Topic("test"), 0)
strategy = QuerySubscriptionStrategyFactory(
self.topic,
self.dataset.value,
1,
1,
1,
Expand Down

0 comments on commit 86291d0

Please sign in to comment.