Skip to content

Commit

Permalink
ref: Rationalize kafka config - take 2 (#66242)
Browse files Browse the repository at this point in the history
Redo of #65793 with fixed
imports
  • Loading branch information
lynnagara committed Mar 4, 2024
1 parent faed411 commit 2ad12d6
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 100 deletions.
55 changes: 53 additions & 2 deletions src/sentry/conf/server.py
Expand Up @@ -17,7 +17,7 @@
from urllib.parse import urlparse

import sentry
from sentry.conf.types.consumer_definition import ConsumerDefinition
from sentry.conf.types.kafka_definition import ConsumerDefinition
from sentry.conf.types.logging_config import LoggingConfig
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
Expand Down Expand Up @@ -3443,6 +3443,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
# `prod.py`, also override the entirety of `KAFKA_TOPICS` to ensure the keys
# pick up the change.

# START DEPRECATED SECTION
KAFKA_EVENTS = "events"
KAFKA_EVENTS_COMMIT_LOG = "snuba-commit-log"
KAFKA_TRANSACTIONS = "transactions"
Expand Down Expand Up @@ -3477,8 +3478,59 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]

# spans
KAFKA_SNUBA_SPANS = "snuba-spans"
# END DEPRECATED SECTION


# Mapping of default Kafka topic name to custom names
KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {
# TODO: This is temporary while we migrate between the old and new way of defining overrides.
# To be removed once this is defined in prod, along with KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
# variable which will no longer be needed
"generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
}


# Mapping of default Kafka topic name to cluster name
# as per KAFKA_CLUSTERS.
# This must be the default name that matches the topic
# in sentry.conf.types.kafka_definition and sentry-kafka-schemas
# and not any environment-specific override value
KAFKA_TOPIC_TO_CLUSTER: Mapping[str, str] = {
"events": "default",
"ingest-events-dlq": "default",
"snuba-commit-log": "default",
"transactions": "default",
"snuba-transactions-commit-log": "default",
"outcomes": "default",
"outcomes-billing": "default",
"events-subscription-results": "default",
"transactions-subscription-results": "default",
"generic-metrics-subscription-results": "default",
"sessions-subscription-results": "default",
"metrics-subscription-results": "default",
"ingest-events": "default",
"ingest-attachments": "default",
"ingest-transactions": "default",
"ingest-metrics": "default",
"ingest-metrics-dlq": "default",
"snuba-metrics": "default",
"profiles": "default",
"ingest-performance-metrics": "default",
"ingest-generic-metrics-dlq": "default",
"snuba-generic-metrics": "default",
"ingest-replay-events": "default",
"ingest-replay-recordings": "default",
"ingest-occurrences": "default",
"ingest-monitors": "default",
"generic-events": "default",
"snuba-generic-events-commit-log": "default",
"group-attributes": "default",
"snuba-spans": "default",
"shared-resources-usage": "default",
}

# Cluster configuration for each Kafka topic by name.
# DEPRECATED
KAFKA_TOPICS: Mapping[str, TopicDefinition] = {
KAFKA_EVENTS: {"cluster": "default"},
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
Expand Down Expand Up @@ -3521,7 +3573,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"},
}


# If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be
# single-threaded under the hood for performance
KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False
Expand Down
39 changes: 0 additions & 39 deletions src/sentry/conf/types/consumer_definition.py

This file was deleted.

85 changes: 85 additions & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -0,0 +1,85 @@
from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from enum import Enum
from typing import Any, Required, TypedDict

import click


class Topic(Enum):
"""
These are the default topic names used by Sentry. They must match
the registered values in sentry-kafka-schemas.
"""

EVENTS = "events"
EVENTS_COMMIT_LOG = "snuba-commit-log"
TRANSACTIONS = "transactions"
TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log"
OUTCOMES = "outcomes"
OUTCOMES_BILLING = "outcomes-billing"
EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results"
SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results"
METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results"
INGEST_EVENTS = "ingest-events"
INGEST_EVENTS_DLQ = "ingest-events-dlq"
INGEST_ATTACHMENTS = "ingest-attachments"
INGEST_TRANSACTIONS = "ingest-transactions"
INGEST_METRICS = "ingest-metrics"
INGEST_METRICS_DLQ = "ingest-metrics-dlq"
SNUBA_METRICS = "snuba-metrics"
PROFILES = "profiles"
INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq"
SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
INGEST_REPLAY_EVENTS = "ingest-replay-events"
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
INGEST_OCCURRENCES = "ingest-occurrences"
INGEST_MONITORS = "ingest-monitors"
EVENTSTREAM_GENERIC = "generic-events"
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"


class ConsumerDefinition(TypedDict, total=False):

# XXX: Eventually only Topic will be accepted here.
# For backward compatibility with getsentry, we must also
# support the physical override topic name (str, Callable[str], str)
# while the migration is taking place
topic: Required[Topic | str | Callable[[], str]]

# Schema validation will be run if true
validate_schema: bool | None

strategy_factory: Required[str]

# Additional CLI options the consumer should accept. These arguments are
# passed as kwargs to the strategy_factory.
click_options: Sequence[click.Option]

# Hardcoded additional kwargs for strategy_factory
static_args: Mapping[str, Any]

require_synchronization: bool
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str

dlq_topic: str
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None


