From b273b38355f6582356c550adf2b41e48eb9fde2b Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 4 Mar 2024 09:39:09 -0800 Subject: [PATCH] ref: Rationalize Kafka topic config (#65793) This brings the concept of logical vs physical topic override to Sentry, which is fairly closely aligned with how topic management is done in Snuba. Both the old way (KAFKA_TOPICS) and new way (KAFKA_TOPIC_OVERRIDES and KAFKA_TOPIC_TO_CLUSTER) will be supported until we have fully migrated over in production and all client code in this repository no longer references the old variables. --- src/sentry/conf/server.py | 55 +++++++++++- src/sentry/conf/types/consumer_definition.py | 39 --------- src/sentry/conf/types/kafka_definition.py | 85 +++++++++++++++++++ src/sentry/consumers/__init__.py | 55 +++++++----- src/sentry/runner/commands/devserver.py | 9 +- .../sentry/conf/test_consumer_definitions.py | 27 ------ tests/sentry/conf/test_kafka_definition.py | 66 ++++++++++++++ tests/sentry/consumers/test_run.py | 9 +- 8 files changed, 252 insertions(+), 93 deletions(-) delete mode 100644 src/sentry/conf/types/consumer_definition.py create mode 100644 src/sentry/conf/types/kafka_definition.py delete mode 100644 tests/sentry/conf/test_consumer_definitions.py create mode 100644 tests/sentry/conf/test_kafka_definition.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 781e23d3a329a7..d6b67a8f662d9e 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -17,7 +17,7 @@ from urllib.parse import urlparse import sentry -from sentry.conf.types.consumer_definition import ConsumerDefinition +from sentry.conf.types.kafka_definition import ConsumerDefinition from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig @@ -3439,6 +3439,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # `prod.py`, also override the 
entirety of `KAFKA_TOPICS` to ensure the keys # pick up the change. +# START DEPRECATED SECTION KAFKA_EVENTS = "events" KAFKA_EVENTS_COMMIT_LOG = "snuba-commit-log" KAFKA_TRANSACTIONS = "transactions" @@ -3473,8 +3474,59 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # spans KAFKA_SNUBA_SPANS = "snuba-spans" +# END DEPRECATED SECTION + + +# Mapping of default Kafka topic name to custom names +KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = { + # TODO: This is temporary while we migrate between the old and new way of defining overrides. + # To be removed once this is defined in prod, along with KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS + # variable which will no longer be needed + "generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS +} + + +# Mapping of default Kafka topic name to cluster name +# as per KAFKA_CLUSTERS. +# This must be the default name that matches the topic +# in sentry.conf.types.kafka_definition and sentry-kafka-schemas +# and not any environment-specific override value +KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = { + "events": "default", + "ingest-events-dlq": "default", + "snuba-commit-log": "default", + "transactions": "default", + "snuba-transactions-commit-log": "default", + "outcomes": "default", + "outcomes-billing": "default", + "events-subscription-results": "default", + "transactions-subscription-results": "default", + "generic-metrics-subscription-results": "default", + "sessions-subscription-results": "default", + "metrics-subscription-results": "default", + "ingest-events": "default", + "ingest-attachments": "default", + "ingest-transactions": "default", + "ingest-metrics": "default", + "ingest-metrics-dlq": "default", + "snuba-metrics": "default", + "profiles": "default", + "ingest-performance-metrics": "default", + "ingest-generic-metrics-dlq": "default", + "snuba-generic-metrics": "default", + "ingest-replay-events": "default", + "ingest-replay-recordings": "default", 
+ "ingest-occurrences": "default", + "ingest-monitors": "default", + "generic-events": "default", + "snuba-generic-events-commit-log": "default", + "group-attributes": "default", + "snuba-spans": "default", + "shared-resources-usage": "default", +} # Cluster configuration for each Kafka topic by name. +# DEPRECATED KAFKA_TOPICS: Mapping[str, TopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, @@ -3517,7 +3569,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"}, } - # If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be # single-threaded under the hood for performance KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False diff --git a/src/sentry/conf/types/consumer_definition.py b/src/sentry/conf/types/consumer_definition.py deleted file mode 100644 index 7f2d51b93af202..00000000000000 --- a/src/sentry/conf/types/consumer_definition.py +++ /dev/null @@ -1,39 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Mapping, Sequence -from typing import Any, Required, TypedDict - -import click - - -class ConsumerDefinition(TypedDict, total=False): - # Which logical topic from settings to use. - topic: Required[str | Callable[[], str]] - default_topic: str - - strategy_factory: Required[str] - - # Additional CLI options the consumer should accept. These arguments are - # passed as kwargs to the strategy_factory. 
- click_options: Sequence[click.Option] - - # Hardcoded additional kwargs for strategy_factory - static_args: Mapping[str, Any] - - require_synchronization: bool - synchronize_commit_group_default: str - synchronize_commit_log_topic_default: str - - dlq_topic: str - dlq_max_invalid_ratio: float | None - dlq_max_consecutive_count: int | None - - -def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None: - if "dlq_topic" not in consumer_definition and ( - "dlq_max_invalid_ratio" in consumer_definition - or "dlq_max_consecutive_count" in consumer_definition - ): - raise ValueError( - "Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not" - ) diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py new file mode 100644 index 00000000000000..61820572647de8 --- /dev/null +++ b/src/sentry/conf/types/kafka_definition.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from collections.abc import Callable, Mapping, Sequence +from enum import Enum +from typing import Any, Required, TypedDict + +import click + + +class Topic(Enum): + """ + These are the default topic names used by Sentry. They must match + the registered values in sentry-kafka-schemas. 
+ """ + + EVENTS = "events" + EVENTS_COMMIT_LOG = "snuba-commit-log" + TRANSACTIONS = "transactions" + TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" + OUTCOMES = "outcomes" + OUTCOMES_BILLING = "outcomes-billing" + EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" + TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" + GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" + SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" + METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" + INGEST_EVENTS = "ingest-events" + INGEST_EVENTS_DLQ = "ingest-events-dlq" + INGEST_ATTACHMENTS = "ingest-attachments" + INGEST_TRANSACTIONS = "ingest-transactions" + INGEST_METRICS = "ingest-metrics" + INGEST_METRICS_DLQ = "ingest-metrics-dlq" + SNUBA_METRICS = "snuba-metrics" + PROFILES = "profiles" + INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" + INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" + SNUBA_GENERIC_METRICS = "snuba-generic-metrics" + INGEST_REPLAY_EVENTS = "ingest-replay-events" + INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" + INGEST_OCCURRENCES = "ingest-occurrences" + INGEST_MONITORS = "ingest-monitors" + EVENTSTREAM_GENERIC = "generic-events" + GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" + GROUP_ATTRIBUTES = "group-attributes" + SHARED_RESOURCES_USAGE = "shared-resources-usage" + SNUBA_SPANS = "snuba-spans" + + +class ConsumerDefinition(TypedDict, total=False): + + # XXX: Eventually only Topic will be accepted here. + # For backward compatibility with getsentry, we must also + # support the physical override topic name (str, Callable[str], str) + # while the migration is taking place + topic: Required[Topic | str | Callable[[], str]] + + # Schema validation will be run if true + validate_schema: bool | None + + strategy_factory: Required[str] + + # Additional CLI options the consumer should accept. 
These arguments are + # passed as kwargs to the strategy_factory. + click_options: Sequence[click.Option] + + # Hardcoded additional kwargs for strategy_factory + static_args: Mapping[str, Any] + + require_synchronization: bool + synchronize_commit_group_default: str + synchronize_commit_log_topic_default: str + + dlq_topic: str + dlq_max_invalid_ratio: float | None + dlq_max_consecutive_count: int | None + + +def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None: + if "dlq_topic" not in consumer_definition and ( + "dlq_max_invalid_ratio" in consumer_definition + or "dlq_max_consecutive_count" in consumer_definition + ): + raise ValueError( + "Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not" + ) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 92215abdd42352..400311869c8082 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -13,7 +13,11 @@ from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory from django.conf import settings -from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.kafka_definition import ( + ConsumerDefinition, + Topic, + validate_consumer_definition, +) from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -184,10 +188,8 @@ def ingest_events_options() -> list[click.Option]: ] # consumer name -> consumer definition -# XXX: default_topic is needed to lookup the schema even if the actual topic name has been -# overridden. This is because the current topic override mechanism means the default topic name -# is no longer available anywhere in code. 
We should probably fix this later so we don't need both -# "topic" and "default_topic" here though. +# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden +# string. We support both currently for backward compatibility. KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { "ingest-profiles": { "topic": settings.KAFKA_PROFILES, @@ -237,8 +239,8 @@ def ingest_events_options() -> list[click.Option]: }, }, "generic-metrics-subscription-results": { - "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - "default_topic": "generic-metrics-subscription-results", + "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, + "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -394,12 +396,19 @@ def get_stream_processor( ) from e strategy_factory_cls = import_string(consumer_definition["strategy_factory"]) - logical_topic = consumer_definition["topic"] - if not isinstance(logical_topic, str): - logical_topic = logical_topic() + consumer_topic = consumer_definition["topic"] + if isinstance(consumer_topic, Topic): + default_topic = consumer_topic.value + real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic) + else: + # TODO: Deprecated, remove once this way is no longer used + if not isinstance(consumer_topic, str): + real_topic = consumer_topic() + else: + real_topic = consumer_topic if topic is None: - topic = logical_topic + topic = real_topic cmd = click.Command( name=consumer_name, params=list(consumer_definition.get("click_options") or ()) @@ -412,13 +421,14 @@ def get_stream_processor( from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration from arroyo.backends.kafka.consumer import KafkaConsumer from arroyo.commit import ONCE_PER_SECOND - from arroyo.types import Topic + from arroyo.types import Topic as ArroyoTopic from 
django.conf import settings from sentry.utils import kafka_config - topic_def = settings.KAFKA_TOPICS[logical_topic] + topic_def = settings.KAFKA_TOPICS[real_topic] assert topic_def is not None + if cluster is None: cluster = topic_def["cluster"] @@ -474,7 +484,7 @@ def build_consumer_config(group_id: str): consumer = SynchronizedConsumer( consumer=consumer, commit_log_consumer=commit_log_consumer, - commit_log_topic=Topic(synchronize_commit_log_topic), + commit_log_topic=ArroyoTopic(synchronize_commit_log_topic), commit_log_groups={synchronize_commit_group}, ) elif consumer_definition.get("require_synchronization"): @@ -482,11 +492,16 @@ def build_consumer_config(group_id: str): "--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" ) - # Validate schema if "default_topic" is set - default_topic = consumer_definition.get("default_topic") - if default_topic: + # Validate schema if "validate_schema" is set + validate_schema = consumer_definition.get("validate_schema") or False + + if validate_schema: + # TODO: Remove this later but for now we can only validate if `consumer_topic` is + # the logical topic and not the legacy override topic + assert isinstance(consumer_topic, Topic) + strategy_factory = ValidateSchemaStrategyFactoryWrapper( - default_topic, validate_schema, strategy_factory + consumer_topic.value, validate_schema, strategy_factory ) if healthcheck_file_path is not None: @@ -512,7 +527,7 @@ def build_consumer_config(group_id: str): dlq_producer = KafkaProducer(producer_config) dlq_policy = DlqPolicy( - KafkaDlqProducer(dlq_producer, Topic(dlq_topic)), + KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), DlqLimit( max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], @@ -524,7 +539,7 @@ def build_consumer_config(group_id: str): return StreamProcessor( consumer=consumer, - topic=Topic(topic), + topic=ArroyoTopic(topic), 
processor_factory=strategy_factory, commit_policy=ONCE_PER_SECOND, join_timeout=join_timeout, diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index f0698fcaa9c1f8..038a66bfa72238 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -364,11 +364,14 @@ def devserver( """ ) + from sentry.conf.types.kafka_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics - for topic_name, topic_data in settings.KAFKA_TOPICS.items(): - if topic_data is not None: - create_topics(topic_data["cluster"], [topic_name]) + for topic in Topic: + default_name = topic.value + physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name) + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name] + create_topics(cluster_name, [physical_name]) if dev_consumer: daemons.append( diff --git a/tests/sentry/conf/test_consumer_definitions.py b/tests/sentry/conf/test_consumer_definitions.py deleted file mode 100644 index c2e0771df7f0bf..00000000000000 --- a/tests/sentry/conf/test_consumer_definitions.py +++ /dev/null @@ -1,27 +0,0 @@ -import pytest - -from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition -from sentry.consumers import KAFKA_CONSUMERS -from sentry.testutils.cases import TestCase - - -class ConsumersDefinitionTest(TestCase): - def test_exception_on_invalid_consumer_definition(self): - invalid_definitions: list[ConsumerDefinition] = [ - { - "topic": "topic", - "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", - "static_args": { - "ingest_profile": "release-health", - }, - "dlq_max_invalid_ratio": 0.01, - "dlq_max_consecutive_count": 1000, - } - ] - for invalid_definition in invalid_definitions: - with pytest.raises(ValueError): - validate_consumer_definition(invalid_definition) - - def test_kafka_consumer_definition_validity(self): - for 
definition in KAFKA_CONSUMERS.values(): - validate_consumer_definition(definition) diff --git a/tests/sentry/conf/test_kafka_definition.py b/tests/sentry/conf/test_kafka_definition.py new file mode 100644 index 00000000000000..e1cbebe8c2e759 --- /dev/null +++ b/tests/sentry/conf/test_kafka_definition.py @@ -0,0 +1,66 @@ +import pytest +import sentry_kafka_schemas +from django.conf import settings + +from sentry.conf.types.kafka_definition import ( + ConsumerDefinition, + Topic, + validate_consumer_definition, +) +from sentry.consumers import KAFKA_CONSUMERS +from sentry.testutils.cases import TestCase + + +def test_topic_definition() -> None: + # All topics are registered + # TODO: Remove this once these topics are actually registered in sentry-kafka-schemas + currently_unregistered_topics = [ + "outcomes-billing", + "ingest-events", + "ingest-events-dlq", + "ingest-attachments", + "ingest-transactions", + "ingest-metrics-dlq", + "profiles", + "ingest-generic-metrics-dlq", + "ingest-occurrences", + "ingest-monitors", + ] + + for topic in Topic: + if topic.value not in currently_unregistered_topics: + assert sentry_kafka_schemas.get_topic(topic.value) is not None + + for topic in Topic: + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] + assert ( + cluster_name in settings.KAFKA_CLUSTERS + ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" + + for default_topic in settings.KAFKA_TOPIC_OVERRIDES: + # Ensure all override topics are in the enum + Topic(default_topic) + + assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) + + +class ConsumersDefinitionTest(TestCase): + def test_exception_on_invalid_consumer_definition(self): + invalid_definitions: list[ConsumerDefinition] = [ + { + "topic": "topic", + "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", + "static_args": { + "ingest_profile": "release-health", + }, + "dlq_max_invalid_ratio": 0.01, + "dlq_max_consecutive_count": 1000, + } + ] + for 
invalid_definition in invalid_definitions: + with pytest.raises(ValueError): + validate_consumer_definition(invalid_definition) + + def test_kafka_consumer_definition_validity(self): + for definition in KAFKA_CONSUMERS.values(): + validate_consumer_definition(definition) diff --git a/tests/sentry/consumers/test_run.py b/tests/sentry/consumers/test_run.py index a20c578480b786..7b1e6236d26375 100644 --- a/tests/sentry/consumers/test_run.py +++ b/tests/sentry/consumers/test_run.py @@ -2,7 +2,7 @@ from arroyo.processing.strategies.abstract import ProcessingStrategyFactory from sentry import consumers -from sentry.conf.types.consumer_definition import ConsumerDefinition +from sentry.conf.types.kafka_definition import ConsumerDefinition, Topic from sentry.utils.imports import import_string @@ -16,4 +16,9 @@ def test_all_importable(consumer_def, settings): assert issubclass(factory, ProcessingStrategyFactory) topic = defn["topic"] - assert topic is None or topic in settings.KAFKA_TOPICS + if isinstance(topic, Topic): + assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER + else: + # TODO: Legacy way, will be deprecated once all consumer definitions + # are migrated + assert topic in settings.KAFKA_TOPICS