/
kafka_config.py
104 lines (88 loc) · 3.51 KB
/
kafka_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from collections.abc import MutableMapping
from typing import Any
from django.conf import settings
from sentry.conf.types.topic_definition import TopicDefinition
# Allow-list of librdkafka option names accepted in the "common" and
# per-role sections of a cluster's configuration. Any other key found
# there is rejected by `_get_kafka_cluster_options` with a ValueError.
#
# The full list of options librdkafka understands is documented at
# https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
SUPPORTED_KAFKA_CONFIGURATION = (
    "bootstrap.servers",
    "compression.type",
    "message.max.bytes",
    "sasl.mechanism",
    "sasl.username",
    "sasl.password",
    "security.protocol",
    "socket.timeout.ms",
    "ssl.ca.location",
    "ssl.ca.certificate.stores",
    "ssl.certificate.location",
    "ssl.certificate.pem",
    "ssl.cipher.suites",
    "ssl.crl.location",
    "ssl.curves.list",
    "ssl.endpoint.identification.algorithm",
    "ssl.key.location",
    "ssl.key.password",
    "ssl.key.pem",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.sigalgs.list",
)
# Names of the nested sections a cluster entry in settings.KAFKA_CLUSTERS
# may contain. Top-level keys of a cluster entry that are NOT one of these
# names are treated as legacy (flat) options.
COMMON_SECTION = "common"
PRODUCERS_SECTION = "producers"
CONSUMERS_SECTION = "consumers"
ADMIN_SECTION = "admin"

KNOWN_SECTIONS = (
    COMMON_SECTION,
    PRODUCERS_SECTION,
    CONSUMERS_SECTION,
    ADMIN_SECTION,
)
def _get_legacy_kafka_cluster_options(cluster_name):
    """Return the legacy (flat) options configured for ``cluster_name``.

    Legacy options are the top-level keys of
    ``settings.KAFKA_CLUSTERS[cluster_name]`` whose names are not one of
    ``KNOWN_SECTIONS``. A ``bootstrap.servers`` value given as a list or
    tuple is normalized to a single comma separated string.

    Returns a new dict; it is empty when the cluster only uses the
    sectioned layout.

    Raises:
        KeyError: if ``cluster_name`` is not present in
            ``settings.KAFKA_CLUSTERS``.
    """
    legacy = {
        key: value
        for key, value in settings.KAFKA_CLUSTERS[cluster_name].items()
        if key not in KNOWN_SECTIONS
    }

    # A missing key yields None here, which is neither list nor tuple,
    # so the normalization below is skipped exactly as for a plain string.
    servers = legacy.get("bootstrap.servers")
    if isinstance(servers, (list, tuple)):
        legacy["bootstrap.servers"] = ",".join(servers)

    return legacy
def _get_kafka_cluster_options(
    cluster_name, config_section, only_bootstrap=False, override_params=None
):
    """Build the option dict for one role of one Kafka cluster.

    Two configuration layouts are handled:

    * Legacy (flat) layout: used whenever the cluster entry has any
      top-level key outside ``KNOWN_SECTIONS``. With ``only_bootstrap``
      set, only ``bootstrap.servers`` is taken from it; otherwise all
      legacy options are used. The ``common`` and ``config_section``
      sections are ignored in this case, and keys are not checked
      against ``SUPPORTED_KAFKA_CONFIGURATION``.
    * Sectioned layout: the ``common`` section is overlaid with
      ``config_section`` and every resulting key must be listed in
      ``SUPPORTED_KAFKA_CONFIGURATION``.

    ``override_params``, when truthy, is applied last and is not validated.

    Raises:
        KeyError: unknown ``cluster_name``, or no ``bootstrap.servers`` in
            the sectioned layout.
        AssertionError: legacy layout without ``bootstrap.servers``.
        ValueError: unsupported key in the sectioned layout, or
            ``bootstrap.servers`` is not a string.
    """
    cluster_settings = settings.KAFKA_CLUSTERS[cluster_name]
    section_options = cluster_settings.get(config_section, {})
    shared_options = cluster_settings.get(COMMON_SECTION, {})
    legacy_options = _get_legacy_kafka_cluster_options(cluster_name)

    if legacy_options:
        assert "bootstrap.servers" in legacy_options
        if only_bootstrap:
            options = {"bootstrap.servers": legacy_options["bootstrap.servers"]}
        else:
            # producer uses all legacy_options
            options = dict(legacy_options)
    else:
        # Section-specific values take precedence over the common ones.
        options = dict(shared_options)
        options.update(section_options)

        # check key validity
        for configuration_key in options:
            if configuration_key not in SUPPORTED_KAFKA_CONFIGURATION:
                raise ValueError(f"The `{configuration_key}` configuration key is not supported.")

    bootstrap_servers = options["bootstrap.servers"]
    if not isinstance(bootstrap_servers, str):
        raise ValueError("bootstrap.servers must be a comma separated string")

    if override_params:
        options.update(override_params)

    return options
def get_kafka_producer_cluster_options(cluster_name: str) -> MutableMapping[Any, Any]:
    """Return the Kafka options to use for producers on ``cluster_name``.

    Delegates to ``_get_kafka_cluster_options`` with the ``producers``
    section. Unlike the consumer and admin variants, ``only_bootstrap`` is
    left at its default of ``False``, so a legacy (flat) cluster
    configuration is passed through in full rather than reduced to
    ``bootstrap.servers``.

    Raises whatever ``_get_kafka_cluster_options`` raises for an unknown
    cluster or an invalid configuration.
    """
    return _get_kafka_cluster_options(cluster_name, PRODUCERS_SECTION)
def get_kafka_consumer_cluster_options(
    cluster_name: str, override_params: MutableMapping[str, Any] | None = None
) -> MutableMapping[Any, Any]:
    """Return the Kafka options to use for consumers on ``cluster_name``.

    Delegates to ``_get_kafka_cluster_options`` with the ``consumers``
    section and ``only_bootstrap=True``, so a legacy (flat) cluster
    configuration contributes only its ``bootstrap.servers`` value.
    ``override_params``, if given and non-empty, is merged on top of the
    result.
    """
    return _get_kafka_cluster_options(
        cluster_name=cluster_name,
        config_section=CONSUMERS_SECTION,
        only_bootstrap=True,
        override_params=override_params,
    )
def get_kafka_admin_cluster_options(
    cluster_name: str, override_params: MutableMapping[str, Any] | None = None
) -> MutableMapping[Any, Any]:
    """Return the Kafka options to use for admin clients on ``cluster_name``.

    Delegates to ``_get_kafka_cluster_options`` with the ``admin`` section
    and ``only_bootstrap=True``, so a legacy (flat) cluster configuration
    contributes only its ``bootstrap.servers`` value. ``override_params``,
    if given and non-empty, is merged on top of the result.
    """
    return _get_kafka_cluster_options(
        cluster_name=cluster_name,
        config_section=ADMIN_SECTION,
        only_bootstrap=True,
        override_params=override_params,
    )
def get_topic_definition(topic: str) -> TopicDefinition:
    """Look up the definition registered for ``topic`` in ``settings.KAFKA_TOPICS``.

    Raises:
        ValueError: if the topic has no entry, or its entry is ``None``.
    """
    definition = settings.KAFKA_TOPICS.get(topic)
    if definition is None:
        raise ValueError(f"Unknown {topic=}")
    return definition