-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_valid_topic_data.py
139 lines (119 loc) · 4.75 KB
/
test_valid_topic_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from pathlib import Path
from sentry_kafka_schemas import get_codec
import fastjsonschema
from yaml import safe_load
import re
# Repository directories resolved relative to this test file.
# NOTE(review): parents[2] assumes this file sits two levels below the repo
# root (e.g. <root>/python/tests/) — confirm if the file is ever moved.
_SCHEMAS = Path(__file__).parents[2].joinpath("schemas/")  # JSON/msgpack schema definitions
_EXAMPLES = Path(__file__).parents[2].joinpath("examples/")  # example payloads referenced by schemas
_TOPICS = Path(__file__).parents[2].joinpath("topics/")  # one YAML definition file per Kafka topic
# Meta-schema that every topic YAML file must satisfy. Compiled once at import
# time; calling `_TOPIC_SCHEMA(data)` raises a fastjsonschema validation error
# on invalid data and returns the data on success.
#
# Fix: "additionalProperties" was previously misspelled "aditionalProperties"
# (both here at the top level and inside "services"), so unknown keys were
# silently accepted instead of rejected.
_TOPIC_SCHEMA = fastjsonschema.compile(
    {
        "properties": {
            "topic": {"type": "string"},
            "description": {"type": "string"},
            "pipeline": {"type": "string"},
            "services": {
                "properties": {
                    # Which repos consume from / produce to this topic.
                    "consumers": {
                        "type": "array",
                        "items": {"$ref": "#/definitions/Repo"},
                    },
                    "producers": {
                        "type": "array",
                        "items": {"$ref": "#/definitions/Repo"},
                    },
                },
                "required": ["consumers", "producers"],
                "additionalProperties": False,
            },
            "schemas": {
                "type": "array",
                "items": {
                    "properties": {
                        # Versions must start at 1; ordering is enforced by
                        # test_all_topics, not by this schema.
                        "version": {"type": "integer", "minimum": 1},
                        "type": {"enum": ["msgpack", "json"]},
                        "compatibility_mode": {"enum": ["none", "backward"]},
                        # Path of the schema file, relative to schemas/.
                        "resource": {"type": "string"},
                        # Paths of example payloads, relative to examples/.
                        "examples": {"type": "array", "items": {"type": "string"}},
                    },
                    "required": [
                        "version",
                        "type",
                        "compatibility_mode",
                        "resource",
                        "examples",
                    ],
                },
            },
            # Arbitrary Kafka topic-level config (e.g. "compression.type"),
            # all values expressed as strings.
            "topic_creation_config": {
                "type": "object",
                "additionalProperties": {"type": "string"},
            },
        },
        "additionalProperties": False,
        "required": ["topic", "description", "services"],
        "definitions": {
            "Repo": {
                "enum": [
                    # enumerate all repos here to avoid typos
                    "getsentry/relay",
                    "getsentry/sentry",
                    "getsentry/snuba",
                    "getsentry/vroom",
                    "getsentry/super-big-consumers",
                ]
            }
        },
    }
)
def test_all_topics() -> None:
    """Validate every topic definition file under topics/.

    For each YAML file: validate it against ``_TOPIC_SCHEMA``, enforce topic
    naming rules, require an explicit compression type, check that schema
    versions are contiguous starting at 1, and verify the schema is loadable
    via ``get_codec``. Afterwards, assert that every file under schemas/ and
    examples/ is referenced by at least one topic (no orphaned files).
    """
    # `.` is technically also valid in Kafka but we don't allow it
    # at Sentry since it can collide with `_`
    valid_chars = re.compile(r"^[a-zA-Z0-9\-\_]+$")
    # Today we use lz4 everywhere, this list can be extended if needed.
    # (Hoisted out of the loop: it is loop-invariant.)
    valid_compression_types = ["lz4"]

    used_schema_filepaths = set()
    used_examples = set()

    for filename in _TOPICS.iterdir():
        if filename.suffix != ".yaml":
            # Fix: the f-string previously had no placeholder, so the
            # offending path was never shown in the error message.
            raise Exception(f"Invalid YAML file: {filename}")

        with open(filename) as f:
            topic_data = safe_load(f)
        _TOPIC_SCHEMA(topic_data)

        # Check valid topic name: it must match the file stem, use only the
        # allowed character set, and respect Kafka's 255-char length limit.
        topic_name = topic_data["topic"]
        assert topic_name == filename.stem
        assert valid_chars.match(topic_name)
        assert len(topic_name) <= 255

        # Check description provided for topic
        assert topic_data["description"]

        # Check every topic has an explicit, valid compression type
        assert (
            topic_data["topic_creation_config"]["compression.type"]
            in valid_compression_types
        )

        # Check schema versions are contiguous starting from 1, and record
        # every schema/example file the topic references.
        topic_schemas = topic_data["schemas"]
        for i, schema_raw in enumerate(topic_schemas):
            used_schema_filepaths.add(_SCHEMAS.joinpath(schema_raw["resource"]))
            for example_path in schema_raw["examples"]:
                for entry in _EXAMPLES.joinpath(example_path).rglob("*"):
                    if entry.is_file():
                        used_examples.add(entry)
            assert schema_raw["version"] == i + 1

        # The schema can be loaded
        get_codec(filename.stem)

    # Assert that every schema file in schemas/ is referenced by a topic.
    existing_schema_filepaths = {
        entry for entry in _SCHEMAS.rglob("*") if entry.is_file()
    }
    assert not existing_schema_filepaths - used_schema_filepaths

    # Assert that every example file in examples/ is referenced by a topic.
    existing_examples = {entry for entry in _EXAMPLES.rglob("*") if entry.is_file()}
    assert not existing_examples - used_examples