Skip to content

Commit

Permalink
feat(ingest): DLQ all non-retriable messages even if schema is valid (#67200)
Browse files Browse the repository at this point in the history

Since ingest messages contain JSON bytes inside a msgpack payload, the
schema cannot be relied on to determine whether to DLQ: the inner
message can be invalid even when schema validation passes. The consumer
now DLQs any message whose processing raises an exception that is not
marked Retriable.
  • Loading branch information
lynnagara committed Mar 19, 2024
1 parent 871522d commit ce17387
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
8 changes: 7 additions & 1 deletion src/sentry/ingest/consumer/processors.py
Expand Up @@ -87,7 +87,13 @@ def process_event(
# keeping it around because it does provide some protection against
# reprocessing good events if a single consumer is in a restart loop.
deduplication_key = f"ev:{project_id}:{event_id}"
if cache.get(deduplication_key) is not None:

try:
cached_value = cache.get(deduplication_key)
except Exception as exc:
raise Retriable(exc)

if cached_value is not None:
logger.warning(
"pre-process-forwarder detected a duplicated event" " with id:%s for project:%s.",
event_id,
Expand Down
22 changes: 5 additions & 17 deletions src/sentry/ingest/consumer/simple_event.py
@@ -1,7 +1,6 @@
import logging

import msgpack
import sentry_kafka_schemas
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message
Expand Down Expand Up @@ -71,23 +70,12 @@ def process_simple_event_message(
if isinstance(exc, Retriable):
raise

# If no retriable exception was raised, check the schema to decide whether to DLQ
# TODO: Remove this line once all topics (transactions, attachments,
# user feedback) also have DLQs
default_topic = consumer_type_to_default_topic[consumer_type].value

# TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
# the exception if it's a different topic. This can be removed once attachments and transactions
# have schemas too.
if default_topic != "ingest-events":
raise

codec = sentry_kafka_schemas.get_codec(default_topic)

try:
codec.decode(raw_payload, validate=True)
except Exception:
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)

raise InvalidMessage(raw_value.partition, raw_value.offset)

raise
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc

0 comments on commit ce17387

Please sign in to comment.