From 40bf48466b8a36638481c7f88548e9afe0cc704e Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 09:42:29 -0800 Subject: [PATCH 01/42] ref: Rationalize Kafka topic config --- src/sentry/conf/server.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 2b072f88d644b6..7f0b37ef63015e 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3458,16 +3458,16 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # spans KAFKA_SNUBA_SPANS = "snuba-spans" -KAFKA_SUBSCRIPTION_RESULT_TOPICS = { - "events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, - "transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, - "generic-metrics": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - "sessions": KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, - "metrics": KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, -} +# Mapping of default Kafka topic name to custom names +KAFKA_TOPIC_MAP: Mapping[str, str] = {} + + +# Mapping of default Kafka topic name to broker config +KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = {} # Cluster configuration for each Kafka topic by name. +# Deprecated KAFKA_TOPICS: Mapping[str, TopicDefinition | None] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, From 112473d8e22d6b4f8b93c964eb7f8c4d5c1cbe60 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 10:01:32 -0800 Subject: [PATCH 02/42] add default --- src/sentry/conf/server.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 7f0b37ef63015e..322361ab562a9c 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3461,9 +3461,21 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Mapping of default Kafka topic name to custom names KAFKA_TOPIC_MAP: Mapping[str, str] = {} +DEFAULT_BROKER_CONFIG: Mapping[str, Any] = { + "bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "127.0.0.1:9092"), + "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "plaintext"), + "ssl.ca.location": os.environ.get("KAFKA_SSL_CA_PATH", ""), + "ssl.certificate.location": os.environ.get("KAFKA_SSL_CERT_PATH", ""), + "ssl.key.location": os.environ.get("KAFKA_SSL_KEY_PATH", ""), + "sasl.mechanism": os.environ.get("KAFKA_SASL_MECHANISM", None), + "sasl.username": os.environ.get("KAFKA_SASL_USERNAME", None), + "sasl.password": os.environ.get("KAFKA_SASL_PASSWORD", None), +} # Mapping of default Kafka topic name to broker config -KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = {} +KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = { + +} # Cluster configuration for each Kafka topic by name. From 05cfb0a5ba89e968d1a9a5e56733b23798b150fd Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Mon, 26 Feb 2024 18:02:28 +0000 Subject: [PATCH 03/42] :hammer_and_wrench: apply pre-commit fixes --- src/sentry/conf/server.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 322361ab562a9c..15eedcfa55c33d 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3473,9 +3473,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] } # Mapping of default Kafka topic name to broker config -KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = { - -} +KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = {} # Cluster configuration for each Kafka topic by name. From 977df66f725d7311d58e8602f873673a30435bfe Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 10:16:24 -0800 Subject: [PATCH 04/42] . --- src/sentry/conf/server.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 15eedcfa55c33d..9d62f743efa88b 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3459,21 +3459,12 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] KAFKA_SNUBA_SPANS = "snuba-spans" # Mapping of default Kafka topic name to custom names -KAFKA_TOPIC_MAP: Mapping[str, str] = {} - -DEFAULT_BROKER_CONFIG: Mapping[str, Any] = { - "bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "127.0.0.1:9092"), - "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "plaintext"), - "ssl.ca.location": os.environ.get("KAFKA_SSL_CA_PATH", ""), - "ssl.certificate.location": os.environ.get("KAFKA_SSL_CERT_PATH", ""), - "ssl.key.location": os.environ.get("KAFKA_SSL_KEY_PATH", ""), - "sasl.mechanism": os.environ.get("KAFKA_SASL_MECHANISM", None), - "sasl.username": os.environ.get("KAFKA_SASL_USERNAME", None), - "sasl.password": os.environ.get("KAFKA_SASL_PASSWORD", None), -} +KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} # Mapping of default Kafka topic name to broker config -KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, TopicDefinition | None]] = {} +KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition | None]] = { + {"cluster": "default"} +} # Cluster configuration for each Kafka topic by name. From 80257121abd258bb338713c2e7a7396e22beea73 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 10:19:25 -0800 Subject: [PATCH 05/42] cleanup topic autocreation --- src/sentry/conf/server.py | 3 --- src/sentry/runner/commands/devserver.py | 2 +- src/sentry/utils/batching_kafka_consumer.py | 10 ++++------ 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 9d62f743efa88b..fe544179a682a0 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3511,9 +3511,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"}, } - -# If True, consumers will create the topics if they don't exist -KAFKA_CONSUMER_AUTO_CREATE_TOPICS = True # If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be # single-threaded under the hood for performance KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 76db901ca8d598..f0698fcaa9c1f8 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -368,7 +368,7 @@ def devserver( for topic_name, topic_data in settings.KAFKA_TOPICS.items(): if topic_data is not None: - create_topics(topic_data["cluster"], [topic_name], force=True) + create_topics(topic_data["cluster"], [topic_name]) if dev_consumer: daemons.append( diff --git a/src/sentry/utils/batching_kafka_consumer.py b/src/sentry/utils/batching_kafka_consumer.py index 91bf75dfd3f45b..d7f1a9d40f6259 100644 --- a/src/sentry/utils/batching_kafka_consumer.py +++ b/src/sentry/utils/batching_kafka_consumer.py @@ -3,7 +3,6 @@ from confluent_kafka import KafkaError from confluent_kafka.admin import AdminClient -from django.conf import settings from sentry.utils import kafka_config @@ -53,8 +52,7 @@ def create_topics(cluster_name: str, topics: list[str], force: bool = False) -> topics must be from the same cluster. """ - if settings.KAFKA_CONSUMER_AUTO_CREATE_TOPICS or force: - conf = kafka_config.get_kafka_admin_cluster_options(cluster_name) - # Topics are implicitly created here - admin_client = AdminClient(conf) - wait_for_topics(admin_client, topics) + conf = kafka_config.get_kafka_admin_cluster_options(cluster_name) + # Topics are implicitly created here + admin_client = AdminClient(conf) + wait_for_topics(admin_client, topics) From 3a510f22ae389ec0db859a81d4b9fbf4ea5a9893 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 17:08:16 -0800 Subject: [PATCH 06/42] add all the default topic names --- src/sentry/conf/types/topic_definition.py | 36 +++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index 41992b74d9ad78..c241b653bb2035 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -1,7 +1,39 @@ -from __future__ import annotations - +from enum import Enum from typing import TypedDict +class Topic(Enum): + EVENTS = "events" + EVENTS_COMMIT_LOG = "snuba-commit-log" + TRANSACTIONS = "transactions" + TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" + OUTCOMES = "outcomes" + OUTCOMES_BILLING = "outcomes-billing" + EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" + TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" + GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" + SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" + METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" + INGEST_EVENTS = "ingest-events" + INGEST_ATTACHMENTS = "ingest-attachments" + INGEST_TRANSACTIONS = "ingest-transactions" + INGEST_METRICS = "ingest-metrics" + INGEST_METRICS_DLQ = "ingest-metrics-dlq" + SNUBA_METRICS = "snuba-metrics" + KAFKA_PROFILES = "profiles" + INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" + INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" + SNUBA_GENERIC_METRICS = "snuba-generic-metrics" + INGEST_REPLAY_EVENTS = "ingest-replay-events" + INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" + INGEST_OCCURRENCES = "ingest-occurrences" + INGEST_MONITORS = "ingest-monitors" + EVENTSTREAM_GENERIC = "generic-events" + GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" + GROUP_ATTRIBUTES = "group-attributes" + SHARED_RESOURCES_USAGE = "shared-resources-usage" + SNUBA_SPANS = "snuba-spans" + + class TopicDefinition(TypedDict): cluster: str From 348c290e3c098d8c678d3ef9bd5820866043a5f5 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 17:24:37 -0800 Subject: [PATCH 07/42] add topics --- src/sentry/conf/types/consumer_definition.py | 10 ++- src/sentry/conf/types/topic_definition.py | 2 +- src/sentry/consumers/__init__.py | 81 +++++++++++++------- 3 files changed, 61 insertions(+), 32 deletions(-) diff --git a/src/sentry/conf/types/consumer_definition.py b/src/sentry/conf/types/consumer_definition.py index 7f2d51b93af202..c54b6eec4c94a5 100644 --- a/src/sentry/conf/types/consumer_definition.py +++ b/src/sentry/conf/types/consumer_definition.py @@ -5,11 +5,17 @@ import click +from sentry.conf.types.topic_definition import Topic + class ConsumerDefinition(TypedDict, total=False): # Which logical topic from settings to use. - topic: Required[str | Callable[[], str]] - default_topic: str + topic: Topic + + # Override topic. To be deprecated + topic_override: Required[str | Callable[[], str]] + + validate_schema: str strategy_factory: Required[str] diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index c241b653bb2035..57d1c1e2cffff1 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -20,7 +20,7 @@ class Topic(Enum): INGEST_METRICS = "ingest-metrics" INGEST_METRICS_DLQ = "ingest-metrics-dlq" SNUBA_METRICS = "snuba-metrics" - KAFKA_PROFILES = "profiles" + PROFILES = "profiles" INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" SNUBA_GENERIC_METRICS = "snuba-generic-metrics" diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 3b0c485d9d6e56..6aa2763b113b54 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -14,6 +14,7 @@ from django.conf import settings from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.topic_definition import Topic from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -163,44 +164,49 @@ def ingest_monitors_options() -> list[click.Option]: ] # consumer name -> consumer definition -# XXX: default_topic is needed to lookup the schema even if the actual topic name has been -# overridden. This is because the current topic override mechanism means the default topic name -# is no longer available anywhere in code. We should probably fix this later so we don't need both -# "topic" and "default_topic" here though. +# XXX: topic_override will be deprecated here in favor of KAFKA_TOPIC_OVERRIDE once all overrides in +# prod have been migrated KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { "ingest-profiles": { - "topic": settings.KAFKA_PROFILES, + "topic": Topic.PROFILES, + "topic_override": settings.KAFKA_PROFILES, "strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory", }, "ingest-replay-recordings": { - "topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, + "topic": Topic.EVENTS, + "topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory", "click_options": ingest_replay_recordings_options(), }, "ingest-replay-recordings-buffered": { - "topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, + "topic": Topic.INGEST_REPLAYS_RECORDINGS, + "topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, "strategy_factory": "sentry.replays.consumers.recording_buffered.RecordingBufferedStrategyFactory", "click_options": ingest_replay_recordings_buffered_options(), }, "ingest-monitors": { - "topic": settings.KAFKA_INGEST_MONITORS, + "topic": Topic.INGEST_MONITORS, + "topic_override": settings.KAFKA_INGEST_MONITORS, "strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory", "click_options": ingest_monitors_options(), }, "billing-metrics-consumer": { - "topic": settings.KAFKA_SNUBA_GENERIC_METRICS, + "topic": Topic.SNUBA_GENERIC_METRICS, + "topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, "strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory", }, # Known differences to 'sentry run occurrences-ingest-consumer': # - ingest_consumer_types metric tag is missing. Use the kafka_topic and # group_id tags provided by run_basic_consumer instead "ingest-occurrences": { - "topic": settings.KAFKA_INGEST_OCCURRENCES, + "topic": Topic.INGEST_OCCURRENCES, + "topic_override": settings.KAFKA_INGEST_OCCURRENCES, "strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=20), }, "events-subscription-results": { - "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, + "topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS, + "topic_override": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -208,7 +214,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "transactions-subscription-results": { - "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, + "topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS, + "topic_override": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -216,8 +223,9 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "generic-metrics-subscription-results": { - "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - "default_topic": "generic-metrics-subscription-results", + "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, + "topic_override": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, + "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -225,7 +233,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "sessions-subscription-results": { - "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, + "topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS, + "topic_override": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(), "static_args": { @@ -233,7 +242,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "metrics-subscription-results": { - "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, + "topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS, + "topic_override": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -241,7 +251,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-events": { - "topic": settings.KAFKA_INGEST_EVENTS, + "topic": Topic.INGEST_EVENTS, + "topic_override": settings.KAFKA_INGEST_EVENTS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -249,7 +260,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-attachments": { - "topic": settings.KAFKA_INGEST_ATTACHMENTS, + "topic": Topic.INGEST_ATTACHMENTS, + "topic_override": settings.KAFKA_INGEST_ATTACHMENTS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -257,7 +269,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-transactions": { - "topic": settings.KAFKA_INGEST_TRANSACTIONS, + "topic": Topic.INGEST_TRANSACTIONS, + "topic_override": settings.KAFKA_INGEST_TRANSACTIONS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -265,7 +278,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-metrics": { - "topic": settings.KAFKA_INGEST_METRICS, + "topic": Topic.INGEST_METRICS, + "topic_override": settings.KAFKA_INGEST_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", "click_options": _METRICS_INDEXER_OPTIONS, "static_args": { @@ -276,7 +290,8 @@ def ingest_monitors_options() -> list[click.Option]: "dlq_max_consecutive_count": 1000, }, "ingest-generic-metrics": { - "topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS, + "topic": Topic.INGEST_PERFORMANCE_METRICS, + "topic_override": settings.KAFKA_INGEST_PERFORMANCE_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", "click_options": _METRICS_INDEXER_OPTIONS, "static_args": { @@ -287,7 +302,8 @@ def ingest_monitors_options() -> list[click.Option]: "dlq_max_consecutive_count": 1000, }, "generic-metrics-last-seen-updater": { - "topic": settings.KAFKA_SNUBA_GENERIC_METRICS, + "topic": Topic.SNUBA_GENERIC_METRICS, + "topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", "click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, "static_args": { @@ -295,7 +311,8 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "metrics-last-seen-updater": { - "topic": settings.KAFKA_SNUBA_METRICS, + "topic": Topic.SNUBA_METRICS, + "topic_override": settings.KAFKA_SNUBA_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", "click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, "static_args": { @@ -303,30 +320,35 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "post-process-forwarder-issue-platform": { - "topic": settings.KAFKA_EVENTSTREAM_GENERIC, + "topic": Topic.EVENTSTREAM_GENERIC, + "topic_override": settings.KAFKA_EVENTSTREAM_GENERIC, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-generic-events-commit-log", "synchronize_commit_group_default": "generic_events_group", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "post-process-forwarder-transactions": { - "topic": settings.KAFKA_TRANSACTIONS, + "topic": Topic.TRANSACTIONS, + "topic_override": settings.KAFKA_TRANSACTIONS, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-transactions-commit-log", "synchronize_commit_group_default": "transactions_group", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "post-process-forwarder-errors": { - "topic": settings.KAFKA_EVENTS, + "topic": Topic.EVENTS, + "topic_override": settings.KAFKA_EVENTS, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-commit-log", "synchronize_commit_group_default": "snuba-consumers", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "process-spans": { - "topic": settings.KAFKA_SNUBA_SPANS, + "topic": Topic.SNUBA_SPANS, + "topic_override": settings.KAFKA_SNUBA_SPANS, "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", }, + # TODO: This has to be co-ordinated with getsentry **settings.SENTRY_KAFKA_CONSUMERS, } @@ -396,6 +418,7 @@ def get_stream_processor( from sentry.utils import kafka_config + # Still supported for now until we have moved everything in prod over topic_def = settings.KAFKA_TOPICS[logical_topic] assert topic_def is not None if cluster is None: @@ -461,8 +484,8 @@ def build_consumer_config(group_id: str): "--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" ) - # Validate schema if "default_topic" is set - default_topic = consumer_definition.get("default_topic") + # Validate schema if "validate_schema" is set + default_topic = consumer_definition.get("validate_schema") if default_topic: strategy_factory = ValidateSchemaStrategyFactoryWrapper( default_topic, validate_schema, strategy_factory From 9f7206ef1a47809592237629bbc4b7b9b68cc4c6 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 26 Feb 2024 17:25:48 -0800 Subject: [PATCH 08/42] comment no longer applies --- src/sentry/utils/batching_kafka_consumer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sentry/utils/batching_kafka_consumer.py b/src/sentry/utils/batching_kafka_consumer.py index d7f1a9d40f6259..5500cf23d8255d 100644 --- a/src/sentry/utils/batching_kafka_consumer.py +++ b/src/sentry/utils/batching_kafka_consumer.py @@ -53,6 +53,5 @@ def create_topics(cluster_name: str, topics: list[str], force: bool = False) -> topics must be from the same cluster. """ conf = kafka_config.get_kafka_admin_cluster_options(cluster_name) - # Topics are implicitly created here admin_client = AdminClient(conf) wait_for_topics(admin_client, topics) From e7f1e9a9db376f25db908f79014ec4fa86fe4231 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 27 Feb 2024 13:07:15 -0800 Subject: [PATCH 09/42] fix comment --- src/sentry/conf/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index fe544179a682a0..4edbe28a430a2c 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3461,7 +3461,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Mapping of default Kafka topic name to custom names KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} -# Mapping of default Kafka topic name to broker config +# Mapping of default Kafka topic name to topic definition KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition | None]] = { {"cluster": "default"} } From 7bb9d4f7ce30990e5dcf1ffe3805ce59fa4b3628 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 27 Feb 2024 13:26:07 -0800 Subject: [PATCH 10/42] validate_schema is bool --- src/sentry/conf/types/consumer_definition.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sentry/conf/types/consumer_definition.py b/src/sentry/conf/types/consumer_definition.py index c54b6eec4c94a5..06c0172b9d133c 100644 --- a/src/sentry/conf/types/consumer_definition.py +++ b/src/sentry/conf/types/consumer_definition.py @@ -15,7 +15,8 @@ class ConsumerDefinition(TypedDict, total=False): # Override topic. To be deprecated topic_override: Required[str | Callable[[], str]] - validate_schema: str + # Schema validation will be run if true + validate_schema: bool | None strategy_factory: Required[str] From 5635beee026c93009cc044691642389cf723d0ab Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 12:34:06 -0800 Subject: [PATCH 11/42] ref: Fix billing kafka config We should not special case the kafka config for this particular application feature. --- src/sentry/conf/server.py | 6 ++---- src/sentry/utils/outcomes.py | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 632866f514812d..c82a40e2090267 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3465,15 +3465,13 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Cluster configuration for each Kafka topic by name. -KAFKA_TOPICS: Mapping[str, TopicDefinition | None] = { +KAFKA_TOPICS: Mapping[str, TopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, KAFKA_TRANSACTIONS: {"cluster": "default"}, KAFKA_TRANSACTIONS_COMMIT_LOG: {"cluster": "default"}, KAFKA_OUTCOMES: {"cluster": "default"}, - # When OUTCOMES_BILLING is None, it inherits from OUTCOMES and does not - # create a separate producer. Check ``track_outcome`` for details. - KAFKA_OUTCOMES_BILLING: None, + KAFKA_OUTCOMES_BILLING: {"cluster": "default"}, KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: {"cluster": "default"}, KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: {"cluster": "default"}, KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS: {"cluster": "default"}, diff --git a/src/sentry/utils/outcomes.py b/src/sentry/utils/outcomes.py index 183f6286c6bbe3..e064e6d32b15e9 100644 --- a/src/sentry/utils/outcomes.py +++ b/src/sentry/utils/outcomes.py @@ -74,9 +74,8 @@ def track_outcome( outcomes_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES) billing_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES_BILLING) - use_billing = ( - outcome.is_billing() and settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] is not None - ) + + use_billing = outcome.is_billing() and outcomes_config != billing_config # Create a second producer instance only if the cluster differs. Otherwise, # reuse the same producer and just send to the other topic. From 4880de475895d88137bd23db7af523287219aa7d Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 12:43:53 -0800 Subject: [PATCH 12/42] remove more special casing --- src/sentry/utils/kafka_config.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/sentry/utils/kafka_config.py b/src/sentry/utils/kafka_config.py index af809d8ebd0136..392e641f9c0a74 100644 --- a/src/sentry/utils/kafka_config.py +++ b/src/sentry/utils/kafka_config.py @@ -106,11 +106,9 @@ def _validate_topic_definitions() -> None: _validate_topic_definitions() -def get_topic_definition(cluster: str) -> TopicDefinition: - defn = settings.KAFKA_TOPICS.get(cluster) +def get_topic_definition(topic: str) -> TopicDefinition: + defn = settings.KAFKA_TOPICS.get(topic) if defn is not None: return defn - elif cluster == settings.KAFKA_OUTCOMES_BILLING: - return get_topic_definition(settings.KAFKA_OUTCOMES) else: - raise ValueError(f"Unknown {cluster=}") + raise ValueError(f"Unknown {topic=}") From 334b83d4eeb30e8d6b11fcb17108adbdcf3f1060 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 12:46:06 -0800 Subject: [PATCH 13/42] more cleanup of special cased things --- src/sentry/utils/kafka_config.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/sentry/utils/kafka_config.py b/src/sentry/utils/kafka_config.py index 392e641f9c0a74..2ca53a67bf3a47 100644 --- a/src/sentry/utils/kafka_config.py +++ b/src/sentry/utils/kafka_config.py @@ -96,16 +96,6 @@ def get_kafka_admin_cluster_options( ) -def _validate_topic_definitions() -> None: - for cluster, defn in settings.KAFKA_TOPICS.items(): - # If this assertion fails on import, get_topic_definition needs to be - # updated with new logic. - assert cluster == settings.KAFKA_OUTCOMES_BILLING or defn is not None - - -_validate_topic_definitions() - - def get_topic_definition(topic: str) -> TopicDefinition: defn = settings.KAFKA_TOPICS.get(topic) if defn is not None: From 4f82e7ef05a7605b577736821aa076f24a2697d5 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 12:56:00 -0800 Subject: [PATCH 14/42] fix types --- src/sentry/conf/server.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index ee5ee50fc4d334..d72345a58052a1 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3459,14 +3459,12 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} # Mapping of default Kafka topic name to topic definition -KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition | None]] = { - {"cluster": "default"} -} +KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition]] = {{"cluster": "default"}} # Cluster configuration for each Kafka topic by name. # Deprecated -KAFKA_TOPICS: Mapping[str, TopicDefinition | None] = { +KAFKA_TOPICS: Mapping[str, TopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, KAFKA_TRANSACTIONS: {"cluster": "default"}, From 855805d0d0f5652b17124a7272f0eb17fe7651a9 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 13:08:00 -0800 Subject: [PATCH 15/42] add the topic cluster mapping --- src/sentry/conf/server.py | 42 +++++++++++++++++++++++-- src/sentry/runner/commands/devserver.py | 5 +++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index d72345a58052a1..61824912e0fda9 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3421,6 +3421,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # `prod.py`, also override the entirety of `KAFKA_TOPICS` to ensure the keys # pick up the change. +# START DEPRECATED SECTION KAFKA_EVENTS = "events" KAFKA_EVENTS_COMMIT_LOG = "snuba-commit-log" KAFKA_TRANSACTIONS = "transactions" @@ -3454,16 +3455,51 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # spans KAFKA_SNUBA_SPANS = "snuba-spans" +# END DEPRECATED SECTION + # Mapping of default Kafka topic name to custom names KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} -# Mapping of default Kafka topic name to topic definition -KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition]] = {{"cluster": "default"}} + +# Mapping of default Kafka topic name to cluster name. +KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, str]] = { + "events": "default", + "snuba-commit-log": "default", + "transactions": "default", + "snuba-transactions-commit-log": "default", + "outcomes": "default", + "outcomes-billing": "default", + "events-subscription-results": "default", + "transactions-subscription-results": "default", + "generic-metrics-subscription-results": "default", + "sessions-subscription-results": "default", + "metrics-subscription-results": "default", + "ingest-events": "default", + "ingest-attachments": "default", + "ingest-transactions": "default", + "ingest-metrics": "default", + "ingest-metrics-dlq": "default", + "snuba-metrics": "default", + "profiles": "default", + "ingest-performance-metrics": "default", + "ingest-generic-metrics-dlq": "default", + "snuba-generic-metrics": "default", + "ingest-replay-events": "default", + "ingest-replay-recordings": "default", + "ingest-occurrences": "default", + "ingest-monitors": "default", + "generic-events": "default", + "snuba-generic-events-commit-log": "default", + "group-attributes": "default", + "shared-resources-usage": "default", + "events": "default", + "snuba-spans": "default", +} # Cluster configuration for each Kafka topic by name. -# Deprecated +# DEPRECATED KAFKA_TOPICS: Mapping[str, TopicDefinition] = { KAFKA_EVENTS: {"cluster": "default"}, KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index f0698fcaa9c1f8..141b3c78662b21 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -364,8 +364,13 @@ def devserver( """ ) + from sentry.conf.types.topic_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics + # TODO: Implement this + for topic in Topic: + print(topic.name) + for topic_name, topic_data in settings.KAFKA_TOPICS.items(): if topic_data is not None: create_topics(topic_data["cluster"], [topic_name]) From 12f1f90720333e2bf4f377cf65ec9ff592773acc Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 13:18:59 -0800 Subject: [PATCH 16/42] devserver uses the new way --- src/sentry/runner/commands/devserver.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 141b3c78662b21..84d6810d29ccb9 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -367,13 +367,11 @@ def devserver( from sentry.conf.types.topic_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics - # TODO: Implement this for topic in Topic: - print(topic.name) - - for topic_name, topic_data in settings.KAFKA_TOPICS.items(): - if topic_data is not None: - create_topics(topic_data["cluster"], [topic_name]) + default_name = topic.value + physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name) + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name] + create_topics(cluster_name, [physical_name]) if dev_consumer: daemons.append( From 5307df707b12507de48a529fd5c491b35d2bb658 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 13:23:16 -0800 Subject: [PATCH 17/42] test --- tests/sentry/conf/test_topic_definition.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 tests/sentry/conf/test_topic_definition.py diff --git a/tests/sentry/conf/test_topic_definition.py b/tests/sentry/conf/test_topic_definition.py new file mode 100644 index 00000000000000..3f83f8bf352ac8 --- /dev/null +++ b/tests/sentry/conf/test_topic_definition.py @@ -0,0 +1,10 @@ +# Every topic is mapped to a cluster +from django.conf import settings + +from sentry.conf.types.topic_definition import Topic + + +def test_topic_definition() -> None: + for topic in Topic: + assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER + assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) From d6f8cda9fde01d53e3a84fbb1617a7623fec2367 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 14:11:26 -0800 Subject: [PATCH 18/42] fix --- src/sentry/utils/outcomes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/utils/outcomes.py b/src/sentry/utils/outcomes.py index e064e6d32b15e9..4aa2951b4bc0aa 100644 --- a/src/sentry/utils/outcomes.py +++ b/src/sentry/utils/outcomes.py @@ -75,7 +75,7 @@ def track_outcome( outcomes_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES) billing_config = kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES_BILLING) - use_billing = outcome.is_billing() and outcomes_config != billing_config + use_billing = outcome.is_billing() # Create a second producer instance only if the cluster differs. Otherwise, # reuse the same producer and just send to the other topic. From 0796771f719820e3fd4c6be6cf6d59e05354d6ba Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 14:24:11 -0800 Subject: [PATCH 19/42] build it in a more backward compatible way --- src/sentry/conf/types/consumer_definition.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sentry/conf/types/consumer_definition.py b/src/sentry/conf/types/consumer_definition.py index 06c0172b9d133c..7e62b3dee8c5a6 100644 --- a/src/sentry/conf/types/consumer_definition.py +++ b/src/sentry/conf/types/consumer_definition.py @@ -9,11 +9,12 @@ class ConsumerDefinition(TypedDict, total=False): - # Which logical topic from settings to use. - topic: Topic - # Override topic. To be deprecated - topic_override: Required[str | Callable[[], str]] + # XXX: Eventually only Topic will be accepted here. + # For backward compatibility with getsentry, we must also + # support the physical override topic name (str, Callable[str], str) + # while the migration is taking place + topic: Required[Topic | str | Callable[[], str]] # Schema validation will be run if true validate_schema: bool | None From 679da0908384ffe10d59b067a1f0d0559ab291a0 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 28 Feb 2024 14:33:39 -0800 Subject: [PATCH 20/42] revert for backward compatibility --- src/sentry/consumers/__init__.py | 69 +++++++++++--------------------- 1 file changed, 23 insertions(+), 46 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 6aa2763b113b54..2972694bedc29e 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -14,7 +14,6 @@ from django.conf import settings from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition -from sentry.conf.types.topic_definition import Topic from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -164,49 +163,42 @@ def ingest_monitors_options() -> list[click.Option]: ] # consumer name -> consumer definition -# XXX: topic_override will be deprecated here in favor of KAFKA_TOPIC_OVERRIDE once all overrides in -# prod have been migrated +# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden +# string. We support both currently for backward compatibility. KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { "ingest-profiles": { - "topic": Topic.PROFILES, - "topic_override": settings.KAFKA_PROFILES, + "topic": settings.KAFKA_PROFILES, "strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory", }, "ingest-replay-recordings": { - "topic": Topic.EVENTS, - "topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, + "topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory", "click_options": ingest_replay_recordings_options(), }, "ingest-replay-recordings-buffered": { - "topic": Topic.INGEST_REPLAYS_RECORDINGS, - "topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, + "topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, "strategy_factory": "sentry.replays.consumers.recording_buffered.RecordingBufferedStrategyFactory", "click_options": ingest_replay_recordings_buffered_options(), }, "ingest-monitors": { - "topic": Topic.INGEST_MONITORS, "topic_override": settings.KAFKA_INGEST_MONITORS, "strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory", "click_options": ingest_monitors_options(), }, "billing-metrics-consumer": { - "topic": Topic.SNUBA_GENERIC_METRICS, - "topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, + "topic": settings.KAFKA_SNUBA_GENERIC_METRICS, "strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory", }, # Known differences to 'sentry run occurrences-ingest-consumer': # - ingest_consumer_types metric tag is missing. Use the kafka_topic and # group_id tags provided by run_basic_consumer instead "ingest-occurrences": { - "topic": Topic.INGEST_OCCURRENCES, - "topic_override": settings.KAFKA_INGEST_OCCURRENCES, + "topic": settings.KAFKA_INGEST_OCCURRENCES, "strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=20), }, "events-subscription-results": { - "topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS, - "topic_override": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, + "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -214,8 +206,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "transactions-subscription-results": { - "topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS, - "topic_override": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, + "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -223,8 +214,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "generic-metrics-subscription-results": { - "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, - "topic_override": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, + "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), @@ -233,8 +223,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "sessions-subscription-results": { - "topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS, - "topic_override": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, + "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(), "static_args": { @@ -242,8 +231,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "metrics-subscription-results": { - "topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS, - "topic_override": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, + "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -251,8 +239,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-events": { - "topic": Topic.INGEST_EVENTS, - "topic_override": settings.KAFKA_INGEST_EVENTS, + "topic": settings.KAFKA_INGEST_EVENTS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -260,8 +247,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-attachments": { - "topic": Topic.INGEST_ATTACHMENTS, - "topic_override": settings.KAFKA_INGEST_ATTACHMENTS, + "topic": settings.KAFKA_INGEST_ATTACHMENTS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -269,8 +255,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-transactions": { - "topic": Topic.INGEST_TRANSACTIONS, - "topic_override": settings.KAFKA_INGEST_TRANSACTIONS, + "topic": settings.KAFKA_INGEST_TRANSACTIONS, "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), "static_args": { @@ -278,8 +263,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "ingest-metrics": { - "topic": Topic.INGEST_METRICS, - "topic_override": settings.KAFKA_INGEST_METRICS, + "topic": settings.KAFKA_INGEST_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", "click_options": _METRICS_INDEXER_OPTIONS, "static_args": { @@ -290,8 +274,7 @@ def ingest_monitors_options() -> list[click.Option]: "dlq_max_consecutive_count": 1000, }, "ingest-generic-metrics": { - "topic": Topic.INGEST_PERFORMANCE_METRICS, - "topic_override": settings.KAFKA_INGEST_PERFORMANCE_METRICS, + "topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", "click_options": _METRICS_INDEXER_OPTIONS, "static_args": { @@ -302,8 +285,7 @@ def ingest_monitors_options() -> list[click.Option]: "dlq_max_consecutive_count": 1000, }, "generic-metrics-last-seen-updater": { - "topic": Topic.SNUBA_GENERIC_METRICS, - "topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, + "topic": settings.KAFKA_SNUBA_GENERIC_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", "click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, "static_args": { @@ -311,8 +293,7 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "metrics-last-seen-updater": { - "topic": Topic.SNUBA_METRICS, - "topic_override": settings.KAFKA_SNUBA_METRICS, + "topic": settings.KAFKA_SNUBA_METRICS, "strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", "click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, "static_args": { @@ -320,32 +301,28 @@ def ingest_monitors_options() -> list[click.Option]: }, }, "post-process-forwarder-issue-platform": { - "topic": Topic.EVENTSTREAM_GENERIC, - "topic_override": settings.KAFKA_EVENTSTREAM_GENERIC, + "topic": settings.KAFKA_EVENTSTREAM_GENERIC, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-generic-events-commit-log", "synchronize_commit_group_default": "generic_events_group", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "post-process-forwarder-transactions": { - "topic": Topic.TRANSACTIONS, - "topic_override": settings.KAFKA_TRANSACTIONS, + "topic": settings.KAFKA_TRANSACTIONS, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-transactions-commit-log", "synchronize_commit_group_default": "transactions_group", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "post-process-forwarder-errors": { - "topic": Topic.EVENTS, - "topic_override": settings.KAFKA_EVENTS, + "topic": settings.KAFKA_EVENTS, "strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", "synchronize_commit_log_topic_default": "snuba-commit-log", "synchronize_commit_group_default": "snuba-consumers", "click_options": _POST_PROCESS_FORWARDER_OPTIONS, }, "process-spans": { - "topic": Topic.SNUBA_SPANS, - "topic_override": settings.KAFKA_SNUBA_SPANS, + "topic": settings.KAFKA_SNUBA_SPANS, "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", }, # TODO: This has to be co-ordinated with getsentry From b1eb8756b90b6ec0da247409bf2dc0c48b7fed08 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 09:42:17 -0800 Subject: [PATCH 21/42] fix type --- src/sentry/conf/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 4b829bb2da8f67..4c6165ddcd1007 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3477,7 +3477,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Mapping of default Kafka topic name to cluster name. -KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, str]] = { +KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = { "events": "default", "snuba-commit-log": "default", "transactions": "default", From b428a8a569a1fbd45b2e82fe4b1d3c5ad19c09ce Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 09:43:43 -0800 Subject: [PATCH 22/42] remove stray topic_override --- src/sentry/consumers/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 1897afc618c9b4..0c1854d670d909 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -202,7 +202,7 @@ def ingest_events_options() -> list[click.Option]: "click_options": ingest_replay_recordings_buffered_options(), }, "ingest-monitors": { - "topic_override": settings.KAFKA_INGEST_MONITORS, + "topic": settings.KAFKA_INGEST_MONITORS, "strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory", "click_options": ingest_monitors_options(), }, @@ -346,7 +346,6 @@ def ingest_events_options() -> list[click.Option]: "topic": settings.KAFKA_SNUBA_SPANS, "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", }, - # TODO: This has to be co-ordinated with getsentry **settings.SENTRY_KAFKA_CONSUMERS, } From 1b219a73cb5d55124b663784ec5c9d44bf30ede0 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 09:45:50 -0800 Subject: [PATCH 23/42] check cluster name exists --- tests/sentry/conf/test_topic_definition.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/sentry/conf/test_topic_definition.py b/tests/sentry/conf/test_topic_definition.py index 3f83f8bf352ac8..7db18f1f90db08 100644 --- a/tests/sentry/conf/test_topic_definition.py +++ b/tests/sentry/conf/test_topic_definition.py @@ -6,5 +6,6 @@ def test_topic_definition() -> None: for topic in Topic: - assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] + assert cluster_name in settings.KAFKA_CLUSTERS assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) From 2c41229f1f4fa70ee2eb537f223ec8f2526bac2f Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 09:56:39 -0800 Subject: [PATCH 24/42] handle generic metrics subscriptions --- src/sentry/conf/server.py | 4 +++- src/sentry/consumers/__init__.py | 14 ++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 4c6165ddcd1007..01c1621faf3300 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3473,7 +3473,9 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Mapping of default Kafka topic name to custom names -KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} +KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = { + "generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS +} # Mapping of default Kafka topic name to cluster name. diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 0c1854d670d909..7f9fdaef288848 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -14,6 +14,7 @@ from django.conf import settings from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.topic_definition import Topic from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -235,7 +236,7 @@ def ingest_events_options() -> list[click.Option]: }, }, "generic-metrics-subscription-results": { - "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, + "topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, "validate_schema": True, "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", "click_options": multiprocessing_options(default_max_batch_size=100), @@ -482,10 +483,15 @@ def build_consumer_config(group_id: str): ) # Validate schema if "validate_schema" is set - default_topic = consumer_definition.get("validate_schema") - if default_topic: + validate_schema = consumer_definition.get("validate_schema") + + # TODO: Remove this later but for now we can only validate if `topic` is + # the logical topic and not the legacy override topic + assert isinstance(topic, Topic) + + if validate_schema: strategy_factory = ValidateSchemaStrategyFactoryWrapper( - default_topic, validate_schema, strategy_factory + topic.value, validate_schema, strategy_factory ) if healthcheck_file_path is not None: From c6d7aa988a439d6ac2f5d17869d1c25cc441b377 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:20:09 -0800 Subject: [PATCH 25/42] update consumer --- src/sentry/consumers/__init__.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 7f9fdaef288848..e36fa17d73eba5 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -393,12 +393,17 @@ def get_stream_processor( ) from e strategy_factory_cls = import_string(consumer_definition["strategy_factory"]) - logical_topic = consumer_definition["topic"] - if not isinstance(logical_topic, str): - logical_topic = logical_topic() + topic_def = consumer_definition["topic"] + if isinstance(topic_def, Topic): + default_topic = topic_def.value + real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic) + else: + # TODO: Deprecated, remove once this way is no longer used + if not isinstance(topic_def, str): + real_topic = topic_def() if topic is None: - topic = logical_topic + topic = real_topic cmd = click.Command( name=consumer_name, params=list(consumer_definition.get("click_options") or ()) @@ -416,9 +421,9 @@ def get_stream_processor( from sentry.utils import kafka_config - # Still supported for now until we have moved everything in prod over - topic_def = settings.KAFKA_TOPICS[logical_topic] + topic_def = settings.KAFKA_TOPICS[real_topic] assert topic_def is not None + if cluster is None: cluster = topic_def["cluster"] @@ -485,11 +490,11 @@ def build_consumer_config(group_id: str): # Validate schema if "validate_schema" is set validate_schema = consumer_definition.get("validate_schema") - # TODO: Remove this later but for now we can only validate if `topic` is - # the logical topic and not the legacy override topic - assert isinstance(topic, Topic) - if validate_schema: + # TODO: Remove this later but for now we can only validate if `topic` is + # the logical topic and not the legacy override topic + assert isinstance(topic, Topic) + strategy_factory = ValidateSchemaStrategyFactoryWrapper( topic.value, validate_schema, strategy_factory ) From 2421da9e82a9f2582b5383e883549ca6d918cb49 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:23:27 -0800 Subject: [PATCH 26/42] comment --- src/sentry/conf/server.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 01c1621faf3300..46311fd6775acb 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3474,6 +3474,9 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # Mapping of default Kafka topic name to custom names KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = { + # TODO: This is temporary while we migrate between the old and new way of defining overrides. + # To be removed once this is defined in prod, along with KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS + # variable which will no longer be needed "generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS } From 7014e8860169bcc0668914c19e578dc7558efc55 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:25:44 -0800 Subject: [PATCH 27/42] update test --- tests/sentry/conf/test_topic_definition.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/sentry/conf/test_topic_definition.py b/tests/sentry/conf/test_topic_definition.py index 7db18f1f90db08..683ca37cdc03bb 100644 --- a/tests/sentry/conf/test_topic_definition.py +++ b/tests/sentry/conf/test_topic_definition.py @@ -1,4 +1,3 @@ -# Every topic is mapped to a cluster from django.conf import settings from sentry.conf.types.topic_definition import Topic @@ -7,5 +6,7 @@ def test_topic_definition() -> None: for topic in Topic: cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] - assert cluster_name in settings.KAFKA_CLUSTERS + assert ( + cluster_name in settings.KAFKA_CLUSTERS + ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) From 6430eb5a8b9a32fc08d7b837ae5f954fda39c667 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:36:28 -0800 Subject: [PATCH 28/42] fix variable naming --- src/sentry/consumers/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index e36fa17d73eba5..a17f1846cc1015 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -393,14 +393,14 @@ def get_stream_processor( ) from e strategy_factory_cls = import_string(consumer_definition["strategy_factory"]) - topic_def = consumer_definition["topic"] - if isinstance(topic_def, Topic): - default_topic = topic_def.value + consumer_topic = consumer_definition["topic"] + if isinstance(consumer_topic, Topic): + default_topic = consumer_topic.value real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic) else: # TODO: Deprecated, remove once this way is no longer used - if not isinstance(topic_def, str): - real_topic = topic_def() + if not isinstance(consumer_topic, str): + real_topic = consumer_topic() if topic is None: topic = real_topic @@ -491,12 +491,12 @@ def build_consumer_config(group_id: str): validate_schema = consumer_definition.get("validate_schema") if validate_schema: - # TODO: Remove this later but for now we can only validate if `topic` is + # TODO: Remove this later but for now we can only validate if `topic_def` is # the logical topic and not the legacy override topic - assert isinstance(topic, Topic) + assert isinstance(consumer_topic, Topic) strategy_factory = ValidateSchemaStrategyFactoryWrapper( - topic.value, validate_schema, strategy_factory + consumer_topic.value, validate_schema, strategy_factory ) if healthcheck_file_path is not None: From 2888ea32e17eec5be0a01893082fc5adb2c9efea Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:39:55 -0800 Subject: [PATCH 29/42] fix import naming --- src/sentry/consumers/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index a17f1846cc1015..3a1f8431785029 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -416,7 +416,7 @@ def get_stream_processor( from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration from arroyo.backends.kafka.consumer import KafkaConsumer from arroyo.commit import ONCE_PER_SECOND - from arroyo.types import Topic + from arroyo.types import Topic as ArroyoTopic from django.conf import settings from sentry.utils import kafka_config @@ -479,7 +479,7 @@ def build_consumer_config(group_id: str): consumer = SynchronizedConsumer( consumer=consumer, commit_log_consumer=commit_log_consumer, - commit_log_topic=Topic(synchronize_commit_log_topic), + commit_log_topic=ArroyoTopic(synchronize_commit_log_topic), commit_log_groups={synchronize_commit_group}, ) elif consumer_definition.get("require_synchronization"): @@ -488,7 +488,7 @@ def build_consumer_config(group_id: str): ) # Validate schema if "validate_schema" is set - validate_schema = consumer_definition.get("validate_schema") + validate_schema = consumer_definition.get("validate_schema", False) if validate_schema: # TODO: Remove this later but for now we can only validate if `topic_def` is @@ -522,7 +522,7 @@ def build_consumer_config(group_id: str): dlq_producer = KafkaProducer(producer_config) dlq_policy = DlqPolicy( - KafkaDlqProducer(dlq_producer, Topic(dlq_topic)), + KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), DlqLimit( max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], @@ -534,7 +534,7 @@ def build_consumer_config(group_id: str): return StreamProcessor( consumer=consumer, - topic=Topic(topic), + topic=ArroyoTopic(topic), processor_factory=strategy_factory, commit_policy=ONCE_PER_SECOND, join_timeout=join_timeout, From 37e066e210841566a2d3da9f0de1445a18446243 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 11:41:24 -0800 Subject: [PATCH 30/42] fix type for real --- src/sentry/consumers/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 3a1f8431785029..188112e67cd984 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -488,7 +488,7 @@ def build_consumer_config(group_id: str): ) # Validate schema if "validate_schema" is set - validate_schema = consumer_definition.get("validate_schema", False) + validate_schema = consumer_definition.get("validate_schema") or False if validate_schema: # TODO: Remove this later but for now we can only validate if `topic_def` is From f19d92de69cfbd88fa6a7d1d6433ef5b719fa346 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 13:38:50 -0800 Subject: [PATCH 31/42] put kafka stuff in a single file to work around import restriction --- src/sentry/conf/server.py | 2 +- src/sentry/conf/types/consumer_definition.py | 47 ----------- src/sentry/conf/types/kafka_definition.py | 79 +++++++++++++++++++ src/sentry/conf/types/topic_definition.py | 34 -------- src/sentry/consumers/__init__.py | 2 +- .../sentry/conf/test_consumer_definitions.py | 2 +- tests/sentry/consumers/test_run.py | 2 +- 7 files changed, 83 insertions(+), 85 deletions(-) delete mode 100644 src/sentry/conf/types/consumer_definition.py create mode 100644 src/sentry/conf/types/kafka_definition.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 46311fd6775acb..e67c44d07fdb2f 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -17,7 +17,7 @@ from urllib.parse import urlparse import sentry -from sentry.conf.types.consumer_definition import ConsumerDefinition +from sentry.conf.types.kafka_definition import ConsumerDefinition from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig diff --git a/src/sentry/conf/types/consumer_definition.py b/src/sentry/conf/types/consumer_definition.py deleted file mode 100644 index 7e62b3dee8c5a6..00000000000000 --- a/src/sentry/conf/types/consumer_definition.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Mapping, Sequence -from typing import Any, Required, TypedDict - -import click - -from sentry.conf.types.topic_definition import Topic - - -class ConsumerDefinition(TypedDict, total=False): - - # XXX: Eventually only Topic will be accepted here. - # For backward compatibility with getsentry, we must also - # support the physical override topic name (str, Callable[str], str) - # while the migration is taking place - topic: Required[Topic | str | Callable[[], str]] - - # Schema validation will be run if true - validate_schema: bool | None - - strategy_factory: Required[str] - - # Additional CLI options the consumer should accept. These arguments are - # passed as kwargs to the strategy_factory. - click_options: Sequence[click.Option] - - # Hardcoded additional kwargs for strategy_factory - static_args: Mapping[str, Any] - - require_synchronization: bool - synchronize_commit_group_default: str - synchronize_commit_log_topic_default: str - - dlq_topic: str - dlq_max_invalid_ratio: float | None - dlq_max_consecutive_count: int | None - - -def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None: - if "dlq_topic" not in consumer_definition and ( - "dlq_max_invalid_ratio" in consumer_definition - or "dlq_max_consecutive_count" in consumer_definition - ): - raise ValueError( - "Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not" - ) diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py new file mode 100644 index 00000000000000..9b3dedde685109 --- /dev/null +++ b/src/sentry/conf/types/kafka_definition.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from collections.abc import Callable, Mapping, Sequence +from enum import Enum +from typing import Any, Required, TypedDict + +import click + + +class Topic(Enum): + EVENTS = "events" + EVENTS_COMMIT_LOG = "snuba-commit-log" + TRANSACTIONS = "transactions" + TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" + OUTCOMES = "outcomes" + OUTCOMES_BILLING = "outcomes-billing" + EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" + TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" + GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" + SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" + METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" + INGEST_EVENTS = "ingest-events" + INGEST_ATTACHMENTS = "ingest-attachments" + INGEST_TRANSACTIONS = "ingest-transactions" + INGEST_METRICS = "ingest-metrics" + INGEST_METRICS_DLQ = "ingest-metrics-dlq" + SNUBA_METRICS = "snuba-metrics" + PROFILES = "profiles" + INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" + INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" + SNUBA_GENERIC_METRICS = "snuba-generic-metrics" + INGEST_REPLAY_EVENTS = "ingest-replay-events" + INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" + INGEST_OCCURRENCES = "ingest-occurrences" + INGEST_MONITORS = "ingest-monitors" + EVENTSTREAM_GENERIC = "generic-events" + GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" + GROUP_ATTRIBUTES = "group-attributes" + SHARED_RESOURCES_USAGE = "shared-resources-usage" + SNUBA_SPANS = "snuba-spans" + + +class ConsumerDefinition(TypedDict, total=False): + + # XXX: Eventually only Topic will be accepted here. + # For backward compatibility with getsentry, we must also + # support the physical override topic name (str, Callable[str], str) + # while the migration is taking place + topic: Required[Topic | str | Callable[[], str]] + + # Schema validation will be run if true + validate_schema: bool | None + + strategy_factory: Required[str] + + # Additional CLI options the consumer should accept. These arguments are + # passed as kwargs to the strategy_factory. + click_options: Sequence[click.Option] + + # Hardcoded additional kwargs for strategy_factory + static_args: Mapping[str, Any] + + require_synchronization: bool + synchronize_commit_group_default: str + synchronize_commit_log_topic_default: str + + dlq_topic: str + dlq_max_invalid_ratio: float | None + dlq_max_consecutive_count: int | None + + +def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None: + if "dlq_topic" not in consumer_definition and ( + "dlq_max_invalid_ratio" in consumer_definition + or "dlq_max_consecutive_count" in consumer_definition + ): + raise ValueError( + "Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not" + ) diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index 57d1c1e2cffff1..558cc0fb9d61fa 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -1,39 +1,5 @@ -from enum import Enum from typing import TypedDict -class Topic(Enum): - EVENTS = "events" - EVENTS_COMMIT_LOG = "snuba-commit-log" - TRANSACTIONS = "transactions" - TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" - OUTCOMES = "outcomes" - OUTCOMES_BILLING = "outcomes-billing" - EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" - TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" - GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" - SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" - METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" - INGEST_EVENTS = "ingest-events" - INGEST_ATTACHMENTS = "ingest-attachments" - INGEST_TRANSACTIONS = "ingest-transactions" - INGEST_METRICS = "ingest-metrics" - INGEST_METRICS_DLQ = "ingest-metrics-dlq" - SNUBA_METRICS = "snuba-metrics" - PROFILES = "profiles" - INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" - INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" - SNUBA_GENERIC_METRICS = "snuba-generic-metrics" - INGEST_REPLAY_EVENTS = "ingest-replay-events" - INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" - INGEST_OCCURRENCES = "ingest-occurrences" - INGEST_MONITORS = "ingest-monitors" - EVENTSTREAM_GENERIC = "generic-events" - GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" - GROUP_ATTRIBUTES = "group-attributes" - SHARED_RESOURCES_USAGE = "shared-resources-usage" - SNUBA_SPANS = "snuba-spans" - - class TopicDefinition(TypedDict): cluster: str diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 188112e67cd984..29adf66befacaf 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -13,7 +13,7 @@ from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory from django.conf import settings -from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.kafka_definition import ConsumerDefinition, validate_consumer_definition from sentry.conf.types.topic_definition import Topic from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string diff --git a/tests/sentry/conf/test_consumer_definitions.py b/tests/sentry/conf/test_consumer_definitions.py index c2e0771df7f0bf..c8ac0ace8ef40a 100644 --- a/tests/sentry/conf/test_consumer_definitions.py +++ b/tests/sentry/conf/test_consumer_definitions.py @@ -1,6 +1,6 @@ import pytest -from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.kafka_definition import ConsumerDefinition, validate_consumer_definition from sentry.consumers import KAFKA_CONSUMERS from sentry.testutils.cases import TestCase diff --git a/tests/sentry/consumers/test_run.py b/tests/sentry/consumers/test_run.py index a20c578480b786..5ddebf9e5c01c0 100644 --- a/tests/sentry/consumers/test_run.py +++ b/tests/sentry/consumers/test_run.py @@ -2,7 +2,7 @@ from arroyo.processing.strategies.abstract import ProcessingStrategyFactory from sentry import consumers -from sentry.conf.types.consumer_definition import ConsumerDefinition +from sentry.conf.types.kafka_definition import ConsumerDefinition from sentry.utils.imports import import_string From 99e16df30313ca711c1d4c25b64be39eb4ab7eb0 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 13:55:46 -0800 Subject: [PATCH 32/42] fix more imports --- src/sentry/consumers/__init__.py | 7 +++++-- src/sentry/runner/commands/devserver.py | 2 +- ..._definitions.py => test_kafka_definitions.py} | 16 +++++++++++++++- tests/sentry/conf/test_topic_definition.py | 12 ------------ 4 files changed, 21 insertions(+), 16 deletions(-) rename tests/sentry/conf/{test_consumer_definitions.py => test_kafka_definitions.py} (67%) delete mode 100644 tests/sentry/conf/test_topic_definition.py diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 29adf66befacaf..d938555b7b4353 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -13,8 +13,11 @@ from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory from django.conf import settings -from sentry.conf.types.kafka_definition import ConsumerDefinition, validate_consumer_definition -from sentry.conf.types.topic_definition import Topic +from sentry.conf.types.kafka_definition import ( + ConsumerDefinition, + Topic, + validate_consumer_definition, +) from sentry.consumers.validate_schema import ValidateSchema from sentry.utils.imports import import_string from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 84d6810d29ccb9..038a66bfa72238 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -364,7 +364,7 @@ def devserver( """ ) - from sentry.conf.types.topic_definition import Topic + from sentry.conf.types.kafka_definition import Topic from sentry.utils.batching_kafka_consumer import create_topics for topic in Topic: diff --git a/tests/sentry/conf/test_consumer_definitions.py b/tests/sentry/conf/test_kafka_definitions.py similarity index 67% rename from tests/sentry/conf/test_consumer_definitions.py rename to tests/sentry/conf/test_kafka_definitions.py index c8ac0ace8ef40a..ff98fc514294ed 100644 --- a/tests/sentry/conf/test_consumer_definitions.py +++ b/tests/sentry/conf/test_kafka_definitions.py @@ -1,10 +1,24 @@ import pytest +from django.conf import settings -from sentry.conf.types.kafka_definition import ConsumerDefinition, validate_consumer_definition +from sentry.conf.types.kafka_definition import ( + ConsumerDefinition, + Topic, + validate_consumer_definition, +) from sentry.consumers import KAFKA_CONSUMERS from sentry.testutils.cases import TestCase +def test_topic_definition() -> None: + for topic in Topic: + cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] + assert ( + cluster_name in settings.KAFKA_CLUSTERS + ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" + assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) + + class ConsumersDefinitionTest(TestCase): def test_exception_on_invalid_consumer_definition(self): invalid_definitions: list[ConsumerDefinition] = [ diff --git a/tests/sentry/conf/test_topic_definition.py b/tests/sentry/conf/test_topic_definition.py deleted file mode 100644 index 683ca37cdc03bb..00000000000000 --- a/tests/sentry/conf/test_topic_definition.py +++ /dev/null @@ -1,12 +0,0 @@ -from django.conf import settings - -from sentry.conf.types.topic_definition import Topic - - -def test_topic_definition() -> None: - for topic in Topic: - cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] - assert ( - cluster_name in settings.KAFKA_CLUSTERS - ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" - assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) From 1668a6ce50171413c6aa529e982cca458d7eb36f Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 14:05:17 -0800 Subject: [PATCH 33/42] add more comments --- src/sentry/conf/server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 8151263a7fe52a..2124d2b8fe9280 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3483,7 +3483,11 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] } -# Mapping of default Kafka topic name to cluster name. +# Mapping of default Kafka topic name to cluster name +# as per KAFKA_CLUSTERS. +# This must be the default name that matches the topic +# in sentry.conf.types.kafka_definition and sentry-kafka-schemas +# and not any environment-specific override value KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = { "events": "default", "snuba-commit-log": "default", From 71a444cfec43822e1262e377ccbc3a918d3e64a9 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 14:44:38 -0800 Subject: [PATCH 34/42] actually define real_topic --- src/sentry/consumers/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index d938555b7b4353..400311869c8082 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -404,6 +404,8 @@ def get_stream_processor( # TODO: Deprecated, remove once this way is no longer used if not isinstance(consumer_topic, str): real_topic = consumer_topic() + else: + real_topic = consumer_topic if topic is None: topic = real_topic From 9b92df94a6b80255ac90f8209e1e58f5535f99dc Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 15:20:38 -0800 Subject: [PATCH 35/42] update test --- tests/sentry/consumers/test_run.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/sentry/consumers/test_run.py b/tests/sentry/consumers/test_run.py index 5ddebf9e5c01c0..7b1e6236d26375 100644 --- a/tests/sentry/consumers/test_run.py +++ b/tests/sentry/consumers/test_run.py @@ -2,7 +2,7 @@ from arroyo.processing.strategies.abstract import ProcessingStrategyFactory from sentry import consumers -from sentry.conf.types.kafka_definition import ConsumerDefinition +from sentry.conf.types.kafka_definition import ConsumerDefinition, Topic from sentry.utils.imports import import_string @@ -16,4 +16,9 @@ def test_all_importable(consumer_def, settings): assert issubclass(factory, ProcessingStrategyFactory) topic = defn["topic"] - assert topic is None or topic in settings.KAFKA_TOPICS + if isinstance(topic, Topic): + assert topic.value in settings.KAFKA_TOPIC_TO_CLUSTER + else: + # TODO: Legacy way, will be deprecated once all consumer definitions + # are migrated + assert topic in settings.KAFKA_TOPICS From b4e867ad03fab6dfe99ec7f89c63a681d7c604a2 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 29 Feb 2024 16:46:56 -0800 Subject: [PATCH 36/42] add new ingest-events-dlq topic --- src/sentry/conf/server.py | 1 + src/sentry/conf/types/kafka_definition.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 8972465e6cf75e..613f2e1020345b 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3491,6 +3491,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # and not any environment-specific override value KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = { "events": "default", + "ingest-event-dlq": "default", "snuba-commit-log": "default", "transactions": "default", "snuba-transactions-commit-log": "default", diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 9b3dedde685109..e078c8d2fbbe65 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -20,6 +20,7 @@ class Topic(Enum): SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" INGEST_EVENTS = "ingest-events" + INGEST_EVENTS_DLQ = "ingest-events-dlq" INGEST_ATTACHMENTS = "ingest-attachments" INGEST_TRANSACTIONS = "ingest-transactions" INGEST_METRICS = "ingest-metrics" From b8e31eb7aeddf9dd09a10bdac9c1c2e1735c5147 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 13:00:00 -0800 Subject: [PATCH 37/42] fix typo, add test --- src/sentry/conf/server.py | 2 +- ...efinitions.py => test_kafka_definition.py} | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) rename tests/sentry/conf/{test_kafka_definitions.py => test_kafka_definition.py} (69%) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 613f2e1020345b..cbde90e04430a2 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3491,7 +3491,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] # and not any environment-specific override value KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = { "events": "default", - "ingest-event-dlq": "default", + "ingest-events-dlq": "default", "snuba-commit-log": "default", "transactions": "default", "snuba-transactions-commit-log": "default", diff --git a/tests/sentry/conf/test_kafka_definitions.py b/tests/sentry/conf/test_kafka_definition.py similarity index 69% rename from tests/sentry/conf/test_kafka_definitions.py rename to tests/sentry/conf/test_kafka_definition.py index ff98fc514294ed..e181d80713e41c 100644 --- a/tests/sentry/conf/test_kafka_definitions.py +++ b/tests/sentry/conf/test_kafka_definition.py @@ -1,4 +1,5 @@ import pytest +import sentry_kafka_schemas from django.conf import settings from sentry.conf.types.kafka_definition import ( @@ -11,6 +12,25 @@ def test_topic_definition() -> None: + # All topic are registered + # TODO: Remove this once these topics are actually registered in sentry-kafka-schemas + currently_unregistered_topics = [ + "outcomes-billing", + "ingest-events", + "ingest-events-dlq", + "ingest-attachments", + "ingest-transactions", + "ingest-metrics-dlq", + "profiles", + "ingest-generic-metrics-dlq", + "ingest-occurrences", + "ingest-monitors", + ] + + for topic in Topic: + if topic.value not in currently_unregistered_topics: + assert sentry_kafka_schemas.get_topic(topic.value) is not None + for topic in Topic: cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value] assert ( From 0daa5100d1fb39737171213d2ce3397ede11d3a9 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 13:16:00 -0800 Subject: [PATCH 38/42] add docstring --- src/sentry/conf/types/kafka_definition.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index e078c8d2fbbe65..61820572647de8 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -8,6 +8,11 @@ class Topic(Enum): + """ + These are the default topic names used by Sentry. They must match + the registered values in sentry-kafka-schemas. + """ + EVENTS = "events" EVENTS_COMMIT_LOG = "snuba-commit-log" TRANSACTIONS = "transactions" From 297db518ec7ea452a3afd4de87468a7020340798 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 13:22:15 -0800 Subject: [PATCH 39/42] don't touch file unnecessarily --- src/sentry/conf/types/topic_definition.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sentry/conf/types/topic_definition.py b/src/sentry/conf/types/topic_definition.py index 558cc0fb9d61fa..41992b74d9ad78 100644 --- a/src/sentry/conf/types/topic_definition.py +++ b/src/sentry/conf/types/topic_definition.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import TypedDict From ffbb1229c55b50683e24ca34f6410d3d3a3ccd72 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 13:28:51 -0800 Subject: [PATCH 40/42] add test --- tests/sentry/conf/test_kafka_definition.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/sentry/conf/test_kafka_definition.py b/tests/sentry/conf/test_kafka_definition.py index e181d80713e41c..548e1b8742cb83 100644 --- a/tests/sentry/conf/test_kafka_definition.py +++ b/tests/sentry/conf/test_kafka_definition.py @@ -36,6 +36,11 @@ def test_topic_definition() -> None: assert ( cluster_name in settings.KAFKA_CLUSTERS ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" + + for topic in settings.KAFKA_TOPIC_OVERRIDES: + # Ensure all override topics aree in the enum + Topic(topic) + assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER) From 59f93e9a19ecfac5d13a1517951a84ac72fcb3bc Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 14:12:28 -0800 Subject: [PATCH 41/42] yikes --- src/sentry/conf/server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index cbde90e04430a2..9973078d045ba7 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3519,9 +3519,8 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] "generic-events": "default", "snuba-generic-events-commit-log": "default", "group-attributes": "default", - "shared-resources-usage": "default", - "events": "default", "snuba-spans": "default", + "shared-resources-usage": "default", } # Cluster configuration for each Kafka topic by name. From 1a4f8576348ed3d064e54f76cd8545de13fa14b5 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 1 Mar 2024 14:21:25 -0800 Subject: [PATCH 42/42] type checker --- tests/sentry/conf/test_kafka_definition.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/sentry/conf/test_kafka_definition.py b/tests/sentry/conf/test_kafka_definition.py index 548e1b8742cb83..e1cbebe8c2e759 100644 --- a/tests/sentry/conf/test_kafka_definition.py +++ b/tests/sentry/conf/test_kafka_definition.py @@ -37,9 +37,9 @@ def test_topic_definition() -> None: cluster_name in settings.KAFKA_CLUSTERS ), f"{cluster_name} is not defined in KAFKA_CLUSTERS" - for topic in settings.KAFKA_TOPIC_OVERRIDES: - # Ensure all override topics aree in the enum - Topic(topic) + for default_topic in settings.KAFKA_TOPIC_OVERRIDES: + # Ensure all override topics are in the enum + Topic(default_topic) assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER)