Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Unrevert #66287 #66383

Merged
merged 4 commits into from Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 5 additions & 13 deletions src/sentry/consumers/__init__.py
Expand Up @@ -230,42 +230,34 @@ def ingest_events_options() -> list[click.Option]:
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "events"},
},
"transactions-subscription-results": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "transactions"},
},
"generic-metrics-subscription-results": {
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "generic_metrics"},
},
"sessions-subscription-results": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"dataset": "events",
},
},
"metrics-subscription-results": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
},
"static_args": {"dataset": "metrics"},
},
"ingest-events": {
"topic": settings.KAFKA_INGEST_EVENTS,
Expand Down
16 changes: 7 additions & 9 deletions src/sentry/snuba/query_subscriptions/constants.py
@@ -1,18 +1,16 @@
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.snuba.dataset import Dataset
from sentry.utils.kafka_config import get_topic_definition

topic_to_dataset: dict[str, Dataset] = {
settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: Dataset.Events,
settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: Dataset.Transactions,
settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.PerformanceMetrics,
settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS: Dataset.Sessions,
settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics,
}
dataset_to_logical_topic = {
Dataset.Events: "events-subscription-results",
Dataset.Transactions: "transactions-subscription-results",
Dataset.PerformanceMetrics: "generic-metrics-subscription-results",
Dataset.Sessions: "sessions-subscription-results",
Dataset.Metrics: "metrics-subscription-results",
}

topic_to_dataset = {
get_topic_definition(Topic(logical_topic))["real_topic_name"]: dataset
for (dataset, logical_topic) in dataset_to_logical_topic.items()
}
10 changes: 6 additions & 4 deletions src/sentry/snuba/query_subscriptions/run.py
Expand Up @@ -13,28 +13,30 @@
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec

from sentry.conf.types.kafka_definition import Topic
from sentry.features.rollout import in_random_rollout
from sentry.snuba.dataset import Dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic
from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
from sentry.utils.kafka_config import get_topic_definition

logger = logging.getLogger(__name__)


class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(
self,
topic: str,
dataset: str,
max_batch_size: int,
max_batch_time: int,
num_processes: int,
input_block_size: int | None,
output_block_size: int | None,
multi_proc: bool = True,
):
self.topic = topic
self.dataset = topic_to_dataset[self.topic]
self.dataset = Dataset(dataset)
self.logical_topic = dataset_to_logical_topic[self.dataset]
self.topic = get_topic_definition(Topic(self.logical_topic))["real_topic_name"]
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.input_block_size = input_block_size
Expand Down
6 changes: 5 additions & 1 deletion tests/sentry/snuba/test_query_subscription_consumer.py
Expand Up @@ -31,6 +31,10 @@

@pytest.mark.snuba_ci
class BaseQuerySubscriptionTest:
@cached_property
def dataset(self):
return Dataset.Metrics

@cached_property
def topic(self):
return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS
Expand Down Expand Up @@ -97,7 +101,7 @@ def test_arroyo_consumer(self):
commit = mock.Mock()
partition = Partition(Topic("test"), 0)
strategy = QuerySubscriptionStrategyFactory(
self.topic,
self.dataset.value,
1,
1,
1,
Expand Down