
ref: Remove unused settings #66552

Merged
14 commits merged on Mar 11, 2024
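This PR removes the deprecated per-topic KAFKA_* settings from src/sentry/conf/server.py and migrates the remaining call sites to the Topic enum in sentry.conf.types.kafka_definition, resolving the physical topic name through get_topic_definition(...)["real_topic_name"]. Distilled from the hunks below, the before/after shape looks roughly like this (a minimal sketch; the monitors topic is just one example):

    from arroyo.types import Topic as ArroyoTopic

    from sentry.conf.types.kafka_definition import Topic
    from sentry.utils.kafka_config import get_topic_definition

    # Before (removed by this PR): one settings constant per topic.
    # topic = ArroyoTopic(settings.KAFKA_INGEST_MONITORS)

    # After: resolve the logical topic to its real, possibly overridden, name.
    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_MONITORS)["real_topic_name"])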
44 changes: 1 addition & 43 deletions src/sentry/conf/server.py
@@ -3443,51 +3443,9 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
     }
 }

-# START DEPRECATED SECTION
-KAFKA_EVENTS = "events"
-KAFKA_EVENTS_COMMIT_LOG = "snuba-commit-log"
-KAFKA_TRANSACTIONS = "transactions"
-KAFKA_TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log"
-KAFKA_OUTCOMES = "outcomes"
-KAFKA_OUTCOMES_BILLING = "outcomes-billing"
-KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
-KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
-KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results"
-
-KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results"
-KAFKA_METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results"
-KAFKA_INGEST_EVENTS = "ingest-events"
-KAFKA_INGEST_EVENTS_DLQ = "ingest-events-dlq"
-KAFKA_INGEST_ATTACHMENTS = "ingest-attachments"
-KAFKA_INGEST_TRANSACTIONS = "ingest-transactions"
-KAFKA_INGEST_METRICS = "ingest-metrics"
-KAFKA_INGEST_METRICS_DLQ = "ingest-metrics-dlq"
-KAFKA_SNUBA_METRICS = "snuba-metrics"
-KAFKA_PROFILES = "profiles"
-KAFKA_INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
-KAFKA_INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq"
-KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
-KAFKA_INGEST_REPLAY_EVENTS = "ingest-replay-events"
-KAFKA_INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
-KAFKA_INGEST_OCCURRENCES = "ingest-occurrences"
-KAFKA_INGEST_MONITORS = "ingest-monitors"
-KAFKA_EVENTSTREAM_GENERIC = "generic-events"
-KAFKA_GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
-KAFKA_GROUP_ATTRIBUTES = "group-attributes"
-KAFKA_SHARED_RESOURCES_USAGE = "shared-resources-usage"
-
-# spans
-KAFKA_SNUBA_SPANS = "snuba-spans"
-# END DEPRECATED SECTION
-

 # Mapping of default Kafka topic name to custom names
-KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {
-    # TODO: This is temporary while we migrate between the old and new way of defining overrides.
-    # To be removed once this is defined in prod, along with KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
-    # variable which will no longer be needed
-    "generic-metrics-subscription-results": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS
-}
+KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {}


 # Mapping of default Kafka topic name to cluster name
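The comment kept above KAFKA_TOPIC_OVERRIDES explains its role: each entry maps a default topic name to the custom name used in a given deployment, and the mapping now starts out empty. A sketch of how an override is expressed and consumed, assuming resolution goes through get_topic_definition as in the call sites below (the custom topic name here is hypothetical):

    # Deployment settings: route the logical "ingest-monitors" topic elsewhere.
    KAFKA_TOPIC_OVERRIDES = {
        "ingest-monitors": "monitors-custom-topic",  # hypothetical override
    }

    # Callers never read the mapping directly; they resolve through the helper:
    #   get_topic_definition(Topic.INGEST_MONITORS)["real_topic_name"]
    #   -> "monitors-custom-topic"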
4 changes: 3 additions & 1 deletion src/sentry/issues/attributes.py
@@ -123,7 +123,9 @@ def produce_snapshot_to_kafka(snapshot: GroupAttributesSnapshot) -> None:
             raise snuba.SnubaError(err)
     else:
         payload = KafkaPayload(None, json.dumps(snapshot).encode("utf-8"), [])
-        _attribute_snapshot_producer.produce(ArroyoTopic(settings.KAFKA_GROUP_ATTRIBUTES), payload)
+        _attribute_snapshot_producer.produce(
+            ArroyoTopic(get_topic_definition(Topic.GROUP_ATTRIBUTES)["real_topic_name"]), payload
+        )


 def _retrieve_group_values(group_id: int) -> GroupValues:
3 changes: 2 additions & 1 deletion src/sentry/issues/producer.py
@@ -72,7 +72,8 @@ def produce_occurrence_to_kafka(
         return

     try:
-        _occurrence_producer.produce(ArroyoTopic(settings.KAFKA_INGEST_OCCURRENCES), payload)
+        topic = get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"]
+        _occurrence_producer.produce(ArroyoTopic(topic), payload)
     except KafkaException:
         logger.exception(
             "Failed to send occurrence to issue platform",
3 changes: 2 additions & 1 deletion src/sentry/monitors/tasks.py
@@ -204,8 +204,9 @@ def clock_pulse(current_datetime=None):
     # We create a clock-pulse (heart-beat) for EACH available partition in the
     # topic. This is a requirement to ensure that none of the partitions stall,
     # since the global clock is tied to the slowest partition.
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_MONITORS)["real_topic_name"])
     for partition in _get_partitions().values():
-        dest = Partition(ArroyoTopic(settings.KAFKA_INGEST_MONITORS), partition.id)
+        dest = Partition(topic, partition.id)
         _checkin_producer.produce(dest, payload)


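Beyond the settings swap, this hunk also hoists the topic lookup out of the partition loop: the real topic name is resolved once per clock pulse rather than once per partition, and the same ArroyoTopic instance is reused for every Partition destination.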
9 changes: 4 additions & 5 deletions tests/sentry/eventstream/test_eventstream.py
@@ -4,7 +4,6 @@
 from unittest.mock import Mock, patch

 import pytest
-from django.conf import settings
 from snuba_sdk import Column, Condition, Entity, Op, Query, Request

 from sentry import nodestore
@@ -61,13 +60,13 @@ def __produce_event(self, *insert_args, **insert_kwargs):
         produce_args, produce_kwargs = list(producer.produce.call_args)
         assert not produce_args
         if event_type == EventStreamEventType.Transaction:
-            assert produce_kwargs["topic"] == settings.KAFKA_TRANSACTIONS
+            assert produce_kwargs["topic"] == "transactions"
             assert produce_kwargs["key"] is None
         elif event_type == EventStreamEventType.Generic:
-            assert produce_kwargs["topic"] == settings.KAFKA_EVENTSTREAM_GENERIC
+            assert produce_kwargs["topic"] == "generic-events"
             assert produce_kwargs["key"] is None
         else:
-            assert produce_kwargs["topic"] == settings.KAFKA_EVENTS
+            assert produce_kwargs["topic"] == "events"
             assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")

         version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
@@ -242,7 +241,7 @@ def test_groupevent_occurrence_passed(self, mock_eventstream_insert):
         producer = self.producer_mock
         produce_args, produce_kwargs = list(producer.produce.call_args)
         version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
-        assert produce_kwargs["topic"] == settings.KAFKA_EVENTSTREAM_GENERIC
+        assert produce_kwargs["topic"] == "generic-events"
         assert produce_kwargs["key"] is None
         assert version == 2
         assert type_ == "insert"
2 changes: 1 addition & 1 deletion tests/sentry/monitors/test_tasks.py
@@ -996,7 +996,7 @@ def test_timeout_with_future_complete_checkin(self, mark_checkin_timeout_mock):
         ).exists()


-@override_settings(KAFKA_INGEST_MONITORS="monitors-test-topic")
+@override_settings(KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"})
 @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
 @mock.patch("sentry.monitors.tasks._checkin_producer")
 def test_clock_pulse(checkin_producer_mock):
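This decorator change shows the new way to point a test at a custom topic: rather than one override per KAFKA_* setting, tests override the single KAFKA_TOPIC_OVERRIDES mapping. A minimal sketch of the idea (the test body is hypothetical, not from this PR):

    from django.test import override_settings

    from sentry.conf.types.kafka_definition import Topic
    from sentry.utils.kafka_config import get_topic_definition


    @override_settings(KAFKA_TOPIC_OVERRIDES={"ingest-monitors": "monitors-test-topic"})
    def test_uses_overridden_topic():
        # The override mapping feeds get_topic_definition, so any producer built
        # from the Topic enum ends up publishing to the test topic.
        topic_def = get_topic_definition(Topic.INGEST_MONITORS)
        assert topic_def["real_topic_name"] == "monitors-test-topic"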
@@ -58,11 +58,6 @@ def setUp(self) -> None:
         self.events_topic = f"events-{self.consumer_and_topic_suffix}"
         self.commit_log_topic = f"events-commit-{self.consumer_and_topic_suffix}"
         self.override_settings_cm = override_settings(
-            KAFKA_EVENTS=self.events_topic,
-            KAFKA_TRANSACTIONS=self.events_topic,
-            KAFKA_TOPICS={
-                self.events_topic: {"cluster": "default"},
-            },
             KAFKA_TOPIC_OVERRIDES={
                 "events": self.events_topic,
                 "transactions": self.events_topic,
@@ -6,7 +6,6 @@

 from sentry.conf.server import (
     KAFKA_CLUSTERS,
-    KAFKA_SNUBA_GENERIC_METRICS,
     SENTRY_SLICING_CONFIG,
     SENTRY_SLICING_LOGICAL_PARTITION_COUNT,
     SLICED_KAFKA_TOPICS,
@@ -20,6 +19,8 @@
     _validate_slicing_consumer_config,
 )

+KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
+

 @pytest.fixture
 def metrics_message(org_id: int) -> Message[RoutingPayload]:
9 changes: 5 additions & 4 deletions tests/sentry/snuba/test_query_subscription_consumer.py
@@ -6,11 +6,12 @@

 import pytest
 from arroyo.backends.kafka import KafkaPayload
-from arroyo.types import BrokerValue, Message, Partition, Topic
+from arroyo.types import BrokerValue, Message, Partition
+from arroyo.types import Topic as ArroyoTopic
 from dateutil.parser import parse as parse_date
-from django.conf import settings
 from sentry_kafka_schemas import get_codec

+from sentry.conf.types.kafka_definition import Topic
 from sentry.runner.commands.run import DEFAULT_BLOCK_SIZE
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.models import SnubaQuery
@@ -37,7 +38,7 @@ def dataset(self):

     @cached_property
     def topic(self):
-        return settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS
+        return Topic.METRICS_SUBSCRIPTIONS_RESULTS.value

     @cached_property
     def jsoncodec(self):
@@ -99,7 +100,7 @@ def test_arroyo_consumer(self):
         data = self.valid_wrapper
         data["payload"]["subscription_id"] = sub.subscription_id
         commit = mock.Mock()
-        partition = Partition(Topic("test"), 0)
+        partition = Partition(ArroyoTopic("test"), 0)
         strategy = QuerySubscriptionStrategyFactory(
             self.dataset.value,
             1,
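A recurring detail in these test diffs: arroyo and sentry now both expose a class named Topic, so the arroyo one is aliased at import time. The convention, as used throughout this PR:

    # arroyo's Topic wraps a concrete topic name for partitions and producers;
    # sentry's Topic is the enum of logical topic definitions.
    from arroyo.types import Topic as ArroyoTopic

    from sentry.conf.types.kafka_definition import Topic

    partition_topic = ArroyoTopic(Topic.METRICS_SUBSCRIPTIONS_RESULTS.value)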
10 changes: 6 additions & 4 deletions tests/sentry/spans/consumers/process/test_factory.py
@@ -2,12 +2,14 @@
 from unittest import mock

 from arroyo.backends.kafka import KafkaPayload
-from arroyo.types import BrokerValue, Message, Partition, Topic
-from django.conf import settings
+from arroyo.types import BrokerValue, Message, Partition
+from arroyo.types import Topic as ArroyoTopic

+from sentry.conf.types.kafka_definition import Topic
 from sentry.spans.buffer.redis import get_redis_client
 from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory
 from sentry.utils import json
+from sentry.utils.kafka_config import get_topic_definition


 def build_mock_span(**kwargs):
@@ -52,7 +54,7 @@ def build_mock_message(data, topic=None):
 def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
     redis_client = get_redis_client()

-    topic = Topic(settings.KAFKA_SNUBA_SPANS)
+    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = ProcessSpansStrategyFactory().create_with_partitions(
         commit=mock.Mock(),
@@ -86,7 +88,7 @@ def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
 def test_second_span_in_segment_does_not_queue_task(process_segment):
     redis_client = get_redis_client()

-    topic = Topic(settings.KAFKA_SNUBA_SPANS)
+    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = ProcessSpansStrategyFactory().create_with_partitions(
         commit=mock.Mock(),