From 245881707a53b533a7bca617056c5c022f982366 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Mar 2024 10:12:39 -0700 Subject: [PATCH] feat: DLQ for all ingest consumers (#67349) Failed attachments and transactions, and user feedback messages now get dlq'ed --- src/sentry/consumers/__init__.py | 2 ++ src/sentry/ingest/consumer/simple_event.py | 15 --------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 22151044fa829f..a7eca3fe23ece8 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -277,6 +277,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "consumer_type": "attachments", }, + "dlq_topic": Topic.INGEST_ATTACHMENTS_DLQ, }, "ingest-transactions": { "topic": Topic.INGEST_TRANSACTIONS, @@ -285,6 +286,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "consumer_type": "transactions", }, + "dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ, }, "ingest-metrics": { "topic": Topic.INGEST_METRICS, diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py index 07b09863a75529..fb594f11a14b35 100644 --- a/src/sentry/ingest/consumer/simple_event.py +++ b/src/sentry/ingest/consumer/simple_event.py @@ -5,7 +5,6 @@ from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message -from sentry.conf.types.kafka_definition import Topic from sentry.models.project import Project from sentry.utils import metrics @@ -14,14 +13,6 @@ logger = logging.getLogger(__name__) -consumer_type_to_default_topic = { - "events": Topic.INGEST_EVENTS, - "transactions": Topic.INGEST_TRANSACTIONS, - "attachments": Topic.INGEST_ATTACHMENTS, - "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS, -} - - def process_simple_event_message( raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool ) -> None: @@ -70,12 +61,6 @@ def process_simple_event_message( if isinstance(exc, Retriable): raise - # TODO: Remove this line once all topics (transactions, attachments, - # user feedback) also have DLQs - default_topic = consumer_type_to_default_topic[consumer_type].value - if default_topic != "ingest-events": - raise - raw_value = raw_message.value assert isinstance(raw_value, BrokerValue) raise InvalidMessage(raw_value.partition, raw_value.offset) from exc