diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py index 42e20cec2d16c..465ec538eda81 100644 --- a/src/sentry/ingest/consumer/processors.py +++ b/src/sentry/ingest/consumer/processors.py @@ -87,7 +87,13 @@ def process_event( # keeping it around because it does provide some protection against # reprocessing good events if a single consumer is in a restart loop. deduplication_key = f"ev:{project_id}:{event_id}" - if cache.get(deduplication_key) is not None: + + try: + cached_value = cache.get(deduplication_key) + except Exception as exc: + raise Retriable(exc) + + if cached_value is not None: logger.warning( "pre-process-forwarder detected a duplicated event" " with id:%s for project:%s.", event_id, diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py index e1278ab8890c0..07b09863a7552 100644 --- a/src/sentry/ingest/consumer/simple_event.py +++ b/src/sentry/ingest/consumer/simple_event.py @@ -1,7 +1,6 @@ import logging import msgpack -import sentry_kafka_schemas from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message @@ -71,23 +70,12 @@ def process_simple_event_message( if isinstance(exc, Retriable): raise - # If no retriable exception was raised, check the schema to decide whether to DLQ + # TODO: Remove this line once all topics (transactions, attachments, + # user feedback) also have DLQs default_topic = consumer_type_to_default_topic[consumer_type].value - - # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise - # the exception if it's a different topic. This can be removed once attachments and transactions - # have schemas too. if default_topic != "ingest-events": raise - codec = sentry_kafka_schemas.get_codec(default_topic) - - try: - codec.decode(raw_payload, validate=True) - except Exception: - raw_value = raw_message.value - assert isinstance(raw_value, BrokerValue) - - raise InvalidMessage(raw_value.partition, raw_value.offset) - - raise + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + raise InvalidMessage(raw_value.partition, raw_value.offset) from exc