From ce17387fcf3274fec2e6b979c265f1af8f009555 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 19 Mar 2024 13:39:09 -0700 Subject: [PATCH] feat(ingest): DLQ all non-retriable messages even if schema is valid (#67200) Since ingest messages contain json bytes inside a msgpack payload the schema cannot be relied on to determine whether to DLQ, as the inner message can be invalid even if schema checking still passes. This now DLQs any exception that is not marked Retriable. --- src/sentry/ingest/consumer/processors.py | 8 +++++++- src/sentry/ingest/consumer/simple_event.py | 22 +++++----------------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py index 42e20cec2d16c..465ec538eda81 100644 --- a/src/sentry/ingest/consumer/processors.py +++ b/src/sentry/ingest/consumer/processors.py @@ -87,7 +87,13 @@ def process_event( # keeping it around because it does provide some protection against # reprocessing good events if a single consumer is in a restart loop. deduplication_key = f"ev:{project_id}:{event_id}" - if cache.get(deduplication_key) is not None: + + try: + cached_value = cache.get(deduplication_key) + except Exception as exc: + raise Retriable(exc) + + if cached_value is not None: logger.warning( "pre-process-forwarder detected a duplicated event" " with id:%s for project:%s.", event_id, diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py index e1278ab8890c0..07b09863a7552 100644 --- a/src/sentry/ingest/consumer/simple_event.py +++ b/src/sentry/ingest/consumer/simple_event.py @@ -1,7 +1,6 @@ import logging import msgpack -import sentry_kafka_schemas from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message @@ -71,23 +70,12 @@ def process_simple_event_message( if isinstance(exc, Retriable): raise - # If no retriable exception was raised, check the schema to decide whether to DLQ + # TODO: Remove this line once all topics (transactions, attachments, + # user feedback) also have DLQs default_topic = consumer_type_to_default_topic[consumer_type].value - - # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise - # the exception if it's a different topic. This can be removed once attachments and transactions - # have schemas too. if default_topic != "ingest-events": raise - codec = sentry_kafka_schemas.get_codec(default_topic) - - try: - codec.decode(raw_payload, validate=True) - except Exception: - raw_value = raw_message.value - assert isinstance(raw_value, BrokerValue) - - raise InvalidMessage(raw_value.partition, raw_value.offset) - - raise + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + raise InvalidMessage(raw_value.partition, raw_value.offset) from exc