Commit

Revert "ref: Move more code to the new way of defining kafka topics a…
Browse files Browse the repository at this point in the history
…nd overrides (#66283)"

This reverts commit f7ffe5a.

Co-authored-by: ayirr7 <47572810+ayirr7@users.noreply.github.com>
getsentry-bot and ayirr7 committed Mar 5, 2024
1 parent 4f3dbfb commit 622827e
Showing 18 changed files with 69 additions and 83 deletions.
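
For context, the diff below switches every call site from the enum-based topic definitions back to plain topic-name strings. The following sketch is illustrative only — it is not part of the commit, and the two halves belong to the pre-revert and post-revert trees respectively; the names used are taken from the files changed below.

# Sketch only: the "new way" being reverted resolves a Topic enum member,
# honouring KAFKA_TOPIC_OVERRIDES for the physical topic name.
from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_topic_definition

defn = get_topic_definition(Topic.INGEST_MONITORS)
cluster_name = defn["cluster"]            # settings.KAFKA_TOPIC_TO_CLUSTER[...]
physical_topic = defn["real_topic_name"]  # settings.KAFKA_TOPIC_OVERRIDES.get(...)

# Sketch only: the "old way" restored by this revert passes topic-name strings
# from settings and looks the cluster up in the legacy KAFKA_TOPICS mapping.
from django.conf import settings

cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
physical_topic = settings.KAFKA_INGEST_MONITORS  # no override indirection
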
10 changes: 2 additions & 8 deletions src/sentry/conf/server.py
@@ -21,6 +21,7 @@
from sentry.conf.types.logging_config import LoggingConfig
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
from sentry.conf.types.topic_definition import TopicDefinition
from sentry.utils import json # NOQA (used in getsentry config)
from sentry.utils.celery import crontab_with_minute_jitter
from sentry.utils.types import Type, type_from_value
@@ -3526,16 +3527,9 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"shared-resources-usage": "default",
}

from typing import TypedDict


class LegacyTopicDefinition(TypedDict):
cluster: str


# Cluster configuration for each Kafka topic by name.
# DEPRECATED
KAFKA_TOPICS: Mapping[str, LegacyTopicDefinition] = {
KAFKA_TOPICS: Mapping[str, TopicDefinition] = {
KAFKA_EVENTS: {"cluster": "default"},
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
KAFKA_TRANSACTIONS: {"cluster": "default"},
2 changes: 1 addition & 1 deletion src/sentry/conf/types/kafka_definition.py
@@ -70,7 +70,7 @@ class ConsumerDefinition(TypedDict, total=False):
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str

dlq_topic: Topic
dlq_topic: str
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None

2 changes: 0 additions & 2 deletions src/sentry/conf/types/topic_definition.py
@@ -5,5 +5,3 @@

class TopicDefinition(TypedDict):
cluster: str
# The topic name may be overridden from the default via KAFKA_TOPIC_OVERRIDES
real_topic_name: str
9 changes: 4 additions & 5 deletions src/sentry/consumers/__init__.py
@@ -290,7 +290,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"ingest_profile": "release-health",
},
"dlq_topic": Topic.INGEST_METRICS_DLQ,
"dlq_topic": settings.KAFKA_INGEST_METRICS_DLQ,
"dlq_max_invalid_ratio": 0.01,
"dlq_max_consecutive_count": 1000,
},
@@ -301,7 +301,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"ingest_profile": "performance",
},
"dlq_topic": Topic.INGEST_GENERIC_METRICS_DLQ,
"dlq_topic": settings.KAFKA_INGEST_GENERIC_METRICS_DLQ,
"dlq_max_invalid_ratio": 0.01,
"dlq_max_consecutive_count": 1000,
},
@@ -509,8 +509,7 @@ def build_consumer_config(group_id: str):
f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
) from e
try:
dlq_topic_defn = get_topic_definition(dlq_topic)
cluster_setting = dlq_topic_defn["cluster"]
cluster_setting = get_topic_definition(dlq_topic)["cluster"]
except ValueError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
@@ -520,7 +519,7 @@ def build_consumer_config(group_id: str):
dlq_producer = KafkaProducer(producer_config)

dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic)),
DlqLimit(
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
18 changes: 8 additions & 10 deletions src/sentry/eventstream/kafka/backend.py
@@ -7,9 +7,9 @@
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from django.conf import settings

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.eventstream.base import EventStreamEventType, GroupStates
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
from sentry.killswitches import killswitch_matches_context
@@ -24,15 +24,15 @@

class KafkaEventStream(SnubaProtocolEventStream):
def __init__(self, **options: Any) -> None:
self.topic = Topic.EVENTS
self.transactions_topic = Topic.TRANSACTIONS
self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
self.__producers: MutableMapping[Topic, Producer] = {}
self.topic = settings.KAFKA_EVENTS
self.transactions_topic = settings.KAFKA_TRANSACTIONS
self.issue_platform_topic = settings.KAFKA_EVENTSTREAM_GENERIC
self.__producers: MutableMapping[str, Producer] = {}

def get_transactions_topic(self, project_id: int) -> Topic:
def get_transactions_topic(self, project_id: int) -> str:
return self.transactions_topic

def get_producer(self, topic: Topic) -> Producer:
def get_producer(self, topic: str) -> Producer:
if topic not in self.__producers:
cluster_name = get_topic_definition(topic)["cluster"]
cluster_options = get_kafka_producer_cluster_options(cluster_name)
@@ -202,11 +202,9 @@ def _send(

assert isinstance(extra_data, tuple)

real_topic = get_topic_definition(topic)["real_topic_name"]

try:
producer.produce(
topic=real_topic,
topic=topic,
key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
on_delivery=self.delivery_callback,
7 changes: 3 additions & 4 deletions src/sentry/issues/attributes.py
@@ -6,7 +6,7 @@

import requests
import urllib3
from arroyo import Topic as ArroyoTopic
from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.db.models import F, Window
@@ -16,7 +16,6 @@
from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.models.group import Group
from sentry.models.groupassignee import GroupAssignee
from sentry.models.groupowner import GroupOwner, GroupOwnerType
@@ -45,7 +44,7 @@ class GroupValues:


def _get_attribute_snapshot_producer() -> KafkaProducer:
cluster_name = get_topic_definition(Topic.GROUP_ATTRIBUTES)["cluster"]
cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -123,7 +122,7 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None:
raise snuba.SnubaError(err)
else:
payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), [])
_attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload)
_attribute_snapshot_producer.produce(Topic(settings.KAFKA_GROUP_ATTRIBUTES), payload)


def _retrieve_group_values(group_id: int) -> GroupValues:
7 changes: 3 additions & 4 deletions src/sentry/issues/producer.py
@@ -4,12 +4,11 @@
from collections.abc import MutableMapping
from typing import Any, cast

from arroyo import Topic as ArroyoTopic
from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from arroyo.types import Message, Value
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.run import process_message
from sentry.issues.status_change_message import StatusChangeMessage
@@ -34,7 +33,7 @@ class PayloadType(ValueEqualityEnum):


def _get_occurrence_producer() -> KafkaProducer:
cluster_name = get_topic_definition(Topic.INGEST_OCCURRENCES)["cluster"]
cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -69,7 +68,7 @@ def produce_occurrence_to_kafka(
process_message(Message(Value(payload=payload, committable={})))
return

_occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload)
_occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload)


def _prepare_occurrence_message(
14 changes: 6 additions & 8 deletions src/sentry/monitors/tasks.py
@@ -7,13 +7,11 @@

import msgpack
import sentry_sdk
from arroyo import Partition
from arroyo import Topic as ArroyoTopic
from arroyo import Partition, Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from confluent_kafka.admin import AdminClient, PartitionMetadata
from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.constants import ObjectStatus
from sentry.monitors.logic.mark_failed import mark_failed
from sentry.monitors.schedule import get_prev_schedule
@@ -52,7 +50,7 @@


def _get_producer() -> KafkaProducer:
cluster_name = get_topic_definition(Topic.INGEST_MONITORS)["cluster"]
cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
@@ -64,10 +62,10 @@ def _get_producer() -> KafkaProducer:

@lru_cache(maxsize=None)
def _get_partitions() -> Mapping[int, PartitionMetadata]:
topic_defn = get_topic_definition(Topic.INGEST_MONITORS)
topic = topic_defn["real_topic_name"]
topic = settings.KAFKA_INGEST_MONITORS
cluster_name = get_topic_definition(topic)["cluster"]

conf = get_kafka_admin_cluster_options(topic_defn["cluster"])
conf = get_kafka_admin_cluster_options(cluster_name)
admin_client = AdminClient(conf)
result = admin_client.list_topics(topic)
topic_metadata = result.topics.get(topic)
@@ -205,7 +203,7 @@ def clock_pulse(current_datetime=None):
# topic. This is a requirement to ensure that none of the partitions stall,
# since the global clock is tied to the slowest partition.
for partition in _get_partitions().values():
dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id)
dest = Partition(Topic(settings.KAFKA_INGEST_MONITORS), partition.id)
_checkin_producer.produce(dest, payload)


5 changes: 3 additions & 2 deletions src/sentry/replays/lib/kafka.py
@@ -1,4 +1,5 @@
from sentry.conf.types.kafka_definition import Topic
from django.conf import settings

from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.pubsub import KafkaPublisher

@@ -9,7 +10,7 @@ def initialize_replays_publisher(is_async=False) -> KafkaPublisher:
global replay_publisher

if replay_publisher is None:
config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
replay_publisher = KafkaPublisher(
get_kafka_producer_cluster_options(config["cluster"]),
asynchronous=is_async,
5 changes: 3 additions & 2 deletions src/sentry/replays/usecases/ingest/dom_index.py
@@ -8,8 +8,9 @@
from hashlib import md5
from typing import Any, Literal, TypedDict, cast

from django.conf import settings

from sentry import features
from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.replays.usecases.ingest.events import SentryEvent
from sentry.replays.usecases.ingest.issue_creation import (
@@ -218,7 +219,7 @@ def _initialize_publisher() -> KafkaPublisher:
global replay_publisher

if replay_publisher is None:
config = kafka_config.get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
config = kafka_config.get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
replay_publisher = KafkaPublisher(
kafka_config.get_kafka_producer_cluster_options(config["cluster"])
)
7 changes: 4 additions & 3 deletions src/sentry/runner/commands/devserver.py
@@ -366,11 +366,12 @@ def devserver(

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.batching_kafka_consumer import create_topics
from sentry.utils.kafka_config import get_topic_definition

for topic in Topic:
topic_defn = get_topic_definition(topic)
create_topics(topic_defn["cluster"], [topic_defn["real_topic_name"]])
default_name = topic.value
physical_name = settings.KAFKA_TOPIC_OVERRIDES.get(default_name, default_name)
cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[default_name]
create_topics(cluster_name, [physical_name])

if dev_consumer:
daemons.append(
8 changes: 3 additions & 5 deletions src/sentry/sentry_metrics/configuration.py
@@ -10,8 +10,6 @@

import sentry_sdk

from sentry.conf.types.kafka_definition import Topic

# The maximum length of a column that is indexed in postgres. It is important to keep this in
# sync between the consumers and the models defined in src/sentry/sentry_metrics/models.py
MAX_INDEXED_COLUMN_LENGTH = 200
@@ -48,7 +46,7 @@ class MetricsIngestConfiguration:
db_backend: IndexerStorage
db_backend_options: Mapping[str, Any]
input_topic: str
output_topic: Topic
output_topic: str
use_case_id: UseCaseKey
internal_metrics_tag: str | None
writes_limiter_cluster_options: Mapping[str, Any]
@@ -81,7 +79,7 @@ def get_ingest_config(
db_backend=IndexerStorage.POSTGRES,
db_backend_options={},
input_topic=settings.KAFKA_INGEST_METRICS,
output_topic=Topic.SNUBA_METRICS,
output_topic=settings.KAFKA_SNUBA_METRICS,
use_case_id=UseCaseKey.RELEASE_HEALTH,
internal_metrics_tag="release-health",
writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
@@ -98,7 +96,7 @@ def get_ingest_config(
db_backend=IndexerStorage.POSTGRES,
db_backend_options={},
input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
output_topic=Topic.SNUBA_GENERIC_METRICS,
output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS,
use_case_id=UseCaseKey.PERFORMANCE,
internal_metrics_tag="perf",
writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
3 changes: 1 addition & 2 deletions src/sentry/sentry_metrics/consumers/indexer/multiprocess.py
@@ -10,7 +10,6 @@
from arroyo.types import Commit, FilteredPayload, Message, Partition
from confluent_kafka import Producer

from sentry.conf.types.kafka_definition import Topic
from sentry.utils import kafka_config, metrics

logger = logging.getLogger(__name__)
@@ -19,7 +18,7 @@
class SimpleProduceStep(ProcessingStep[KafkaPayload]):
def __init__(
self,
output_topic: Topic,
output_topic: str,
commit_function: Commit,
producer: AbstractProducer[KafkaPayload] | None = None,
) -> None:
4 changes: 2 additions & 2 deletions src/sentry/usage_accountant/accountant.py
@@ -12,9 +12,9 @@

from arroyo.backends.abstract import Producer
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from usageaccountant import UsageAccumulator, UsageUnit

from sentry.conf.types.kafka_definition import Topic
from sentry.options import get
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

@@ -71,7 +71,7 @@ def record(

if _accountant_backend is None:
cluster_name = get_topic_definition(
Topic.SHARED_RESOURCES_USAGE,
settings.KAFKA_SHARED_RESOURCES_USAGE,
)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer = KafkaProducer(
12 changes: 6 additions & 6 deletions src/sentry/utils/kafka_config.py
@@ -3,7 +3,6 @@

from django.conf import settings

from sentry.conf.types.kafka_definition import Topic
from sentry.conf.types.topic_definition import TopicDefinition

SUPPORTED_KAFKA_CONFIGURATION = (
@@ -97,8 +96,9 @@ def get_kafka_admin_cluster_options(
)


def get_topic_definition(topic: Topic) -> TopicDefinition:
return {
"cluster": settings.KAFKA_TOPIC_TO_CLUSTER[topic.value],
"real_topic_name": settings.KAFKA_TOPIC_OVERRIDES.get(topic.value, topic.value),
}
def get_topic_definition(topic: str) -> TopicDefinition:
defn = settings.KAFKA_TOPICS.get(topic)
if defn is not None:
return defn
else:
raise ValueError(f"Unknown {topic=}")
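
A minimal usage sketch of the restored string-based helper (illustrative, not part of the diff): callers pass a settings.KAFKA_* topic name, and an unknown topic raises ValueError, which build_consumer_config() above surfaces as a click.BadParameter.

from django.conf import settings
from sentry.utils.kafka_config import (
    get_kafka_producer_cluster_options,
    get_topic_definition,
)

# Resolve the cluster for a plain topic-name string, then build producer options.
cluster_name = get_topic_definition(settings.KAFKA_GROUP_ATTRIBUTES)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)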

0 comments on commit 622827e

Please sign in to comment.