Skip to content

Commit

Permalink
ref: Move the rest of the consumer definitions to the new style (#66343)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Mar 6, 2024
1 parent 70b9fa5 commit cb2ed30
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 52 deletions.
9 changes: 3 additions & 6 deletions src/sentry/conf/types/kafka_definition.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from collections.abc import Mapping, Sequence
from enum import Enum
from typing import Any, Required, TypedDict

Expand Down Expand Up @@ -48,11 +48,8 @@ class Topic(Enum):

class ConsumerDefinition(TypedDict, total=False):

# XXX: Eventually only Topic will be accepted here.
# For backward compatibility with getsentry, we must also
# support the physical override topic name (str, Callable[str], str)
# while the migration is taking place
topic: Required[Topic | str | Callable[[], str]]
# Default topic
topic: Required[Topic]

# Schema validation will be run if true
validate_schema: bool | None
Expand Down
58 changes: 20 additions & 38 deletions src/sentry/consumers/__init__.py
Expand Up @@ -186,14 +186,7 @@ def ingest_events_options() -> list[click.Option]:
),
]


_INGEST_SPANS_OPTIONS = multiprocessing_options(default_max_batch_size=100) + [
click.Option(["--output-topic", "output_topic"], type=str, default="snuba-spans"),
]

# consumer name -> consumer definition
# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden
# string. We support both currently for backward compatibility.
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
"ingest-profiles": {
"topic": Topic.PROFILES,
Expand All @@ -210,32 +203,32 @@ def ingest_events_options() -> list[click.Option]:
"click_options": ingest_replay_recordings_buffered_options(),
},
"ingest-monitors": {
"topic": settings.KAFKA_INGEST_MONITORS,
"topic": Topic.INGEST_MONITORS,
"strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory",
"click_options": ingest_monitors_options(),
},
"billing-metrics-consumer": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory",
},
# Known differences to 'sentry run occurrences-ingest-consumer':
# - ingest_consumer_types metric tag is missing. Use the kafka_topic and
# group_id tags provided by run_basic_consumer instead
"ingest-occurrences": {
"topic": settings.KAFKA_INGEST_OCCURRENCES,
"topic": Topic.INGEST_OCCURRENCES,
"strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=20),
},
"events-subscription-results": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
},
},
"transactions-subscription-results": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
Expand All @@ -252,47 +245,47 @@ def ingest_events_options() -> list[click.Option]:
},
},
"sessions-subscription-results": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
},
},
"metrics-subscription-results": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
},
},
"ingest-events": {
"topic": settings.KAFKA_INGEST_EVENTS,
"topic": Topic.INGEST_EVENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "events",
},
},
"ingest-attachments": {
"topic": settings.KAFKA_INGEST_ATTACHMENTS,
"topic": Topic.INGEST_ATTACHMENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "attachments",
},
},
"ingest-transactions": {
"topic": settings.KAFKA_INGEST_TRANSACTIONS,
"topic": Topic.INGEST_TRANSACTIONS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "transactions",
},
},
"ingest-metrics": {
"topic": settings.KAFKA_INGEST_METRICS,
"topic": Topic.INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -303,7 +296,7 @@ def ingest_events_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"ingest-generic-metrics": {
"topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS,
"topic": Topic.INGEST_PERFORMANCE_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -314,44 +307,44 @@ def ingest_events_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"generic-metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "performance",
},
},
"metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_METRICS,
"topic": Topic.SNUBA_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "release-health",
},
},
"post-process-forwarder-issue-platform": {
"topic": settings.KAFKA_EVENTSTREAM_GENERIC,
"topic": Topic.EVENTSTREAM_GENERIC,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-generic-events-commit-log",
"synchronize_commit_group_default": "generic_events_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-transactions": {
"topic": settings.KAFKA_TRANSACTIONS,
"topic": Topic.TRANSACTIONS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-transactions-commit-log",
"synchronize_commit_group_default": "transactions_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-errors": {
"topic": settings.KAFKA_EVENTS,
"topic": Topic.EVENTS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-commit-log",
"synchronize_commit_group_default": "snuba-consumers",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"process-spans": {
"topic": settings.KAFKA_SNUBA_SPANS,
"topic": Topic.SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
**settings.SENTRY_KAFKA_CONSUMERS,
Expand Down Expand Up @@ -405,15 +398,8 @@ def get_stream_processor(

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
consumer_topic = consumer_definition["topic"]
if isinstance(consumer_topic, Topic):
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)
else:
# TODO: Deprecated, remove once this way is no longer used
if not isinstance(consumer_topic, str):
real_topic = consumer_topic()
else:
real_topic = consumer_topic
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)

if topic is None:
topic = real_topic
Expand Down Expand Up @@ -496,10 +482,6 @@ def build_consumer_config(group_id: str):
validate_schema = consumer_definition.get("validate_schema") or False

if validate_schema:
# TODO: Remove this later but for now we can only validate if `topic_def` is
# the logical topic and not the legacy override topic
assert isinstance(consumer_topic, Topic)

strategy_factory = ValidateSchemaStrategyFactoryWrapper(
consumer_topic.value, validate_schema, strategy_factory
)
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/conf/test_kafka_definition.py
Expand Up @@ -48,7 +48,7 @@ class ConsumersDefinitionTest(TestCase):
def test_exception_on_invalid_consumer_definition(self):
invalid_definitions: list[ConsumerDefinition] = [
{
"topic": "topic",
"topic": Topic.INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"static_args": {
"ingest_profile": "release-health",
Expand Down
9 changes: 2 additions & 7 deletions tests/sentry/consumers/test_run.py
Expand Up @@ -2,7 +2,7 @@
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory

from sentry import consumers
from sentry.conf.types.kafka_definition import ConsumerDefinition, Topic
from sentry.conf.types.kafka_definition import ConsumerDefinition
from sentry.utils.imports import import_string


Expand All @@ -16,9 +16,4 @@ def test_all_importable(consumer_def, settings):
assert issubclass(factory, ProcessingStrategyFactory)

topic = defn["topic"]
if isinstance(topic, Topic):
assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER
else:
# TODO: Legacy way, will be deprecated once all consumer definitions
# are migrated
assert topic in settings.KAFKA_TOPICS
assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER
Expand Up @@ -62,6 +62,7 @@ def setUp(self) -> None:
KAFKA_TOPICS={
self.events_topic: {"cluster": "default"},
},
KAFKA_TOPIC_OVERRIDES={"events": self.events_topic, "transactions": self.events_topic},
)

self.override_settings_cm.__enter__()
Expand Down

0 comments on commit cb2ed30

Please sign in to comment.