From 027352504687502213048d8c153dcab71d028be6 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 18 Mar 2024 15:01:40 -0700 Subject: [PATCH] feat(ingest): DLQ all messages even if schema is valid Since ingest messages contain JSON bytes inside a msgpack payload, the schema cannot be relied on to determine whether to DLQ: the inner message can be invalid even when schema checking passes. This now DLQs any message whose processing raises an exception that is not marked Retriable. --- src/sentry/ingest/consumer/processors.py | 8 +++++++- src/sentry/ingest/consumer/simple_event.py | 22 +++++----------------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py index 42e20cec2d16c3..465ec538eda816 100644 --- a/src/sentry/ingest/consumer/processors.py +++ b/src/sentry/ingest/consumer/processors.py @@ -87,7 +87,13 @@ def process_event( # keeping it around because it does provide some protection against # reprocessing good events if a single consumer is in a restart loop. 
deduplication_key = f"ev:{project_id}:{event_id}" - if cache.get(deduplication_key) is not None: + + try: + cached_value = cache.get(deduplication_key) + except Exception as exc: + raise Retriable(exc) + + if cached_value is not None: logger.warning( "pre-process-forwarder detected a duplicated event" " with id:%s for project:%s.", event_id, diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py index e1278ab8890c06..07b09863a75529 100644 --- a/src/sentry/ingest/consumer/simple_event.py +++ b/src/sentry/ingest/consumer/simple_event.py @@ -1,7 +1,6 @@ import logging import msgpack -import sentry_kafka_schemas from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message @@ -71,23 +70,12 @@ def process_simple_event_message( if isinstance(exc, Retriable): raise - # If no retriable exception was raised, check the schema to decide whether to DLQ + # TODO: Remove this line once all topics (transactions, attachments, + # user feedback) also have DLQs default_topic = consumer_type_to_default_topic[consumer_type].value - - # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise - # the exception if it's a different topic. This can be removed once attachments and transactions - # have schemas too. if default_topic != "ingest-events": raise - codec = sentry_kafka_schemas.get_codec(default_topic) - - try: - codec.decode(raw_payload, validate=True) - except Exception: - raw_value = raw_message.value - assert isinstance(raw_value, BrokerValue) - - raise InvalidMessage(raw_value.partition, raw_value.offset) - - raise + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + raise InvalidMessage(raw_value.partition, raw_value.offset) from exc