Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Rationalize Kafka topic config #65793

Merged
merged 48 commits into from Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
40bf484
ref: Rationalize Kafka topic config
lynnagara Feb 26, 2024
112473d
add default
lynnagara Feb 26, 2024
05cfb0a
:hammer_and_wrench: apply pre-commit fixes
getsantry[bot] Feb 26, 2024
977df66
.
lynnagara Feb 26, 2024
8025712
cleanup topic autocreation
lynnagara Feb 26, 2024
3a510f2
add all the default topic names
lynnagara Feb 27, 2024
348c290
add topics
lynnagara Feb 27, 2024
9f7206e
comment no longer applies
lynnagara Feb 27, 2024
e7f1e9a
fix comment
lynnagara Feb 27, 2024
7bb9d4f
validate_schema is bool
lynnagara Feb 27, 2024
570c23d
Merge remote-tracking branch 'origin/master' into kafka-topics-cleanup
lynnagara Feb 27, 2024
5635bee
ref: Fix billing kafka config
lynnagara Feb 28, 2024
4880de4
remove more special casing
lynnagara Feb 28, 2024
334b83d
more cleanup of special cased things
lynnagara Feb 28, 2024
46d0259
Merge branch 'fix-billing-kafka-config' into kafka-topics-cleanup
lynnagara Feb 28, 2024
4f82e7e
fix types
lynnagara Feb 28, 2024
855805d
add the topic cluster mapping
lynnagara Feb 28, 2024
12f1f90
devserver uses the new way
lynnagara Feb 28, 2024
5307df7
test
lynnagara Feb 28, 2024
d6f8cda
fix
lynnagara Feb 28, 2024
0796771
build it in a more backward compatible way
lynnagara Feb 28, 2024
679da09
revert for backward compatibility
lynnagara Feb 28, 2024
b8cd48b
Merge remote-tracking branch 'origin/master' into kafka-topics-cleanup
lynnagara Feb 29, 2024
9f7bb57
Merge remote-tracking branch 'origin/master' into kafka-topics-cleanup
lynnagara Feb 29, 2024
b1eb875
fix type
lynnagara Feb 29, 2024
b428a8a
remove stray topic_override
lynnagara Feb 29, 2024
1b219a7
check cluster name exists
lynnagara Feb 29, 2024
2c41229
handle generic metrics subscriptions
lynnagara Feb 29, 2024
c6d7aa9
update consumer
lynnagara Feb 29, 2024
2421da9
comment
lynnagara Feb 29, 2024
7014e88
update test
lynnagara Feb 29, 2024
6430eb5
fix variable naming
lynnagara Feb 29, 2024
2888ea3
fix import naming
lynnagara Feb 29, 2024
37e066e
fix type for real
lynnagara Feb 29, 2024
f19d92d
put kafka stuff in a single file to work around import restriction
lynnagara Feb 29, 2024
5bfbbcf
Merge branch 'master' into kafka-topics-cleanup
lynnagara Feb 29, 2024
99e16df
fix more imports
lynnagara Feb 29, 2024
1668a6c
add more comments
lynnagara Feb 29, 2024
71a444c
actually define real_topic
lynnagara Feb 29, 2024
9b92df9
update test
lynnagara Feb 29, 2024
95c3cce
Merge remote-tracking branch 'origin/master' into kafka-topics-cleanup
lynnagara Mar 1, 2024
b4e867a
add new ingest-events-dlq topic
lynnagara Mar 1, 2024
b8e31eb
fix typo, add test
lynnagara Mar 1, 2024
0daa510
add docstring
lynnagara Mar 1, 2024
297db51
don't touch file unnecessarily
lynnagara Mar 1, 2024
ffbb122
add test
lynnagara Mar 1, 2024
59f93e9
yikes
lynnagara Mar 1, 2024
1a4f857
type checker
lynnagara Mar 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/sentry/conf/server.py
Expand Up @@ -3458,16 +3458,17 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
# spans
KAFKA_SNUBA_SPANS = "snuba-spans"

KAFKA_SUBSCRIPTION_RESULT_TOPICS = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into #66078