def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
if "dlq_topic" not in consumer_definition and (
"dlq_max_invalid_ratio" in consumer_definition
or "dlq_max_consecutive_count" in consumer_definition
):
raise ValueError(
"Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not"
)
69 changes: 42 additions & 27 deletions src/sentry/consumers/__init__.py
Expand Up @@ -7,13 +7,21 @@
import click
from arroyo.backends.abstract import Consumer
from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.commit import ONCE_PER_SECOND
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
from arroyo.processing.processor import StreamProcessor
from arroyo.processing.strategies import Healthcheck
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.types import Topic as ArroyoTopic
from django.conf import settings

from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition
from sentry.conf.types.kafka_definition import (
ConsumerDefinition,
Topic,
validate_consumer_definition,
)
from sentry.consumers.validate_schema import ValidateSchema
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand Down Expand Up @@ -184,10 +192,8 @@ def ingest_events_options() -> list[click.Option]:
]

# consumer name -> consumer definition
# XXX: default_topic is needed to lookup the schema even if the actual topic name has been
# overridden. This is because the current topic override mechanism means the default topic name
# is no longer available anywhere in code. We should probably fix this later so we don't need both
# "topic" and "default_topic" here though.
# TODO: `topic` should gradually be migrated to the logical topic rather than the overridden
# string. We support both currently for backward compatibility.
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
"ingest-profiles": {
"topic": settings.KAFKA_PROFILES,
Expand Down Expand Up @@ -237,8 +243,8 @@ def ingest_events_options() -> list[click.Option]:
},
},
"generic-metrics-subscription-results": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"default_topic": "generic-metrics-subscription-results",
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
Expand Down Expand Up @@ -378,6 +384,10 @@ def get_stream_processor(
validate_schema: bool = False,
group_instance_id: str | None = None,
) -> StreamProcessor:
from django.conf import settings

from sentry.utils import kafka_config

try:
consumer_definition = KAFKA_CONSUMERS[consumer_name]
except KeyError:
Expand All @@ -394,12 +404,19 @@ def get_stream_processor(
) from e

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
logical_topic = consumer_definition["topic"]
if not isinstance(logical_topic, str):
logical_topic = logical_topic()
consumer_topic = consumer_definition["topic"]
if isinstance(consumer_topic, Topic):
default_topic = consumer_topic.value
real_topic = settings.KAFKA_TOPIC_OVERRIDES.get(default_topic, default_topic)
else:
# TODO: Deprecated, remove once this way is no longer used
if not isinstance(consumer_topic, str):
real_topic = consumer_topic()
else:
real_topic = consumer_topic

if topic is None:
topic = logical_topic
topic = real_topic

cmd = click.Command(
name=consumer_name, params=list(consumer_definition.get("click_options") or ())
Expand All @@ -409,16 +426,9 @@ def get_stream_processor(
strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {}
)

from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.commit import ONCE_PER_SECOND
from arroyo.types import Topic
from django.conf import settings

from sentry.utils import kafka_config

topic_def = settings.KAFKA_TOPICS[logical_topic]
topic_def = settings.KAFKA_TOPICS[real_topic]
assert topic_def is not None

if cluster is None:
cluster = topic_def["cluster"]

Expand Down Expand Up @@ -474,19 +484,24 @@ def build_consumer_config(group_id: str):
consumer = SynchronizedConsumer(
consumer=consumer,
commit_log_consumer=commit_log_consumer,
commit_log_topic=Topic(synchronize_commit_log_topic),
commit_log_topic=ArroyoTopic(synchronize_commit_log_topic),
commit_log_groups={synchronize_commit_group},
)
elif consumer_definition.get("require_synchronization"):
click.BadParameter(
"--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer"
)

# Validate schema if "default_topic" is set
default_topic = consumer_definition.get("default_topic")
if default_topic:
# Validate schema if "validate_schema" is set
validate_schema = consumer_definition.get("validate_schema") or False

if validate_schema:
# TODO: Remove this later but for now we can only validate if `topic_def` is
# the logical topic and not the legacy override topic
assert isinstance(consumer_topic, Topic)

strategy_factory = ValidateSchemaStrategyFactoryWrapper(
default_topic, validate_schema, strategy_factory
consumer_topic.value, validate_schema, strategy_factory
)

if healthcheck_file_path is not None:
Expand All @@ -512,7 +527,7 @@ def build_consumer_config(group_id: str):
dlq_producer = KafkaProducer(producer_config)

dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, Topic(dlq_topic)),
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)),
DlqLimit(
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
Expand All @@ -524,7 +539,7 @@ def build_consumer_config(group_id: str):

return StreamProcessor(
consumer=consumer,
topic=Topic(topic),
topic=ArroyoTopic(topic),
processor_factory=strategy_factory,
commit_policy=ONCE_PER_SECOND,
join_timeout=join_timeout,
Expand Down
9 changes: 6 additions & 3 deletions src/sentry/runner/commands/devserver.py
Expand Up @@ -364,11 +364,14 @@ def devserver(
"""
)

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.batching_kafka_consumer import create_topics

for topic_name, topic_data in settings.KAFKA_TOPICS.items():
if topic_data is not None:
create_topics(topic_data["cluster"], [topic_name])
for topic in Topic:
default_name = topic.value
physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name)
cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name]
create_topics(cluster_name, [physical_name])

if dev_consumer:
daemons.append(
Expand Down

0 comments on commit 2ad12d6

Please sign in to comment.