New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref: Rationalize kafka config - take 2 #66242
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from __future__ import annotations | ||
|
||
from collections.abc import Callable, Mapping, Sequence | ||
from enum import Enum | ||
from typing import Any, Required, TypedDict | ||
|
||
import click | ||
|
||
|
||
class Topic(Enum): | ||
""" | ||
These are the default topic names used by Sentry. They must match | ||
the registered values in sentry-kafka-schemas. | ||
""" | ||
|
||
EVENTS = "events" | ||
EVENTS_COMMIT_LOG = "snuba-commit-log" | ||
TRANSACTIONS = "transactions" | ||
TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log" | ||
OUTCOMES = "outcomes" | ||
OUTCOMES_BILLING = "outcomes-billing" | ||
EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results" | ||
TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results" | ||
GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results" | ||
SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results" | ||
METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results" | ||
INGEST_EVENTS = "ingest-events" | ||
INGEST_EVENTS_DLQ = "ingest-events-dlq" | ||
INGEST_ATTACHMENTS = "ingest-attachments" | ||
INGEST_TRANSACTIONS = "ingest-transactions" | ||
INGEST_METRICS = "ingest-metrics" | ||
INGEST_METRICS_DLQ = "ingest-metrics-dlq" | ||
SNUBA_METRICS = "snuba-metrics" | ||
PROFILES = "profiles" | ||
INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics" | ||
INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq" | ||
SNUBA_GENERIC_METRICS = "snuba-generic-metrics" | ||
INGEST_REPLAY_EVENTS = "ingest-replay-events" | ||
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" | ||
INGEST_OCCURRENCES = "ingest-occurrences" | ||
INGEST_MONITORS = "ingest-monitors" | ||
EVENTSTREAM_GENERIC = "generic-events" | ||
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log" | ||
GROUP_ATTRIBUTES = "group-attributes" | ||
SHARED_RESOURCES_USAGE = "shared-resources-usage" | ||
SNUBA_SPANS = "snuba-spans" | ||
|
||
|
||
class ConsumerDefinition(TypedDict, total=False): | ||
|
||
# XXX: Eventually only Topic will be accepted here. | ||
# For backward compatibility with getsentry, we must also | ||
# support the physical override topic name (str, Callable[str], str) | ||
# while the migration is taking place | ||
topic: Required[Topic | str | Callable[[], str]] | ||
|
||
# Schema validation will be run if true | ||
validate_schema: bool | None | ||
|
||
strategy_factory: Required[str] | ||
|
||
# Additional CLI options the consumer should accept. These arguments are | ||
# passed as kwargs to the strategy_factory. | ||
click_options: Sequence[click.Option] | ||
|
||
# Hardcoded additional kwargs for strategy_factory | ||
static_args: Mapping[str, Any] | ||
|
||
require_synchronization: bool | ||
synchronize_commit_group_default: str | ||
synchronize_commit_log_topic_default: str | ||
|
||
dlq_topic: str | ||
dlq_max_invalid_ratio: float | None | ||
dlq_max_consecutive_count: int | None | ||
Comment on lines
+74
to
+75
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are these actually nullable or are they meant to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think nullable is right here |
||
|
||
|
||
def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None: | ||
if "dlq_topic" not in consumer_definition and ( | ||
"dlq_max_invalid_ratio" in consumer_definition | ||
or "dlq_max_consecutive_count" in consumer_definition | ||
): | ||
raise ValueError( | ||
"Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not" | ||
) | ||
Comment on lines
+78
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, the intention of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just copy + paste |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,11 @@ | |
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory | ||
from django.conf import settings | ||
|
||
from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition | ||
from sentry.conf.types.kafka_definition import ( | ||
ConsumerDefinition, | ||
Topic, | ||
validate_consumer_definition, | ||
) | ||
from sentry.consumers.validate_schema import ValidateSchema | ||
from sentry.utils.imports import import_string | ||
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition | ||
|
@@ -184,10 +188,8 @@ def ingest_events_options() -> list[click.Option]: | |
] | ||
|
||
# consumer name -> consumer definition | ||
# XXX: default_topic is needed to lookup the schema even if the actual topic name has been | ||
# overridden. This is because the current topic override mechanism means the default topic name | ||
# is no longer available anywhere in code. We should probably fix this later so we don't need both | ||
# "topic" and "default_topic" here though. | ||
# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden | ||
# string. We support both currently for backward compatibility. | ||
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = { | ||
"ingest-profiles": { | ||
"topic": settings.KAFKA_PROFILES, | ||
|
@@ -237,8 +239,8 @@ def ingest_events_options() -> list[click.Option]: | |
}, | ||
}, | ||
"generic-metrics-subscription-results": { | ||
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"default_topic": "generic-metrics-subscription-results", | ||
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS, | ||
"validate_schema": True, | ||
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory", | ||
"click_options": multiprocessing_options(default_max_batch_size=100), | ||
"static_args": { | ||
|
@@ -378,6 +380,14 @@ def get_stream_processor( | |
validate_schema: bool = False, | ||
group_instance_id: str | None = None, | ||
) -> StreamProcessor: | ||
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration | ||
from arroyo.backends.kafka.consumer import KafkaConsumer | ||
from arroyo.commit import ONCE_PER_SECOND | ||
from arroyo.types import Topic as ArroyoTopic | ||
from django.conf import settings | ||
|
||
from sentry.utils import kafka_config | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a lot of these imports can be removed entirely -- or moved to the top of the file (some are already there) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved up the arroyo imports, gonna leave the rest as-is |
||
|
||
try: | ||
consumer_definition = KAFKA_CONSUMERS[consumer_name] | ||
except KeyError: | ||
|
@@ -394,12 +404,19 @@ def get_stream_processor( | |
) from e | ||
|
||
strategy_factory_cls = import_string(consumer_definition["strategy_factory"]) | ||
logical_topic = consumer_definition["topic"] | ||
if not isinstance(logical_topic, str): | ||
logical_topic = logical_topic() | ||
consumer_topic = consumer_definition["topic"] | ||
if isinstance(consumer_topic, Topic): | ||
default_topic = consumer_topic.value | ||
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic) | ||
else: | ||
# TODO: Deprecated, remove once this way is no longer used | ||
if not isinstance(consumer_topic, str): | ||
real_topic = consumer_topic() | ||
else: | ||
real_topic = consumer_topic | ||
|
||
if topic is None: | ||
topic = logical_topic | ||
topic = real_topic | ||
|
||
cmd = click.Command( | ||
name=consumer_name, params=list(consumer_definition.get("click_options") or ()) | ||
|
@@ -409,16 +426,9 @@ def get_stream_processor( | |
strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {} | ||
) | ||
|
||
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration | ||
from arroyo.backends.kafka.consumer import KafkaConsumer | ||
from arroyo.commit import ONCE_PER_SECOND | ||
from arroyo.types import Topic | ||
from django.conf import settings | ||
|
||
from sentry.utils import kafka_config | ||
|
||
topic_def = settings.KAFKA_TOPICS[logical_topic] | ||
topic_def = settings.KAFKA_TOPICS[real_topic] | ||
assert topic_def is not None | ||
|
||
if cluster is None: | ||
cluster = topic_def["cluster"] | ||
|
||
|
@@ -474,19 +484,24 @@ def build_consumer_config(group_id: str): | |
consumer = SynchronizedConsumer( | ||
consumer=consumer, | ||
commit_log_consumer=commit_log_consumer, | ||
commit_log_topic=Topic(synchronize_commit_log_topic), | ||
commit_log_topic=ArroyoTopic(synchronize_commit_log_topic), | ||
commit_log_groups={synchronize_commit_group}, | ||
) | ||
elif consumer_definition.get("require_synchronization"): | ||
click.BadParameter( | ||
"--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" | ||
) | ||
|
||
# Validate schema if "default_topic" is set | ||
default_topic = consumer_definition.get("default_topic") | ||
if default_topic: | ||
# Validate schema if "validate_schema" is set | ||
validate_schema = consumer_definition.get("validate_schema") or False | ||
|
||
if validate_schema: | ||
# TODO: Remove this later but for now we can only validate if `topic_def` is | ||
# the logical topic and not the legacy override topic | ||
assert isinstance(consumer_topic, Topic) | ||
|
||
strategy_factory = ValidateSchemaStrategyFactoryWrapper( | ||
default_topic, validate_schema, strategy_factory | ||
consumer_topic.value, validate_schema, strategy_factory | ||
) | ||
|
||
if healthcheck_file_path is not None: | ||
|
@@ -512,7 +527,7 @@ def build_consumer_config(group_id: str): | |
dlq_producer = KafkaProducer(producer_config) | ||
|
||
dlq_policy = DlqPolicy( | ||
KafkaDlqProducer(dlq_producer, Topic(dlq_topic)), | ||
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)), | ||
DlqLimit( | ||
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"], | ||
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"], | ||
|
@@ -524,7 +539,7 @@ def build_consumer_config(group_id: str): | |
|
||
return StreamProcessor( | ||
consumer=consumer, | ||
topic=Topic(topic), | ||
topic=ArroyoTopic(topic), | ||
processor_factory=strategy_factory, | ||
commit_policy=ONCE_PER_SECOND, | ||
join_timeout=join_timeout, | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this actually supposed to be
total=False
? all keys are optional?I usually find for dicts where the are some required keys that it's easier to understand if the not-require ones are marked
NotRequired
rather thantotal=False
+Required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not sure, i'm somewhat reluctant to touch this code in this PR. it's just copy + paste