"events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"generic-metrics": KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"sessions": KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"metrics": KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
# Mapping of default Kafka topic name to custom names
KAFKA_TOPIC_OVERRIDES: Mapping[str, str] = {}

# Mapping of default Kafka topic name to broker config
KAFKA_TOPIC_TO_CLUSTER: Mapping[str, Mapping[str, TopicDefinition | None]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In snuba these are called KAFKA_TOPIC_MAP and KAFKA_BROKER_CONFIG.
I think there would be value in consolidate the naming as well. It reduces the cognitive overhead when managing moving from one config (snuba) to the other (sentry).

Any chance we can name them in the same way? Even if this naming is more appropriate than the Snuba one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally named it differently because it is not the same as Snuba at all. Here we map the topic to a cluster definition. Snuba does not go through any cluster definition and all the topic configs are mapped to the broker config. I think we might want to align them in the future but I'm not yet sure which way is better, and I want to avoid making this huge change in this PR right now.

{"cluster": "default"}
}


# Cluster configuration for each Kafka topic by name.
# Deprecated
KAFKA_TOPICS: Mapping[str, TopicDefinition | None] = {
KAFKA_EVENTS: {"cluster": "default"},
KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
Expand Down Expand Up @@ -3510,9 +3511,6 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"},
}


# If True, consumers will create the topics if they don't exist
KAFKA_CONSUMER_AUTO_CREATE_TOPICS = True
fpacifici marked this conversation as resolved.
Show resolved Hide resolved
# If True, sentry.utils.arroyo.RunTaskWithMultiprocessing will actually be
# single-threaded under the hood for performance
KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING = False
Expand Down
10 changes: 8 additions & 2 deletions src/sentry/conf/types/consumer_definition.py
Expand Up @@ -5,11 +5,17 @@

import click

from sentry.conf.types.topic_definition import Topic


class ConsumerDefinition(TypedDict, total=False):
# Which logical topic from settings to use.
topic: Required[str | Callable[[], str]]
default_topic: str
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default_topic is replaced by validate_schema since it was only actually controlling whether schema validation was run or not

topic: Topic

# Override topic. To be deprecated
topic_override: Required[str | Callable[[], str]]

validate_schema: str
lynnagara marked this conversation as resolved.
Show resolved Hide resolved

strategy_factory: Required[str]

Expand Down
36 changes: 34 additions & 2 deletions src/sentry/conf/types/topic_definition.py
@@ -1,7 +1,39 @@
from __future__ import annotations

from enum import Enum
from typing import TypedDict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we call this kafka_topic_definition ?



class Topic(Enum):
EVENTS = "events"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice I like this.

EVENTS_COMMIT_LOG = "snuba-commit-log"
TRANSACTIONS = "transactions"
TRANSACTIONS_COMMIT_LOG = "snuba-transactions-commit-log"
OUTCOMES = "outcomes"
OUTCOMES_BILLING = "outcomes-billing"
EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
GENERIC_METRICS_SUBSCRIPTIONS_RESULTS = "generic-metrics-subscription-results"
SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results"
METRICS_SUBSCRIPTIONS_RESULTS = "metrics-subscription-results"
INGEST_EVENTS = "ingest-events"
INGEST_ATTACHMENTS = "ingest-attachments"
INGEST_TRANSACTIONS = "ingest-transactions"
INGEST_METRICS = "ingest-metrics"
INGEST_METRICS_DLQ = "ingest-metrics-dlq"
SNUBA_METRICS = "snuba-metrics"
PROFILES = "profiles"
INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq"
SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
INGEST_REPLAY_EVENTS = "ingest-replay-events"
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
INGEST_OCCURRENCES = "ingest-occurrences"
INGEST_MONITORS = "ingest-monitors"
EVENTSTREAM_GENERIC = "generic-events"
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"


class TopicDefinition(TypedDict):
cluster: str
81 changes: 52 additions & 29 deletions src/sentry/consumers/__init__.py
Expand Up @@ -14,6 +14,7 @@
from django.conf import settings

from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition
from sentry.conf.types.topic_definition import Topic
from sentry.consumers.validate_schema import ValidateSchema
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand Down Expand Up @@ -163,109 +164,122 @@ def ingest_monitors_options() -> list[click.Option]:
]

