diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index ffdeb801d764e5..05a53ee8467b52 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -277,6 +277,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "consumer_type": "attachments", }, + "dlq_topic": Topic.INGEST_ATTACHMENTS_DLQ, }, "ingest-transactions": { "topic": Topic.INGEST_TRANSACTIONS, @@ -285,6 +286,7 @@ def ingest_events_options() -> list[click.Option]: "static_args": { "consumer_type": "transactions", }, + "dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ, }, "ingest-metrics": { "topic": Topic.INGEST_METRICS, diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py index 07b09863a75529..fb594f11a14b35 100644 --- a/src/sentry/ingest/consumer/simple_event.py +++ b/src/sentry/ingest/consumer/simple_event.py @@ -5,7 +5,6 @@ from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message -from sentry.conf.types.kafka_definition import Topic from sentry.models.project import Project from sentry.utils import metrics @@ -14,14 +13,6 @@ logger = logging.getLogger(__name__) -consumer_type_to_default_topic = { - "events": Topic.INGEST_EVENTS, - "transactions": Topic.INGEST_TRANSACTIONS, - "attachments": Topic.INGEST_ATTACHMENTS, - "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS, -} - - def process_simple_event_message( raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool ) -> None: @@ -70,12 +61,6 @@ def process_simple_event_message( if isinstance(exc, Retriable): raise - # TODO: Remove this line once all topics (transactions, attachments, - # user feedback) also have DLQs - default_topic = consumer_type_to_default_topic[consumer_type].value - if default_topic != "ingest-events": - raise - raw_value = raw_message.value assert isinstance(raw_value, BrokerValue) raise InvalidMessage(raw_value.partition, raw_value.offset) from exc