/
test_kafka_definition.py
62 lines (52 loc) · 2.09 KB
/
test_kafka_definition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import pytest
import sentry_kafka_schemas
from django.conf import settings
from sentry.conf.types.kafka_definition import (
ConsumerDefinition,
Topic,
validate_consumer_definition,
)
from sentry.consumers import KAFKA_CONSUMERS
from sentry.testutils.cases import TestCase
def test_topic_definition() -> None:
# All topic are registered
# TODO: Remove this once these topics are actually registered in sentry-kafka-schemas
currently_unregistered_topics = [
"outcomes-billing",
"ingest-attachments",
"ingest-transactions",
"profiles",
"ingest-occurrences",
"ingest-monitors",
]
for topic in Topic:
if topic.value not in currently_unregistered_topics:
assert sentry_kafka_schemas.get_topic(topic.value) is not None
for topic in Topic:
cluster_name = settings.KAFKA_TOPIC_TO_CLUSTER[topic.value]
assert (
cluster_name in settings.KAFKA_CLUSTERS
), f"{cluster_name} is not defined in KAFKA_CLUSTERS"
for default_topic in settings.KAFKA_TOPIC_OVERRIDES:
# Ensure all override topics are in the enum
Topic(default_topic)
assert len(Topic) == len(settings.KAFKA_TOPIC_TO_CLUSTER)
class ConsumersDefinitionTest(TestCase):
def test_exception_on_invalid_consumer_definition(self):
invalid_definitions: list[ConsumerDefinition] = [
{
"topic": Topic.INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"static_args": {
"ingest_profile": "release-health",
},
"dlq_max_invalid_ratio": 0.01,
"dlq_max_consecutive_count": 1000,
}
]
for invalid_definition in invalid_definitions:
with pytest.raises(ValueError):
validate_consumer_definition(invalid_definition)
def test_kafka_consumer_definition_validity(self):
for definition in KAFKA_CONSUMERS.values():
validate_consumer_definition(definition)