-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
consumer_definition.py
39 lines (28 loc) · 1.23 KB
/
consumer_definition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from __future__ import annotations
from collections.abc import Callable, Mapping, Sequence
from typing import Any, Required, TypedDict
import click
class ConsumerDefinition(TypedDict, total=False):
# Which logical topic from settings to use.
topic: Required[str | Callable[[], str]]
default_topic: str
strategy_factory: Required[str]
# Additional CLI options the consumer should accept. These arguments are
# passed as kwargs to the strategy_factory.
click_options: Sequence[click.Option]
# Hardcoded additional kwargs for strategy_factory
static_args: Mapping[str, Any]
require_synchronization: bool
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str
dlq_topic: str
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None
def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
if "dlq_topic" not in consumer_definition and (
"dlq_max_invalid_ratio" in consumer_definition
or "dlq_max_consecutive_count" in consumer_definition
):
raise ValueError(
"Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not"
)