Skip to content

Commit

Permalink
feat(crons): Add stub consumers for clock tick / tasks
Browse files Browse the repository at this point in the history
We will use these to process clock ticks and their associated tasks
  • Loading branch information
evanpurkhiser committed Apr 29, 2024
1 parent aed6fb4 commit 6444c3e
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,8 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"ingest-replay-recordings": "default",
"ingest-occurrences": "default",
"ingest-monitors": "default",
"monitors-clock-tick": "default",
"monitors-clock-tasks": "default",
"generic-events": "default",
"snuba-generic-events-commit-log": "default",
"group-attributes": "default",
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/conf/types/kafka_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Topic(Enum):
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
INGEST_OCCURRENCES = "ingest-occurrences"
INGEST_MONITORS = "ingest-monitors"
MONITORS_CLOCK_TICK = "monitors-clock-tick"
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
EVENTSTREAM_GENERIC = "generic-events"
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
GROUP_ATTRIBUTES = "group-attributes"
Expand Down
8 changes: 8 additions & 0 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ def ingest_events_options() -> list[click.Option]:
"strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory",
"click_options": ingest_monitors_options(),
},
"monitors-clock-tick": {
"topic": Topic.MONITORS_CLOCK_TICK,
"strategy_factory": "sentry.monitors.consumers.clock_tick_consumer.MonitorClockTickStrategyFactory",
},
"monitors-clock-tasks": {
"topic": Topic.MONITORS_CLOCK_TASKS,
"strategy_factory": "sentry.monitors.consumers.clock_tasks_consumer.MonitorClockTasksStrategyFactory",
},
"billing-metrics-consumer": {
"topic": Topic.SNUBA_GENERIC_METRICS,
"strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory",
Expand Down
23 changes: 23 additions & 0 deletions src/sentry/monitors/consumers/clock_tasks_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

import logging
from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Commit, Partition

logger = logging.getLogger(__name__)


class MonitorClockTasksStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(self) -> None:
pass

def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return CommitOffsets(commit)
23 changes: 23 additions & 0 deletions src/sentry/monitors/consumers/clock_tick_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

import logging
from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Commit, Partition

logger = logging.getLogger(__name__)


class MonitorClockTickStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(self) -> None:
pass

def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return CommitOffsets(commit)
3 changes: 3 additions & 0 deletions src/sentry/runner/commands/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ def devserver(
kafka_consumers.add("ingest-monitors")
kafka_consumers.add("ingest-feedback-events")

kafka_consumers.add("monitors-clock-tick")
kafka_consumers.add("monitors-clock-tasks")

if settings.SENTRY_USE_PROFILING:
kafka_consumers.add("ingest-profiles")

Expand Down
3 changes: 3 additions & 0 deletions tests/sentry/conf/test_kafka_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ def test_topic_definition() -> None:
"ingest-transactions",
"profiles",
"ingest-occurrences",
"monitors-clock-pulse",
"monitors-mark-missing",
"monitors-mark-timeout",
]

for topic in Topic:
Expand Down
2 changes: 2 additions & 0 deletions tests/sentry/consumers/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def test_dlq(consumer_def) -> None:
"process-spans",
"detect-performance-issues",
"ingest-monitors",
"monitors-clock-tick",
"monitors-clock-tasks",
"metrics-last-seen-updater",
"generic-metrics-last-seen-updater",
"billing-metrics-consumer",
Expand Down

0 comments on commit 6444c3e

Please sign in to comment.