Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Remove dead code #67089

Merged
merged 2 commits into from Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 11 additions & 82 deletions src/sentry/runner/commands/run.py
Expand Up @@ -65,87 +65,6 @@ def convert(self, value, param, ctx):
QueueSet = QueueSetType()


def kafka_options(
consumer_group: str,
allow_force_cluster: bool = True,
include_batching_options: bool = False,
default_max_batch_size: int | None = None,
default_max_batch_time_ms: int | None = 1000,
):
"""
Basic set of Kafka options for a consumer.
"""

def inner(f):
f = click.option(
"--consumer-group",
"group_id",
default=consumer_group,
help="Kafka consumer group for the consumer.",
)(f)

f = click.option(
"--auto-offset-reset",
"auto_offset_reset",
default="latest",
type=click.Choice(["earliest", "latest", "error"]),
help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
)(f)

if include_batching_options:
f = click.option(
"--max-batch-size",
"max_batch_size",
default=default_max_batch_size,
type=int,
help="Maximum number of messages to batch before flushing.",
)(f)

f = click.option(
"--max-batch-time-ms",
"max_batch_time",
default=default_max_batch_time_ms,
type=int,
help="Maximum time (in seconds) to wait before flushing a batch.",
)(f)

if allow_force_cluster:
f = click.option(
"--force-topic",
"force_topic",
default=None,
type=str,
help="Override the Kafka topic the consumer will read from.",
)(f)

f = click.option(
"--force-cluster",
"force_cluster",
default=None,
type=str,
help="Kafka cluster ID of the overridden topic. Configure clusters via KAFKA_CLUSTERS in server settings.",
)(f)

return f

return inner


def strict_offset_reset_option():
return click.option(
"--strict-offset-reset/--no-strict-offset-reset",
default=True,
help=(
"--strict-offset-reset, the default, means that the kafka consumer "
"still errors in case the offset is out of range.\n\n"
"--no-strict-offset-reset will use the auto offset reset even in that case. "
"This is useful in development, but not desirable in production since expired "
"offsets mean data-loss.\n\n"
"Most consumers that do not have this option at all default to 'Not Strict'."
),
)


@click.group()
def run():
"Run a service."
Expand Down Expand Up @@ -412,7 +331,17 @@ def cron(**options):
type=click.Choice(["debug", "info", "warning", "error", "critical"], case_sensitive=False),
help="log level to pass to the arroyo consumer",
)
@strict_offset_reset_option()
@click.option(
"--strict-offset-reset/--no-strict-offset-reset",
default=True,
help=(
"--strict-offset-reset, the default, means that the kafka consumer "
"still errors in case the offset is out of range.\n\n"
"--no-strict-offset-reset will use the auto offset reset even in that case. "
"This is useful in development, but not desirable in production since expired "
"offsets mean data-loss.\n\n"
),
)
@configuration
def basic_consumer(consumer_name, consumer_args, topic, **options):
"""
Expand Down