Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Move the rest of the consumer definitions to the new style #66343

Merged
merged 6 commits into from Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/sentry/conf/types/kafka_definition.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from collections.abc import Mapping, Sequence
from enum import Enum
from typing import Any, Required, TypedDict

Expand Down Expand Up @@ -48,11 +48,8 @@ class Topic(Enum):

class ConsumerDefinition(TypedDict, total=False):

# XXX: Eventually only Topic will be accepted here.
# For backward compatibility with getsentry, we must also
# support the physical override topic name (str, Callable[str], str)
# while the migration is taking place
topic: Required[Topic | str | Callable[[], str]]
# Default topic
topic: Required[Topic]

# Schema validation will be run if true
validate_schema: bool | None
Expand Down
58 changes: 20 additions & 38 deletions src/sentry/consumers/__init__.py
Expand Up @@ -186,14 +186,7 @@ def ingest_events_options() -> list[click.Option]:
),
]


_INGEST_SPANS_OPTIONS = multiprocessing_options(default_max_batch_size=100) + [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to clarify, did u remove this because it's not being used anywhere, unrelated to the control plane work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's dead code so i thought i'd clean it up

click.Option(["--output-topic", "output_topic"], type=str, default="snuba-spans"),
]

# consumer name -> consumer definition
# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden
# string. We support both currently for backward compatibility.
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
"ingest-profiles": {
"topic": Topic.PROFILES,
Expand All @@ -210,30 +203,30 @@ def ingest_events_options() -> list[click.Option]:
"click_options": ingest_replay_recordings_buffered_options(),
},
"ingest-monitors": {
"topic": settings.KAFKA_INGEST_MONITORS,
"topic": Topic.INGEST_MONITORS,
"strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory",
"click_options": ingest_monitors_options(),
},
"billing-metrics-consumer": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory",
},
# Known differences to 'sentry run occurrences-ingest-consumer':
# - ingest_consumer_types metric tag is missing. Use the kafka_topic and
# group_id tags provided by run_basic_consumer instead
"ingest-occurrences": {
"topic": settings.KAFKA_INGEST_OCCURRENCES,
"topic": Topic.INGEST_OCCURRENCES,
"strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=20),
},
"events-subscription-results": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "events"},
},
"transactions-subscription-results": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "transactions"},
Expand All @@ -246,45 +239,45 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {"dataset": "generic_metrics"},
},
"sessions-subscription-results": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"dataset": "events",
},
},
"metrics-subscription-results": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {"dataset": "metrics"},
},
"ingest-events": {
"topic": settings.KAFKA_INGEST_EVENTS,
"topic": Topic.INGEST_EVENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "events",
},
},
"ingest-attachments": {
"topic": settings.KAFKA_INGEST_ATTACHMENTS,
"topic": Topic.INGEST_ATTACHMENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "attachments",
},
},
"ingest-transactions": {
"topic": settings.KAFKA_INGEST_TRANSACTIONS,
"topic": Topic.INGEST_TRANSACTIONS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": ingest_events_options(),
"static_args": {
"consumer_type": "transactions",
},
},
"ingest-metrics": {
"topic": settings.KAFKA_INGEST_METRICS,
"topic": Topic.INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -295,7 +288,7 @@ def ingest_events_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"ingest-generic-metrics": {
"topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS,
"topic": Topic.INGEST_PERFORMANCE_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -306,44 +299,44 @@ def ingest_events_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"generic-metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "performance",
},
},
"metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_METRICS,
"topic": Topic.SNUBA_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "release-health",
},
},
"post-process-forwarder-issue-platform": {
"topic": settings.KAFKA_EVENTSTREAM_GENERIC,
"topic": Topic.EVENTSTREAM_GENERIC,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-generic-events-commit-log",
"synchronize_commit_group_default": "generic_events_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-transactions": {
"topic": settings.KAFKA_TRANSACTIONS,
"topic": Topic.TRANSACTIONS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-transactions-commit-log",
"synchronize_commit_group_default": "transactions_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-errors": {
"topic": settings.KAFKA_EVENTS,
"topic": Topic.EVENTS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-commit-log",
"synchronize_commit_group_default": "snuba-consumers",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"process-spans": {
"topic": settings.KAFKA_SNUBA_SPANS,
"topic": Topic.SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
**settings.SENTRY_KAFKA_CONSUMERS,
Expand Down Expand Up @@ -397,15 +390,8 @@ def get_stream_processor(

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
consumer_topic = consumer_definition["topic"]
if isinstance(consumer_topic, Topic):
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)
else:
# TODO: Deprecated, remove once this way is no longer used
if not isinstance(consumer_topic, str):
real_topic = consumer_topic()
else:
real_topic = consumer_topic
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)

if topic is None:
topic = real_topic
Expand Down Expand Up @@ -488,10 +474,6 @@ def build_consumer_config(group_id: str):
validate_schema = consumer_definition.get("validate_schema") or False

if validate_schema:
# TODO: Remove this later but for now we can only validate if `topic_def` is
# the logical topic and not the legacy override topic
assert isinstance(consumer_topic, Topic)

strategy_factory = ValidateSchemaStrategyFactoryWrapper(
consumer_topic.value, validate_schema, strategy_factory
)
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/conf/test_kafka_definition.py
Expand Up @@ -48,7 +48,7 @@ class ConsumersDefinitionTest(TestCase):
def test_exception_on_invalid_consumer_definition(self):
invalid_definitions: list[ConsumerDefinition] = [
{
"topic": "topic",
"topic": Topic.INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"static_args": {
"ingest_profile": "release-health",
Expand Down
9 changes: 2 additions & 7 deletions tests/sentry/consumers/test_run.py
Expand Up @@ -2,7 +2,7 @@
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory

from sentry import consumers
from sentry.conf.types.kafka_definition import ConsumerDefinition, Topic
from sentry.conf.types.kafka_definition import ConsumerDefinition
from sentry.utils.imports import import_string


Expand All @@ -16,9 +16,4 @@ def test_all_importable(consumer_def, settings):
assert issubclass(factory, ProcessingStrategyFactory)

topic = defn["topic"]
if isinstance(topic, Topic):
assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER
else:
# TODO: Legacy way, will be deprecated once all consumer definitions
# are migrated
assert topic in settings.KAFKA_TOPICS
assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER
Expand Up @@ -62,6 +62,7 @@ def setUp(self) -> None:
KAFKA_TOPICS={
self.events_topic: {"cluster": "default"},
},
KAFKA_TOPIC_OVERRIDES={"events": self.events_topic, "transactions": self.events_topic},
)

self.override_settings_cm.__enter__()
Expand Down