# consumer name -> consumer definition
# XXX: default_topic is needed to lookup the schema even if the actual topic name has been
# overridden. This is because the current topic override mechanism means the default topic name
# is no longer available anywhere in code. We should probably fix this later so we don't need both
# "topic" and "default_topic" here though.
# XXX: topic_override will be deprecated here in favor of KAFKA_TOPIC_OVERRIDE once all overrides in
# prod have been migrated
KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
"ingest-profiles": {
"topic": settings.KAFKA_PROFILES,
"topic": Topic.PROFILES,
"topic_override": settings.KAFKA_PROFILES,
lynnagara marked this conversation as resolved.
Show resolved Hide resolved
"strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory",
},
"ingest-replay-recordings": {
"topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS,
"topic": Topic.EVENTS,
"topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS,
"strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory",
"click_options": ingest_replay_recordings_options(),
},
"ingest-replay-recordings-buffered": {
"topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS,
"topic": Topic.INGEST_REPLAYS_RECORDINGS,
"topic_override": settings.KAFKA_INGEST_REPLAYS_RECORDINGS,
"strategy_factory": "sentry.replays.consumers.recording_buffered.RecordingBufferedStrategyFactory",
"click_options": ingest_replay_recordings_buffered_options(),
},
"ingest-monitors": {
"topic": settings.KAFKA_INGEST_MONITORS,
"topic": Topic.INGEST_MONITORS,
"topic_override": settings.KAFKA_INGEST_MONITORS,
"strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory",
"click_options": ingest_monitors_options(),
},
"billing-metrics-consumer": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory",
},
# Known differences to 'sentry run occurrences-ingest-consumer':
# - ingest_consumer_types metric tag is missing. Use the kafka_topic and
# group_id tags provided by run_basic_consumer instead
"ingest-occurrences": {
"topic": settings.KAFKA_INGEST_OCCURRENCES,
"topic": Topic.INGEST_OCCURRENCES,
"topic_override": settings.KAFKA_INGEST_OCCURRENCES,
"strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=20),
},
"events-subscription-results": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.EVENTS_SUBSCRIPTIONS_RESULTS,
"topic_override": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
},
},
"transactions-subscription-results": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"topic_override": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
},
},
"generic-metrics-subscription-results": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"default_topic": "generic-metrics-subscription-results",
"topic": Topic.GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any specific reason this is the only topic being set via the Topic enum ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give the same answer as above - two reasons:

  • demonstration of how this works
  • it is the only topic that has schema validation turned on. In order to keep that happening, we need to know both the default and override topic there so it must use the new system.

