Skip to content

Commit

Permalink
feat: DLQ for all ingest consumers (#67349)
Browse files Browse the repository at this point in the history
Failed attachments and transactions, and user feedback messages now get
dlq'ed
  • Loading branch information
lynnagara committed Mar 21, 2024
1 parent c987d30 commit 2458817
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/sentry/consumers/__init__.py
Expand Up @@ -277,6 +277,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"consumer_type": "attachments",
},
"dlq_topic": Topic.INGEST_ATTACHMENTS_DLQ,
},
"ingest-transactions": {
"topic": Topic.INGEST_TRANSACTIONS,
Expand All @@ -285,6 +286,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"consumer_type": "transactions",
},
"dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ,
},
"ingest-metrics": {
"topic": Topic.INGEST_METRICS,
Expand Down
15 changes: 0 additions & 15 deletions src/sentry/ingest/consumer/simple_event.py
Expand Up @@ -5,7 +5,6 @@
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.utils import metrics

Expand All @@ -14,14 +13,6 @@
logger = logging.getLogger(__name__)


consumer_type_to_default_topic = {
"events": Topic.INGEST_EVENTS,
"transactions": Topic.INGEST_TRANSACTIONS,
"attachments": Topic.INGEST_ATTACHMENTS,
"ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
}


def process_simple_event_message(
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
) -> None:
Expand Down Expand Up @@ -70,12 +61,6 @@ def process_simple_event_message(
if isinstance(exc, Retriable):
raise

# TODO: Remove this line once all topics (transactions, attachments,
# user feedback) also have DLQs
default_topic = consumer_type_to_default_topic[consumer_type].value
if default_topic != "ingest-events":
raise

raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc

0 comments on commit 2458817

Please sign in to comment.