ref: Move code to new-style kafka topics - take 2 (#66381)
Brings back #66283 with fixes to the indexer strategy. It crashed on the prior deploy because we were not producing to the correctly resolved topic.
lynnagara committed Mar 6, 2024
1 parent bf0ebfe commit 6fa756e
Showing 18 changed files with 84 additions and 70 deletions.
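Across these files the change follows one pattern: call sites stop passing raw `settings.KAFKA_*` topic-name strings and instead pass members of the `Topic` enum from `sentry.conf.types.kafka_definition`, which `get_topic_definition` resolves to a cluster and a physical topic name. Per the commit message, the previous attempt crashed because a producer was not handed the resolved name; the indexer change below (`multiprocess.py`) is the specific fix. A minimal sketch of the resolution idea, with an illustrative enum subset and hypothetical setting values (the real enum and settings cover every topic):

```python
from enum import Enum


class Topic(Enum):
    # Illustrative subset of sentry.conf.types.kafka_definition.Topic.
    EVENTS = "events"
    SNUBA_GENERIC_METRICS = "snuba-generic-metrics"


# Hypothetical settings: the enum value is the default (logical) name;
# a deployment may remap it to a different physical topic.
KAFKA_TOPIC_TO_CLUSTER = {"events": "default", "snuba-generic-metrics": "generic-metrics"}
KAFKA_TOPIC_OVERRIDES = {"snuba-generic-metrics": "snuba-generic-metrics-us"}


def get_topic_definition(topic: Topic) -> dict[str, str]:
    # Mirrors the new implementation in src/sentry/utils/kafka_config.py below.
    return {
        "cluster": KAFKA_TOPIC_TO_CLUSTER[topic.value],
        "real_topic_name": KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value),
    }


assert get_topic_definition(Topic.EVENTS)["real_topic_name"] == "events"
assert get_topic_definition(Topic.SNUBA_GENERIC_METRICS)["real_topic_name"] == "snuba-generic-metrics-us"
```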
src/sentry/conf/server.py (8 additions, 2 deletions)
@@ -21,7 +21,6 @@
 from sentry.conf.types.logging_config import LoggingConfig
 from sentry.conf.types.role_dict import RoleDict
 from sentry.conf.types.sdk_config import ServerSdkConfig
-from sentry.conf.types.topic_definition import TopicDefinition
 from sentry.utils import json  # NOQA (used in getsentry config)
 from sentry.utils.celery import crontab_with_minute_jitter
 from sentry.utils.types import Type, type_from_value
@@ -3529,9 +3528,16 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
     "shared-resources-usage": "default",
 }
 
+from typing import TypedDict
+
+
+class LegacyTopicDefinition(TypedDict):
+    cluster: str
+
 
 # Cluster configuration for each Kafka topic by name.
+# DEPRECATED
-KAFKA_TOPICS: Mapping[str, TopicDefinition] = {
+KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = {
     KAFKA_EVENTS: {"cluster": "default"},
     KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
     KAFKA_TRANSACTIONS: {"cluster": "default"},
src/sentry/conf/types/kafka_definition.py (1 addition, 1 deletion)
@@ -70,7 +70,7 @@ class ConsumerDefinition(TypedDict, total=False):
     synchronize_commit_group_default: str
     synchronize_commit_log_topic_default: str
 
-    dlq_topic: str
+    dlq_topic: Topic
     dlq_max_invalid_ratio: float | None
     dlq_max_consecutive_count: int | None
 
src/sentry/conf/types/topic_definition.py (2 additions, 0 deletions)
@@ -5,3 +5,5 @@
 
 class TopicDefinition(TypedDict):
     cluster: str
+    # The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES
+    real_topic_name: str
src/sentry/consumers/__init__.py (5 additions, 4 deletions)
@@ -298,7 +298,7 @@ def ingest_events_options() -> list[click.Option]:
         "static_args": {
             "ingest_profile": "release-health",
         },
-        "dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ,
+        "dlq_topic": Topic.INGEST_METRICS_DLQ,
         "dlq_max_invalid_ratio": 0.01,
         "dlq_max_consecutive_count": 1000,
     },
@@ -309,7 +309,7 @@ def ingest_events_options() -> list[click.Option]:
         "static_args": {
             "ingest_profile": "performance",
         },
-        "dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ,
+        "dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ,
         "dlq_max_invalid_ratio": 0.01,
         "dlq_max_consecutive_count": 1000,
     },
@@ -517,7 +517,8 @@ def build_consumer_config(group_id: str):
             f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
         ) from e
     try:
-        cluster_setting = get_topic_definition(dlq_topic)["cluster"]
+        dlq_topic_defn = get_topic_definition(dlq_topic)
+        cluster_setting = dlq_topic_defn["cluster"]
     except ValueError as e:
         raise click.BadParameter(
             f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
@@ -527,7 +528,7 @@ def build_consumer_config(group_id: str):
     dlq_producer = KafkaProducer(producer_config)
 
     dlq_policy = DlqPolicy(
-        KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)),
+        KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
         DlqLimit(
             max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
             max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
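With consumer definitions now carrying `Topic` enum members, the DLQ producer must be handed the resolved physical name rather than the enum value's default. A small sketch of the hazard, using arroyo's `Topic` wrapper and a hypothetical override setting (the stand-in enum is illustrative):

```python
from enum import Enum

from arroyo import Topic as ArroyoTopic


class Topic(Enum):  # stand-in for sentry.conf.types.kafka_definition.Topic
    INGEST_METRICS_DLQ = "ingest-metrics-dlq"


KAFKA_TOPIC_OVERRIDES = {"ingest-metrics-dlq": "ingest-metrics-dlq-us"}  # hypothetical

# Wrong: wraps the default name, ignoring the deployment's override.
bad = ArroyoTopic(Topic.INGEST_METRICS_DLQ.value)

# Right: resolve first, as build_consumer_config now does via dlq_topic_defn.
name = Topic.INGEST_METRICS_DLQ.value
good = ArroyoTopic(KAFKA_TOPIC_OVERRIDES.get(name, name))
assert good.name == "ingest-metrics-dlq-us"
```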
src/sentry/eventstream/kafka/backend.py (10 additions, 8 deletions)
@@ -7,9 +7,9 @@
 from confluent_kafka import KafkaError
 from confluent_kafka import Message as KafkaMessage
 from confluent_kafka import Producer
-from django.conf import settings
 
 from sentry import options
+from sentry.conf.types.kafka_definition import Topic
 from sentry.eventstream.base import EventStreamEventType, GroupStates
 from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
 from sentry.killswitches import killswitch_matches_context
@@ -24,15 +24,15 @@
 
 class KafkaEventStream(SnubaProtocolEventStream):
     def __init__(self, **options: Any) -> None:
-        self.topic = settings.KAFKA_EVENTS
-        self.transactions_topic = settings.KAFKA_TRANSACTIONS
-        self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC
-        self.__producers: MutableMapping[str, Producer] = {}
+        self.topic = Topic.EVENTS
+        self.transactions_topic = Topic.TRANSACTIONS
+        self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
+        self.__producers: MutableMapping[Topic, Producer] = {}
 
-    def get_transactions_topic(self, project_id: int) -> str:
+    def get_transactions_topic(self, project_id: int) -> Topic:
         return self.transactions_topic
 
-    def get_producer(self, topic: str) -> Producer:
+    def get_producer(self, topic: Topic) -> Producer:
         if topic not in self.__producers:
             cluster_name = get_topic_definition(topic)["cluster"]
             cluster_options = get_kafka_producer_cluster_options(cluster_name)
@@ -202,9 +202,11 @@ def _send(
 
         assert isinstance(extra_data, tuple)
 
+        real_topic = get_topic_definition(topic)["real_topic_name"]
+
         try:
             producer.produce(
-                topic=topic,
+                topic=real_topic,
                 key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
                 value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
                 on_delivery=self.delivery_callback,
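The eventstream keeps one producer per logical topic, configured for that topic's cluster, and resolves the physical name only when sending. A condensed sketch of that pattern (error handling and the full producer config omitted; imports assume the sentry codebase):

```python
from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import (
    get_kafka_producer_cluster_options,
    get_topic_definition,
)

_producers: dict[Topic, Producer] = {}


def get_producer(topic: Topic) -> Producer:
    # The cache is keyed on the logical Topic enum member, matching the
    # MutableMapping[Topic, Producer] annotation in the diff above.
    if topic not in _producers:
        cluster_name = get_topic_definition(topic)["cluster"]
        _producers[topic] = Producer(get_kafka_producer_cluster_options(cluster_name))
    return _producers[topic]


def send(topic: Topic, key: bytes, value: bytes) -> None:
    # Resolve the physical (possibly overridden) name at send time.
    real_topic = get_topic_definition(topic)["real_topic_name"]
    get_producer(topic).produce(topic=real_topic, key=key, value=value)
```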
src/sentry/issues/attributes.py (4 additions, 3 deletions)
@@ -6,7 +6,7 @@
 
 import requests
 import urllib3
-from arroyo import Topic
+from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from django.conf import settings
 from django.db.models import F, Window
@@ -16,6 +16,7 @@
 from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot
 
 from sentry import options
+from sentry.conf.types.kafka_definition import Topic
 from sentry.models.group import Group
 from sentry.models.groupassignee import GroupAssignee
 from sentry.models.groupowner import GroupOwner, GroupOwnerType
@@ -44,7 +45,7 @@ class GroupValues:
 
 
 def _get_attribute_snapshot_producer() -> KafkaProducer:
-    cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"]
+    cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"]
     producer_config = get_kafka_producer_cluster_options(cluster_name)
     producer_config.pop("compression.type", None)
     producer_config.pop("message.max.bytes", None)
@@ -122,7 +123,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None:
         raise snuba.SnubaError(err)
     else:
         payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), [])
-        _attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload)
+        _attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload)
 
 
 def _retrieve_group_values(group_id: int) -> GroupValues:
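The `Topic as ArroyoTopic` rename keeps the two topic types distinct: Sentry's `Topic` enum identifies a logical topic for configuration lookups, while arroyo's `Topic` wraps the concrete string name a producer writes to. Roughly (note that in this file the produce call still passes the `settings.KAFKA_GROUP_ATTRIBUTES` string; only the cluster lookup moved to the enum):

```python
from arroyo import Topic as ArroyoTopic  # wraps a physical topic-name string

from sentry.conf.types.kafka_definition import Topic  # logical topic enum
from sentry.utils.kafka_config import get_topic_definition

logical = Topic.GROUP_ATTRIBUTES                 # used for config lookups
defn = get_topic_definition(logical)             # {"cluster": ..., "real_topic_name": ...}
physical = ArroyoTopic(defn["real_topic_name"])  # what produce() is handed
```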
src/sentry/issues/producer.py (4 additions, 3 deletions)
@@ -4,11 +4,12 @@
 from collections.abc import MutableMapping
 from typing import Any, cast
 
-from arroyo import Topic
+from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from arroyo.types import Message, Value
 from django.conf import settings
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.issues.run import process_message
 from sentry.issues.status_change_message import StatusChangeMessage
@@ -33,7 +34,7 @@ class PayloadType(ValueEqualityEnum):
 
 
 def _get_occurrence_producer() -> KafkaProducer:
-    cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"]
+    cluster_name = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"]
     producer_config = get_kafka_producer_cluster_options(cluster_name)
     producer_config.pop("compression.type", None)
     producer_config.pop("message.max.bytes", None)
@@ -68,7 +69,7 @@ def produce_occurrence_to_kafka(
         process_message(Message(Value(payload=payload, committable={})))
         return
 
-    _occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload)
+    _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload)
 
 
 def _prepare_occurrence_message(
src/sentry/monitors/tasks.py (8 additions, 6 deletions)
@@ -7,11 +7,13 @@
 
 import msgpack
 import sentry_sdk
-from arroyo import Partition, Topic
+from arroyo import Partition
+from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from confluent_kafka.admin import AdminClient, PartitionMetadata
 from django.conf import settings
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.constants import ObjectStatus
 from sentry.monitors.logic.mark_failed import mark_failed
 from sentry.monitors.schedule import get_prev_schedule
@@ -50,7 +52,7 @@
 
 
 def _get_producer() -> KafkaProducer:
-    cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
+    cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"]
     producer_config = get_kafka_producer_cluster_options(cluster_name)
    producer_config.pop("compression.type", None)
    producer_config.pop("message.max.bytes", None)
@@ -62,10 +64,10 @@ def _get_producer() -> KafkaProducer:
 
 @lru_cache(maxsize=None)
 def _get_partitions() -> Mapping[int, PartitionMetadata]:
-    topic = settings.KAFKA_INGEST_MONITORS
-    cluster_name = get_topic_definition(topic)["cluster"]
+    topic_defn = get_topic_definition(Topic.INGEST_MONITORS)
+    topic = topic_defn["real_topic_name"]
 
-    conf = get_kafka_admin_cluster_options(cluster_name)
+    conf = get_kafka_admin_cluster_options(topic_defn["cluster"])
     admin_client = AdminClient(conf)
     result = admin_client.list_topics(topic)
     topic_metadata = result.topics.get(topic)
@@ -203,7 +205,7 @@ def clock_pulse(current_datetime=None):
     # topic. This is a requirement to ensure that none of the partitions stall,
     # since the global clock is tied to the slowest partition.
     for partition in _get_partitions().values():
-        dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id)
+        dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id)
         _checkin_producer.produce(dest, payload)
 
 
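`_get_partitions` memoizes a single AdminClient metadata fetch with `lru_cache`, and it must query the resolved physical name, since that is the topic that actually exists on the broker. A trimmed sketch of the confluent-kafka metadata call (the bootstrap config is a placeholder; Sentry builds it from `get_kafka_admin_cluster_options(topic_defn["cluster"])`):

```python
from confluent_kafka.admin import AdminClient

admin_client = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})  # placeholder config

topic = "ingest-monitors"  # topic_defn["real_topic_name"] in the real code
metadata = admin_client.list_topics(topic)
topic_metadata = metadata.topics.get(topic)
# Maps partition id -> PartitionMetadata; clock_pulse fans a message out to
# every partition so no partition's clock stalls.
partitions = topic_metadata.partitions if topic_metadata else {}
```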
src/sentry/replays/lib/kafka.py (2 additions, 3 deletions)
@@ -1,5 +1,4 @@
-from django.conf import settings
-
+from sentry.conf.types.kafka_definition import Topic
 from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 from sentry.utils.pubsub import KafkaPublisher
 
@@ -10,7 +9,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher:
     global replay_publisher
 
     if replay_publisher is None:
-        config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
+        config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
         replay_publisher = KafkaPublisher(
             get_kafka_producer_cluster_options(config["cluster"]),
             asynchronous=is_async,
src/sentry/replays/usecases/ingest/dom_index.py (2 additions, 3 deletions)
@@ -8,9 +8,8 @@
 from hashlib import md5
 from typing import Any, Literal, TypedDict, cast
 
-from django.conf import settings
-
 from sentry import features
+from sentry.conf.types.kafka_definition import Topic
 from sentry.models.project import Project
 from sentry.replays.usecases.ingest.events import SentryEvent
 from sentry.replays.usecases.ingest.issue_creation import (
@@ -219,7 +218,7 @@ def _initialize_publisher() -> KafkaPublisher:
     global replay_publisher
 
     if replay_publisher is None:
-        config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
+        config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
         replay_publisher = KafkaPublisher(
             kafka_config.get_kafka_producer_cluster_options(config["cluster"])
         )
src/sentry/runner/commands/devserver.py (3 additions, 4 deletions)
@@ -366,12 +366,11 @@ def devserver(
 
     from sentry.conf.types.kafka_definition import Topic
     from sentry.utils.batching_kafka_consumer import create_topics
+    from sentry.utils.kafka_config import get_topic_definition
 
     for topic in Topic:
-        default_name = topic.value
-        physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name)
-        cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name]
-        create_topics(cluster_name, [physical_name])
+        topic_defn = get_topic_definition(topic)
+        create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]])
 
     if dev_consumer:
         daemons.append(
src/sentry/sentry_metrics/configuration.py (5 additions, 3 deletions)
@@ -10,6 +10,8 @@
 
 import sentry_sdk
 
+from sentry.conf.types.kafka_definition import Topic
+
 # The maximum length of a column that is indexed in postgres. It is important to keep this in
 # sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py
 MAX_INDEXED_COLUMN_LENGTH = 200
@@ -46,7 +48,7 @@ class MetricsIngestConfiguration:
     db_backend: IndexerStorage
     db_backend_options: Mapping[str, Any]
     input_topic: str
-    output_topic: str
+    output_topic: Topic
     use_case_id: UseCaseKey
     internal_metrics_tag: str | None
     writes_limiter_cluster_options: Mapping[str, Any]
@@ -79,7 +81,7 @@ def get_ingest_config(
         db_backend=IndexerStorage.POSTGRES,
         db_backend_options={},
         input_topic=settings.KAFKA_INGEST_METRICS,
-        output_topic=settings.KAFKA_SNUBA_METRICS,
+        output_topic=Topic.SNUBA_METRICS,
         use_case_id=UseCaseKey.RELEASE_HEALTH,
         internal_metrics_tag="release-health",
         writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
@@ -96,7 +98,7 @@ def get_ingest_config(
         db_backend=IndexerStorage.POSTGRES,
         db_backend_options={},
         input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
-        output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS,
+        output_topic=Topic.SNUBA_GENERIC_METRICS,
         use_case_id=UseCaseKey.PERFORMANCE,
         internal_metrics_tag="perf",
         writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py (3 additions, 2 deletions)
@@ -10,6 +10,7 @@
 from arroyo.types import Commit, FilteredPayload, Message, Partition
 from confluent_kafka import Producer
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.utils import kafka_config, metrics
 
 logger = logging.getLogger(__name__)
@@ -18,15 +19,15 @@
 class SimpleProduceStep(ProcessingStep[KafkaPayload]):
     def __init__(
         self,
-        output_topic: str,
+        output_topic: Topic,
         commit_function: Commit,
         producer: AbstractProducer[KafkaPayload] | None = None,
     ) -> None:
         snuba_metrics = kafka_config.get_topic_definition(output_topic)
         self.__producer = Producer(
             kafka_config.get_kafka_producer_cluster_options(snuba_metrics["cluster"]),
         )
-        self.__producer_topic = output_topic
+        self.__producer_topic = snuba_metrics["real_topic_name"]
         self.__commit_function = commit_function
 
         self.__closed = False
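This file is the fix the commit message refers to: the indexer's produce step previously passed `output_topic` straight through to the confluent producer, so after the enum migration it was no longer producing to the correctly resolved topic. Resolving `real_topic_name` once in `__init__` pins the producer to the physical topic. A stripped-down sketch of the fixed shape (the real step also wires up commits and backpressure; `submit`'s signature here is simplified for illustration):

```python
from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils import kafka_config


class ProduceStepSketch:
    def __init__(self, output_topic: Topic) -> None:
        defn = kafka_config.get_topic_definition(output_topic)
        self._producer = Producer(
            kafka_config.get_kafka_producer_cluster_options(defn["cluster"])
        )
        # Resolve once: Producer.produce needs the physical string name.
        self._producer_topic = defn["real_topic_name"]

    def submit(self, key: bytes, value: bytes) -> None:
        self._producer.produce(self._producer_topic, key=key, value=value)
        self._producer.poll(0)  # serve delivery callbacks
```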
src/sentry/usage_accountant/accountant.py (2 additions, 2 deletions)
@@ -12,9 +12,9 @@
 
 from arroyo.backends.abstract import Producer
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
-from django.conf import settings
 from usageaccountant import UsageAccumulator, UsageUnit
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.options import get
 from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
@@ -71,7 +71,7 @@ def record(
 
     if _accountant_backend is None:
         cluster_name = get_topic_definition(
-            settings.KAFKA_SHARED_RESOURCES_USAGE,
+            Topic.SHARED_RESOURCES_USAGE,
         )["cluster"]
         producer_config = get_kafka_producer_cluster_options(cluster_name)
         producer = KafkaProducer(
src/sentry/utils/kafka_config.py (6 additions, 6 deletions)
@@ -3,6 +3,7 @@
 
 from django.conf import settings
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.conf.types.topic_definition import TopicDefinition
 
 SUPPORTED_KAFKA_CONFIGURATION = (
@@ -96,9 +97,8 @@ def get_kafka_admin_cluster_options(
 )
 
 
-def get_topic_definition(topic: str) -> TopicDefinition:
-    defn = settings.KAFKA_TOPICS.get(topic)
-    if defn is not None:
-        return defn
-    else:
-        raise ValueError(f"Unknown {topic=}")
+def get_topic_definition(topic: Topic) -> TopicDefinition:
+    return {
+        "cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value],
+        "real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value),
+    }
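With this change every logical topic resolves through the two new settings; the deprecated `KAFKA_TOPICS` mapping is no longer consulted. Example usage, assuming a hypothetical override for the generic-metrics output topic:

```python
from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_topic_definition

# Hypothetical deployment settings:
#   KAFKA_TOPIC_TO_CLUSTER = {..., "snuba-generic-metrics": "generic-metrics"}
#   KAFKA_TOPIC_OVERRIDES = {"snuba-generic-metrics": "snuba-generic-metrics-us"}
defn = get_topic_definition(Topic.SNUBA_GENERIC_METRICS)
# defn == {"cluster": "generic-metrics", "real_topic_name": "snuba-generic-metrics-us"}

# A topic with no override falls back to the enum value itself:
assert get_topic_definition(Topic.EVENTS)["real_topic_name"] == Topic.EVENTS.value
```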