"topic_override": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
"validate_schema": True,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
},
},
"sessions-subscription-results": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.SESSIONS_SUBSCRIPTIONS_RESULTS,
"topic_override": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(),
"static_args": {
"topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
},
},
"metrics-subscription-results": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"topic": Topic.METRICS_SUBSCRIPTIONS_RESULTS,
"topic_override": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
"strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
},
},
"ingest-events": {
"topic": settings.KAFKA_INGEST_EVENTS,
"topic": Topic.INGEST_EVENTS,
"topic_override": settings.KAFKA_INGEST_EVENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"consumer_type": "events",
},
},
"ingest-attachments": {
"topic": settings.KAFKA_INGEST_ATTACHMENTS,
"topic": Topic.INGEST_ATTACHMENTS,
"topic_override": settings.KAFKA_INGEST_ATTACHMENTS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"consumer_type": "attachments",
},
},
"ingest-transactions": {
"topic": settings.KAFKA_INGEST_TRANSACTIONS,
"topic": Topic.INGEST_TRANSACTIONS,
"topic_override": settings.KAFKA_INGEST_TRANSACTIONS,
"strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"static_args": {
"consumer_type": "transactions",
},
},
"ingest-metrics": {
"topic": settings.KAFKA_INGEST_METRICS,
"topic": Topic.INGEST_METRICS,
"topic_override": settings.KAFKA_INGEST_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -276,7 +290,8 @@ def ingest_monitors_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"ingest-generic-metrics": {
"topic": settings.KAFKA_INGEST_PERFORMANCE_METRICS,
"topic": Topic.INGEST_PERFORMANCE_METRICS,
"topic_override": settings.KAFKA_INGEST_PERFORMANCE_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
"click_options": _METRICS_INDEXER_OPTIONS,
"static_args": {
Expand All @@ -287,46 +302,53 @@ def ingest_monitors_options() -> list[click.Option]:
"dlq_max_consecutive_count": 1000,
},
"generic-metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
"topic": Topic.SNUBA_GENERIC_METRICS,
"topic_override": settings.KAFKA_SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "performance",
},
},
"metrics-last-seen-updater": {
"topic": settings.KAFKA_SNUBA_METRICS,
"topic": Topic.SNUBA_METRICS,
"topic_override": settings.KAFKA_SNUBA_METRICS,
"strategy_factory": "sentry.sentry_metrics.consumers.last_seen_updater.LastSeenUpdaterStrategyFactory",
"click_options": _METRICS_LAST_SEEN_UPDATER_OPTIONS,
"static_args": {
"ingest_profile": "release-health",
},
},
"post-process-forwarder-issue-platform": {
"topic": settings.KAFKA_EVENTSTREAM_GENERIC,
"topic": Topic.EVENTSTREAM_GENERIC,
"topic_override": settings.KAFKA_EVENTSTREAM_GENERIC,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-generic-events-commit-log",
"synchronize_commit_group_default": "generic_events_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-transactions": {
"topic": settings.KAFKA_TRANSACTIONS,
"topic": Topic.TRANSACTIONS,
"topic_override": settings.KAFKA_TRANSACTIONS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-transactions-commit-log",
"synchronize_commit_group_default": "transactions_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"post-process-forwarder-errors": {
"topic": settings.KAFKA_EVENTS,
"topic": Topic.EVENTS,
"topic_override": settings.KAFKA_EVENTS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-commit-log",
"synchronize_commit_group_default": "snuba-consumers",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
},
"process-spans": {
"topic": settings.KAFKA_SNUBA_SPANS,
"topic": Topic.SNUBA_SPANS,
"topic_override": settings.KAFKA_SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
# TODO: This has to be co-ordinated with getsentry
**settings.SENTRY_KAFKA_CONSUMERS,
}

Expand Down Expand Up @@ -396,6 +418,7 @@ def get_stream_processor(

from sentry.utils import kafka_config

# Still supported for now until we have moved everything in prod over
topic_def = settings.KAFKA_TOPICS[logical_topic]
assert topic_def is not None
if cluster is None:
Expand Down Expand Up @@ -461,8 +484,8 @@ def build_consumer_config(group_id: str):
"--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer"
)

# Validate schema if "default_topic" is set
default_topic = consumer_definition.get("default_topic")
# Validate schema if "validate_schema" is set
default_topic = consumer_definition.get("validate_schema")
if default_topic:
strategy_factory = ValidateSchemaStrategyFactoryWrapper(
default_topic, validate_schema, strategy_factory
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/runner/commands/devserver.py
Expand Up @@ -368,7 +368,7 @@ def devserver(

for topic_name, topic_data in settings.KAFKA_TOPICS.items():
if topic_data is not None:
create_topics(topic_data["cluster"], [topic_name], force=True)
create_topics(topic_data["cluster"], [topic_name])

if dev_consumer:
daemons.append(
Expand Down
9 changes: 3 additions & 6 deletions src/sentry/utils/batching_kafka_consumer.py
Expand Up @@ -3,7 +3,6 @@

from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient
from django.conf import settings

from sentry.utils import kafka_config

Expand Down Expand Up @@ -53,8 +52,6 @@ def create_topics(cluster_name: str, topics: list[str], force: bool = False) ->

topics must be from the same cluster.
"""
if settings.KAFKA_CONSUMER_AUTO_CREATE_TOPICS or force:
conf = kafka_config.get_kafka_admin_cluster_options(cluster_name)
# Topics are implicitly created here
admin_client = AdminClient(conf)
wait_for_topics(admin_client, topics)
conf = kafka_config.get_kafka_admin_cluster_options(cluster_name)
lynnagara marked this conversation as resolved.
Show resolved Hide resolved
admin_client = AdminClient(conf)
wait_for_topics(admin_client, topics)