Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): DLQ all non-retriable messages even if schema is valid #67200

Merged
merged 1 commit into from Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/sentry/ingest/consumer/processors.py
Expand Up @@ -87,7 +87,13 @@ def process_event(
# keeping it around because it does provide some protection against
# reprocessing good events if a single consumer is in a restart loop.
deduplication_key = f"ev:{project_id}:{event_id}"
if cache.get(deduplication_key) is not None:

try:
cached_value = cache.get(deduplication_key)
except Exception as exc:
raise Retriable(exc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's additional calls to redis and celery below (event_processing_store.store(data), task.delay in submit_symbolicate). Should these also be marked as Retriable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should already be wrapped in Retriable -- did this in an earlier PR. Just spotted this one I forgot earlier.


if cached_value is not None:
logger.warning(
"pre-process-forwarder detected a duplicated event" " with id:%s for project:%s.",
event_id,
Expand Down
22 changes: 5 additions & 17 deletions src/sentry/ingest/consumer/simple_event.py
@@ -1,7 +1,6 @@
import logging

import msgpack
import sentry_kafka_schemas
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message
Expand Down Expand Up @@ -71,23 +70,12 @@ def process_simple_event_message(
if isinstance(exc, Retriable):
raise

# If no retriable exception was raised, check the schema to decide whether to DLQ
# TODO: Remove this line once all topics (transactions, attachments,
# user feedback) also have DLQs
default_topic = consumer_type_to_default_topic[consumer_type].value

# TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
# the exception if it's a different topic. This can be removed once attachments and transactions
# have schemas too.
if default_topic != "ingest-events":
raise

codec = sentry_kafka_schemas.get_codec(default_topic)

try:
codec.decode(raw_payload, validate=True)
except Exception:
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)

raise InvalidMessage(raw_value.partition, raw_value.offset)

raise
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc