Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Rationalize kafka config - take 2 #66242

Merged
merged 3 commits into from Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 53 additions & 2 deletions src/sentry/conf/server.py
Expand Up @@ -17,7 +17,7 @@
from urllib.parse import urlparse

import sentry
from sentry.conf.types.consumer_definition import ConsumerDefinition
from sentry.conf.types.kafka_definition import ConsumerDefinition
from sentry.conf.types.logging_config import LoggingConfig
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
Expand Down Expand Up @@ -3443,6 +3443,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
# `prod.py`, also override the entirety of `KAFKA_TOPICS` to ensure the keys
# pick up the change.

# START DEPRECATED SECTION
KAFKA_EVENTS = "events"
KAFKA_EVENTS_COMMIT_LOG = "snuba-commit-log"
KAFKA_TRANSACTIONS = "transactions"
Expand Down Expand Up @@ -3477,8 +3478,59 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]

# spans
KAFKA_SNUBA_SPANS = "snuba-spans"
# END DEPRECATED SECTION


# Mapping of default Kafka topic name to custom names
KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {
# TODO: This is temporary while we migrate between the old and new way of defining overrides.
# To be removed once this is defined in prod, along with KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
# variable which will no longer be needed
"generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
}


# Mapping of default Kafka topic name to cluster name
# as per KAFKA_CLUSTERS.
# This must be the default name that matches the topic
# in sentry.conf.types.kafka_definition and sentry-kafka-schemas
# and not any environment-specific override value
KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = {
"events": "default",
"ingest-events-dlq": "default",
"snuba-commit-log": "default",
"transactions": "default",
"snuba-transactions-commit-log": "default",
"outcomes": "default",
"outcomes-billing": "default",
"events-subscription-results": "default",
"transactions-subscription-results": "default",
"generic-metrics-subscription-results": "default",
"sessions-subscription-results": "default",
"metrics-subscription-results": "default",
"ingest-events": "default",
"ingest-attachments": "default",
"ingest-transactions": "default",
"ingest-metrics": "default",
"ingest-metrics-dlq": "default",
"snuba-metrics": "default",
"profiles": "default",
"ingest-performance-metrics": "default",
"ingest-generic-metrics-dlq": "default",
"snuba-generic-metrics": "default",
"ingest-replay-events": "default",
"ingest-replay-recordings": "default",
"ingest-occurrences": "default",
"ingest-monitors": "default",
"generic-events": "default",
"snuba-generic-events-commit-log": "default",
"group-attributes": "default",
"snuba-spans": "default",
"shared-resources-usage": "default",
}

# Cluster configuration for each Kafka topic by name.
# DEPRECATED
KAFKA_TOPICS: Mapping[str, TopicDefinition] = {
KAFKA_EVENTS: {"cluster": "default"},
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
Expand Down Expand Up @@ -3521,7 +3573,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"},
}


# If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be
# single-threaded under the hood for performance
KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False
Expand Down
39 changes: 0 additions & 39 deletions src/sentry/conf/types/consumer_definition.py

This file was deleted.

85 changes: 85 additions & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -0,0 +1,85 @@
from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from enum import Enum
from typing import Any, Required, TypedDict

import click


class Topic(Enum):
"""
These are the default topic names used by Sentry. They must match
the registered values in sentry-kafka-schemas.
"""

EVENTS = "events"
EVENTS_COMMIT_LOG = "snuba-commit-log"
TRANSACTIONS = "transactions"
TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log"
OUTCOMES = "outcomes"
OUTCOMES_BILLING = "outcomes-billing"
EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results"
SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results"
METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results"
INGEST_EVENTS = "ingest-events"
INGEST_EVENTS_DLQ = "ingest-events-dlq"
INGEST_ATTACHMENTS = "ingest-attachments"
INGEST_TRANSACTIONS = "ingest-transactions"
INGEST_METRICS = "ingest-metrics"
INGEST_METRICS_DLQ = "ingest-metrics-dlq"
SNUBA_METRICS = "snuba-metrics"
PROFILES = "profiles"
INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq"
SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
INGEST_REPLAY_EVENTS = "ingest-replay-events"
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
INGEST_OCCURRENCES = "ingest-occurrences"
INGEST_MONITORS = "ingest-monitors"
EVENTSTREAM_GENERIC = "generic-events"
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"


class ConsumerDefinition(TypedDict, total=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this actually supposed to be total=False? all keys are optional?

I usually find for dicts where the are some required keys that it's easier to understand if the not-require ones are marked NotRequired rather than total=False + Required

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure, i'm somewhat reluctant to touch this code in this PR. it's just copy + paste


# XXX: Eventually only Topic will be accepted here.
# For backward compatibility with getsentry, we must also
# support the physical override topic name (str, Callable[str], str)
# while the migration is taking place
topic: Required[Topic | str | Callable[[], str]]

# Schema validation will be run if true
validate_schema: bool | None

strategy_factory: Required[str]

# Additional CLI options the consumer should accept. These arguments are
# passed as kwargs to the strategy_factory.
click_options: Sequence[click.Option]

# Hardcoded additional kwargs for strategy_factory
static_args: Mapping[str, Any]

require_synchronization: bool
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str

dlq_topic: str
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None
Comment on lines +74 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these actually nullable or are they meant to be NotRequired

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think nullable is right here



def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
if "dlq_topic" not in consumer_definition and (
"dlq_max_invalid_ratio" in consumer_definition
or "dlq_max_consecutive_count" in consumer_definition
):
raise ValueError(
"Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not"
)
Comment on lines +78 to +85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, the intention of the types modules is that they are just types -- this kind of goes against it by having logic here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just copy + paste

69 changes: 42 additions & 27 deletions src/sentry/consumers/__init__.py
Expand Up @@ -13,7 +13,11 @@
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from django.conf import settings

from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition
from sentry.conf.types.kafka_definition import (
ConsumerDefinition,
Topic,
validate_consumer_definition,
)
from sentry.consumers.validate_schema import ValidateSchema
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand Down Expand Up @@ -184,10 +188,8 @@ def ingest_events_options() -> list[click.Option]:
]

