New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref: Rationalize Kafka topic config #65793
Changes from 8 commits
40bf484
112473d
05cfb0a
977df66
8025712
3a510f2
348c290
9f7206e
e7f1e9a
7bb9d4f
570c23d
5635bee
4880de4
334b83d
46d0259
4f82e7e
855805d
12f1f90
5307df7
d6f8cda
0796771
679da09
b8cd48b
9f7bb57
b1eb875
b428a8a
1b219a7
2c41229
c6d7aa9
2421da9
7014e88
6430eb5
2888ea3
37e066e
f19d92d
5bfbbcf
99e16df
1668a6c
71a444c
9b92df9
95c3cce
b4e867a
b8e31eb
0daa510
297db51
ffbb122
59f93e9
1a4f857
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3458,16 +3458,17 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] | |
# spans | ||
KAFKA_SNUBA_SPANS = "snuba-spans" | ||
|
||
KAFKA_SUBSCRIPTION_RESULT_TOPICS = { | ||
"events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, | ||
"transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, | ||
"generic-metrics": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"sessions": KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, | ||
"metrics": KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, | ||
# Mapping of default Kafka topic name to custom names | ||
KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {} | ||
|
||
# Mapping of default Kafka topic name to broker config | ||
KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition | None]] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In snuba these are called Any chance we can name them in the same way? Even if this naming is more appropriate than the Snuba one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I intentionally named it differently because it is not the same as Snuba at all. Here we map the topic to a cluster definition. Snuba does not go through any cluster definition and all the topic configs are mapped to the broker config. I think we might want to align them in the future but I'm not yet sure which way is better, and I want to avoid making this huge change in this PR right now. |
||
{"cluster": "default"} | ||
} | ||
|
||
|
||
# Cluster configuration for each Kafka topic by name. | ||
# Deprecated | ||
KAFKA_TOPICS: Mapping[str, TopicDefinition | None] = { | ||
KAFKA_EVENTS: {"cluster": "default"}, | ||
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"}, | ||
|
@@ -3510,9 +3511,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str] | |
KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"}, | ||
} | ||
|
||
|
||
# If True, consumers will create the topics if they don't exist | ||
KAFKA_CONSUMER_AUTO_CREATE_TOPICS = True | ||
fpacifici marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be | ||
# single-threaded under the hood for performance | ||
KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,11 +5,17 @@ | |
|
||
import click | ||
|
||
from sentry.conf.types.topic_definition import Topic | ||
|
||
|
||
class ConsumerDefinition(TypedDict, total=False): | ||
# Which logical topic from settings to use. | ||
topic: Required[str | Callable[[], str]] | ||
default_topic: str | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
topic: Topic | ||
|
||
# Override topic. To be deprecated | ||
topic_override: Required[str | Callable[[], str]] | ||
|
||
validate_schema: str | ||
lynnagara marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
strategy_factory: Required[str] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,39 @@ | ||
from __future__ import annotations | ||
|
||
from enum import Enum | ||
from typing import TypedDict | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we call this kafka_topic_definition ? |
||
|
||
|
||
class Topic(Enum): | ||
EVENTS = "events" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice I like this. |
||
EVENTS_COMMIT_LOG = "snuba-commit-log" | ||
TRANSACTIONS = "transactions" | ||
TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" | ||
OUTCOMES = "outcomes" | ||
OUTCOMES_BILLING = "outcomes-billing" | ||
EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" | ||
TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" | ||
GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" | ||
SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" | ||
METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" | ||
INGEST_EVENTS = "ingest-events" | ||
INGEST_ATTACHMENTS = "ingest-attachments" | ||
INGEST_TRANSACTIONS = "ingest-transactions" | ||
INGEST_METRICS = "ingest-metrics" | ||
INGEST_METRICS_DLQ = "ingest-metrics-dlq" | ||
SNUBA_METRICS = "snuba-metrics" | ||
PROFILES = "profiles" | ||
INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" | ||
INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" | ||
SNUBA_GENERIC_METRICS = "snuba-generic-metrics" | ||
INGEST_REPLAY_EVENTS = "ingest-replay-events" | ||
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" | ||
INGEST_OCCURRENCES = "ingest-occurrences" | ||
INGEST_MONITORS = "ingest-monitors" | ||
EVENTSTREAM_GENERIC = "generic-events" | ||
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" | ||
GROUP_ATTRIBUTES = "group-attributes" | ||
SHARED_RESOURCES_USAGE = "shared-resources-usage" | ||
SNUBA_SPANS = "snuba-spans" | ||
|
||
|
||
class TopicDefinition(TypedDict): | ||
cluster: str |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
from django.conf import settings | ||
|
||
from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition | ||
from sentry.conf.types.topic_definition import Topic | ||
from sentry.consumers.validate_schema import ValidateSchema | ||
from sentry.utils.imports import import_string | ||
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition | ||
|
@@ -163,109 +164,122 @@ def ingest_monitors_options() -> list[click.Option]: | |
] | ||
|
||
# consumer name -> consumer definition | ||
# XXX: default_topic is needed to lookup the schema even if the actual topic name has been | ||
# overridden. This is because the current topic override mechanism means the default topic name | ||
# is no longer available anywhere in code. We should probably fix this later so we don't need both | ||
# "topic" and "default_topic" here though. | ||
# XXX: topic_override will be deprecated here in favor of KAFKA_TOPIC_OVERRIDE once all overrides in | ||
# prod have been migrated | ||
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { | ||
"ingest-profiles": { | ||
"topic": settings.KAFKA_PROFILES, | ||
"topic": Topic.PROFILES, | ||
"topic_override": settings.KAFKA_PROFILES, | ||
lynnagara marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory", | ||
}, | ||
"ingest-replay-recordings": { | ||
"topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, | ||
"topic": Topic.EVENTS, | ||
"topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, | ||
"strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory", | ||
"click_options": ingest_replay_recordings_options(), | ||
}, | ||
"ingest-replay-recordings-buffered": { | ||
"topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, | ||
"topic": Topic.INGEST_REPLAYS_RECORDINGS, | ||
"topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS, | ||
"strategy_factory": "sentry.replays.consumers.recording_buffered.RecordingBufferedStrategyFactory", | ||
"click_options": ingest_replay_recordings_buffered_options(), | ||
}, | ||
"ingest-monitors": { | ||
"topic": settings.KAFKA_INGEST_MONITORS, | ||
"topic": Topic.INGEST_MONITORS, | ||
"topic_override": settings.KAFKA_INGEST_MONITORS, | ||
"strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory", | ||
"click_options": ingest_monitors_options(), | ||
}, | ||
"billing-metrics-consumer": { | ||
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS, | ||
"topic": Topic.SNUBA_GENERIC_METRICS, | ||
"topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, | ||
"strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory", | ||
}, | ||
# Known differences to 'sentry run occurrences-ingest-consumer': | ||
# - ingest_consumer_types metric tag is missing. Use the kafka_topic and | ||
# group_id tags provided by run_basic_consumer instead | ||
"ingest-occurrences": { | ||
"topic": settings.KAFKA_INGEST_OCCURRENCES, | ||
"topic": Topic.INGEST_OCCURRENCES, | ||
"topic_override": settings.KAFKA_INGEST_OCCURRENCES, | ||
"strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=20), | ||
}, | ||
"events-subscription-results": { | ||
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, | ||
"topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS, | ||
"topic_override": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS, | ||
}, | ||
}, | ||
"transactions-subscription-results": { | ||
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, | ||
"topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS, | ||
"topic_override": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS, | ||
}, | ||
}, | ||
"generic-metrics-subscription-results": { | ||
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"default_topic": "generic-metrics-subscription-results", | ||
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any specific reason this is the only topic being set via the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll give the same answer as above - two reasons:
|
||
"topic_override": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"validate_schema": True, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
}, | ||
}, | ||
"sessions-subscription-results": { | ||
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, | ||
"topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS, | ||
"topic_override": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(), | ||
"static_args": { | ||
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS, | ||
}, | ||
}, | ||
"metrics-subscription-results": { | ||
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS, | ||
"topic_override": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS, | ||
}, | ||
}, | ||
"ingest-events": { | ||
"topic": settings.KAFKA_INGEST_EVENTS, | ||
"topic": Topic.INGEST_EVENTS, | ||
"topic_override": settings.KAFKA_INGEST_EVENTS, | ||
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"consumer_type": "events", | ||
}, | ||
}, | ||
"ingest-attachments": { | ||
"topic": settings.KAFKA_INGEST_ATTACHMENTS, | ||
"topic": Topic.INGEST_ATTACHMENTS, | ||
"topic_override": settings.KAFKA_INGEST_ATTACHMENTS, | ||
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"consumer_type": "attachments", | ||
}, | ||
}, | ||
"ingest-transactions": { | ||
"topic": settings.KAFKA_INGEST_TRANSACTIONS, | ||
"topic": Topic.INGEST_TRANSACTIONS, | ||
"topic_override": settings.KAFKA_INGEST_TRANSACTIONS, | ||
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
"consumer_type": "transactions", | ||
}, | ||
}, | ||
"ingest-metrics": { | ||
"topic": settings.KAFKA_INGEST_METRICS, | ||
"topic": Topic.INGEST_METRICS, | ||
"topic_override": settings.KAFKA_INGEST_METRICS, | ||
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", | ||
"click_options": _METRICS_INDEXER_OPTIONS, | ||
"static_args": { | ||
|
@@ -276,7 +290,8 @@ def ingest_monitors_options() -> list[click.Option]: | |
"dlq_max_consecutive_count": 1000, | ||
}, | ||
"ingest-generic-metrics": { | ||
"topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS, | ||
"topic": Topic.INGEST_PERFORMANCE_METRICS, | ||
"topic_override": settings.KAFKA_INGEST_PERFORMANCE_METRICS, | ||
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory", | ||
"click_options": _METRICS_INDEXER_OPTIONS, | ||
"static_args": { | ||
|
@@ -287,46 +302,53 @@ def ingest_monitors_options() -> list[click.Option]: | |
"dlq_max_consecutive_count": 1000, | ||
}, | ||
"generic-metrics-last-seen-updater": { | ||
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS, | ||
"topic": Topic.SNUBA_GENERIC_METRICS, | ||
"topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS, | ||
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", | ||
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, | ||
"static_args": { | ||
"ingest_profile": "performance", | ||
}, | ||
}, | ||
"metrics-last-seen-updater": { | ||
"topic": settings.KAFKA_SNUBA_METRICS, | ||
"topic": Topic.SNUBA_METRICS, | ||
"topic_override": settings.KAFKA_SNUBA_METRICS, | ||
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory", | ||
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS, | ||
"static_args": { | ||
"ingest_profile": "release-health", | ||
}, | ||
}, | ||
"post-process-forwarder-issue-platform": { | ||
"topic": settings.KAFKA_EVENTSTREAM_GENERIC, | ||
"topic": Topic.EVENTSTREAM_GENERIC, | ||
"topic_override": settings.KAFKA_EVENTSTREAM_GENERIC, | ||
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", | ||
"synchronize_commit_log_topic_default": "snuba-generic-events-commit-log", | ||
"synchronize_commit_group_default": "generic_events_group", | ||
"click_options": _POST_PROCESS_FORWARDER_OPTIONS, | ||
}, | ||
"post-process-forwarder-transactions": { | ||
"topic": settings.KAFKA_TRANSACTIONS, | ||
"topic": Topic.TRANSACTIONS, | ||
"topic_override": settings.KAFKA_TRANSACTIONS, | ||
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", | ||
"synchronize_commit_log_topic_default": "snuba-transactions-commit-log", | ||
"synchronize_commit_group_default": "transactions_group", | ||
"click_options": _POST_PROCESS_FORWARDER_OPTIONS, | ||
}, | ||
"post-process-forwarder-errors": { | ||
"topic": settings.KAFKA_EVENTS, | ||
"topic": Topic.EVENTS, | ||
"topic_override": settings.KAFKA_EVENTS, | ||
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory", | ||
"synchronize_commit_log_topic_default": "snuba-commit-log", | ||
"synchronize_commit_group_default": "snuba-consumers", | ||
"click_options": _POST_PROCESS_FORWARDER_OPTIONS, | ||
}, | ||
"process-spans": { | ||
"topic": settings.KAFKA_SNUBA_SPANS, | ||
"topic": Topic.SNUBA_SPANS, | ||
"topic_override": settings.KAFKA_SNUBA_SPANS, | ||
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", | ||
}, | ||
# TODO: This has to be co-ordinated with getsentry | ||
**settings.SENTRY_KAFKA_CONSUMERS, | ||
} | ||
|
||
|
@@ -396,6 +418,7 @@ def get_stream_processor( | |
|
||
from sentry.utils import kafka_config | ||
|
||
# Still supported for now until we have moved everything in prod over | ||
topic_def = settings.KAFKA_TOPICS[logical_topic] | ||
assert topic_def is not None | ||
if cluster is None: | ||
|
@@ -461,8 +484,8 @@ def build_consumer_config(group_id: str): | |
"--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" | ||
) | ||
|
||
# Validate schema if "default_topic" is set | ||
default_topic = consumer_definition.get("default_topic") | ||
# Validate schema if "validate_schema" is set | ||
default_topic = consumer_definition.get("validate_schema") | ||
if default_topic: | ||
strategy_factory = ValidateSchemaStrategyFactoryWrapper( | ||
default_topic, validate_schema, strategy_factory | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved into #66078