# consumer name -> consumer definition
# XXX: default_topic is needed to lookup the schema even if the actual topic name has been
# overridden. This is because the current topic override mechanism means the default topic name
# is no longer available anywhere in code. We should probably fix this later so we don't need both
# "topic" and "default_topic" here though.
# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden
# string. We support both currently for backward compatibility.
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
"ingest-profiles": {
"topic": settings.KAFKA_PROFILES,
Expand Down Expand Up @@ -237,8 +239,8 @@ def ingest_events_options() -> list[click.Option]:
},
},
"generic-metrics-subscription-results": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"default_topic": "generic-metrics-subscription-results",
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
Expand Down Expand Up @@ -378,6 +380,14 @@ def get_stream_processor(
validate_schema: bool = False,
group_instance_id: str | None = None,
) -> StreamProcessor:
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.commit import ONCE_PER_SECOND
from arroyo.types import Topic as ArroyoTopic
from django.conf import settings

from sentry.utils import kafka_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot of these imports can be removed entirely -- or moved to the top of the file (some are already there)

Copy link
Member Author

@lynnagara lynnagara Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved up the arroyo imports, gonna leave the rest as-is


try:
consumer_definition = KAFKA_CONSUMERS[consumer_name]
except KeyError:
Expand All @@ -394,12 +404,19 @@ def get_stream_processor(
) from e

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
logical_topic = consumer_definition["topic"]
if not isinstance(logical_topic, str):
logical_topic = logical_topic()
consumer_topic = consumer_definition["topic"]
if isinstance(consumer_topic, Topic):
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)
else:
# TODO: Deprecated, remove once this way is no longer used
if not isinstance(consumer_topic, str):
real_topic = consumer_topic()
else:
real_topic = consumer_topic

if topic is None:
topic = logical_topic
topic = real_topic

cmd = click.Command(
name=consumer_name, params=list(consumer_definition.get("click_options") or ())
Expand All @@ -409,16 +426,9 @@ def get_stream_processor(
strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {}
)

from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.commit import ONCE_PER_SECOND
from arroyo.types import Topic
from django.conf import settings

from sentry.utils import kafka_config

topic_def = settings.KAFKA_TOPICS[logical_topic]
topic_def = settings.KAFKA_TOPICS[real_topic]
assert topic_def is not None

if cluster is None:
cluster = topic_def["cluster"]

Expand Down Expand Up @@ -474,19 +484,24 @@ def build_consumer_config(group_id: str):
consumer = SynchronizedConsumer(
consumer=consumer,
commit_log_consumer=commit_log_consumer,
commit_log_topic=Topic(synchronize_commit_log_topic),
commit_log_topic=ArroyoTopic(synchronize_commit_log_topic),
commit_log_groups={synchronize_commit_group},
)
elif consumer_definition.get("require_synchronization"):
click.BadParameter(
"--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer"
)

# Validate schema if "default_topic" is set
default_topic = consumer_definition.get("default_topic")
if default_topic:
# Validate schema if "validate_schema" is set
validate_schema = consumer_definition.get("validate_schema") or False

if validate_schema:
# TODO: Remove this later but for now we can only validate if `topic_def` is
# the logical topic and not the legacy override topic
assert isinstance(consumer_topic, Topic)

strategy_factory = ValidateSchemaStrategyFactoryWrapper(
default_topic, validate_schema, strategy_factory
consumer_topic.value, validate_schema, strategy_factory
)

if healthcheck_file_path is not None:
Expand All @@ -512,7 +527,7 @@ def build_consumer_config(group_id: str):
dlq_producer = KafkaProducer(producer_config)

dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, Topic(dlq_topic)),
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)),
DlqLimit(
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
Expand All @@ -524,7 +539,7 @@ def build_consumer_config(group_id: str):

return StreamProcessor(
consumer=consumer,
topic=Topic(topic),
topic=ArroyoTopic(topic),
processor_factory=strategy_factory,
commit_policy=ONCE_PER_SECOND,
join_timeout=join_timeout,
Expand Down
9 changes: 6 additions & 3 deletions src/sentry/runner/commands/devserver.py
Expand Up @@ -364,11 +364,14 @@ def devserver(
"""
)

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.batching_kafka_consumer import create_topics

for topic_name, topic_data in settings.KAFKA_TOPICS.items():
if topic_data is not None:
create_topics(topic_data["cluster"], [topic_name])
for topic in Topic:
default_name = topic.value
physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name)
cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name]
create_topics(cluster_name, [physical_name])

if dev_consumer:
daemons.append(
Expand Down
27 changes: 0 additions & 27 deletions tests/sentry/conf/test_consumer_definitions.py

This file was deleted.