feat: DLQ unprocessable messages on ingest-events #66236
@@ -2,12 +2,13 @@
 
 import msgpack
 from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.types import Message
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message
 
 from sentry.models.project import Project
 from sentry.utils import metrics
 
-from .processors import IngestMessage, process_event
+from .processors import IngestMessage, Retriable, process_event
 
 logger = logging.getLogger(__name__)
 
@@ -29,26 +30,43 @@ def process_simple_event_message(
     `symbolicate_event` or `process_event`.
     """
 
-    raw_payload = raw_message.payload.value
-    metrics.distribution(
-        "ingest_consumer.payload_size",
-        len(raw_payload),
-        tags={"consumer": consumer_type},
-        unit="byte",
-    )
-    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
+    try:
+        raw_payload = raw_message.payload.value
+        metrics.distribution(
+            "ingest_consumer.payload_size",
+            len(raw_payload),
+            tags={"consumer": consumer_type},
+            unit="byte",
+        )
+        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
 
-    message_type = message["type"]
-    project_id = message["project_id"]
+        message_type = message["type"]
+        project_id = message["project_id"]
 
-    if message_type != "event":
-        raise ValueError(f"Unsupported message type: {message_type}")
+        if message_type != "event":
+            raise ValueError(f"Unsupported message type: {message_type}")
 
-    try:
-        with metrics.timer("ingest_consumer.fetch_project"):
-            project = Project.objects.get_from_cache(id=project_id)
-    except Project.DoesNotExist:
-        logger.exception("Project for ingested event does not exist: %s", project_id)
-        return
+        try:
+            try:
+                with metrics.timer("ingest_consumer.fetch_project"):
+                    project = Project.objects.get_from_cache(id=project_id)
+            except Project.DoesNotExist:
+                logger.exception("Project for ingested event does not exist: %s", project_id)
+                return
+
+            return process_event(message, project, reprocess_only_stuck_events)
+        except Exception as exc:
+            raise Retriable(exc)

just to be safe and avoid DLQing anything potentially retriable, we don't DLQ any exception from the

 
-    return process_event(message, project, reprocess_only_stuck_events)
+    except Exception as exc:
+        # Non retriable exceptions raise InvalidMessage, which Arroyo will DLQ.

not sure this makes a ton of sense. why put retriable errors in the DLQ if nobody can retry those DLQ items by consuming that DLQ?

It's the non-retriable errors, not the retriable ones, that go in the DLQ, so the consumer does not get stuck on those. The retriable errors are things that might be caused by network issues, temporary unavailability of the database, even a deploy, etc., and should not be DLQed.

that's what I mean. IMO retriable errors should go into the DLQ as well. the DLQ is meant to be replayed.

I think in an ideal world we'd do that eventually, but I'm not sure if we're there yet, and we haven't tried it anywhere. IMO inspection of events and replaying needs to be built so it can be done in a really fast and easy manner before taking this step. Today, DLQing everything in the case of temporary blips might cause a longer and more manual recovery period, and comes with its own risks that I haven't fully thought through yet. Let's do it as a follow-up later.

Would INC-660 messages have been marked as

In the meantime, I'm going to work on defining a schema for this topic so we can use that to determine what is valid or not.

No, what you have here seems good to start with, just that we should continue thinking about how to turn it into a general-purpose failure handling tool.

Yes, agree this is not an end state. Just want to get the DLQ in place, and we can tweak what actually gets DLQed later.

During INC-660, we crashed inside
See this sentry error. The kafka message itself was populated, but the event JSON inside was empty. @lynnagara How often does the consumer currently crash because of temporary network outages? If it's rare enough (say, once per month), I would personally go ahead and DLQ all exceptions. I do believe this PR is an improvement (any DLQ is better than none), just pointing out that it would not have helped with INC-660.

Ok, I realised it's going to be a lot of work to figure out what's retriable and what isn't on a consumer-by-consumer basis. So I rewrote this to check against the schema whenever an exception is raised to decide whether to DLQ it or not. Since there wasn't a schema for ingest-events, I added one here: getsentry/sentry-kafka-schemas#230

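The schema-based approach described in the last comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `REQUIRED_FIELDS`, `is_valid`, and `process_with_schema_check` are hypothetical names, the payload is JSON rather than msgpack so the sketch needs only the standard library, and `InvalidMessage` is a local stand-in for `arroyo.dlq.InvalidMessage`. It does not reproduce the schema added in getsentry/sentry-kafka-schemas#230.

```python
import json

# Hypothetical minimal "schema": required keys and their expected types.
REQUIRED_FIELDS = {"type": str, "project_id": int}


class InvalidMessage(Exception):
    """Local stand-in for arroyo.dlq.InvalidMessage (partition and offset only)."""

    def __init__(self, partition, offset):
        super().__init__(f"invalid message at partition={partition} offset={offset}")
        self.partition = partition
        self.offset = offset


def is_valid(payload: bytes) -> bool:
    """Return True if the payload decodes and has the required fields."""
    try:
        message = json.loads(payload)
    except ValueError:  # covers JSON decode errors and bad UTF-8
        return False
    if not isinstance(message, dict):
        return False
    return all(isinstance(message.get(key), typ) for key, typ in REQUIRED_FIELDS.items())


def process_with_schema_check(payload, partition, offset, process):
    """Run `process`; on failure, DLQ only if the payload itself is invalid."""
    try:
        return process(payload)
    except Exception as exc:
        if not is_valid(payload):
            # The message is unprocessable: signal that it should be dead-lettered.
            raise InvalidMessage(partition, offset) from exc
        # The message looks fine, so the failure may be transient: let it propagate.
        raise
```

With this shape, the consumer no longer has to classify each exception as retriable or not; validation only runs on the failure path, so the happy path pays no extra cost.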
+        # The consumer will crash on any potentially retriable exceptions.
+        # A better approach would be to instead validate the schema here and only
+        # DLQ messages that fail schema validation, but this topic has no schema
+        # defined currently.
+        if not isinstance(exc, Retriable):
+            raw_value = raw_message.value
+            assert isinstance(raw_value, BrokerValue)

Can it be anything else?

no

+            raise InvalidMessage(raw_value.partition, raw_value.offset)
+        else:
+            raise
none of this code changed, it's just indented in try/except block now
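
For readers skimming the thread, the control flow this commit introduces can be reduced to the following sketch. It makes several assumptions: `Retriable` and `InvalidMessage` are local stand-ins for the classes imported in the diff, `FakeBrokerValue` and `handle` are hypothetical names, and JSON replaces msgpack in the usage example so that only the standard library is needed.

```python
from dataclasses import dataclass


class Retriable(Exception):
    """Stand-in for the Retriable wrapper imported from .processors in the diff."""


class InvalidMessage(Exception):
    """Stand-in for arroyo.dlq.InvalidMessage (partition and offset only)."""

    def __init__(self, partition, offset):
        super().__init__(f"invalid message at partition={partition} offset={offset}")
        self.partition = partition
        self.offset = offset


@dataclass
class FakeBrokerValue:
    """Hypothetical stand-in for the fields of arroyo.types.BrokerValue used here."""

    payload: bytes
    partition: int
    offset: int


def handle(value, decode, process):
    """Decode and process one message, mirroring the try/except layout of the diff."""
    try:
        # Failures in decoding mean the message itself is bad.
        message = decode(value.payload)
        try:
            return process(message)
        except Exception as exc:
            # Anything raised while processing is treated as potentially transient.
            raise Retriable(exc) from exc
    except Exception as exc:
        if isinstance(exc, Retriable):
            raise  # let the consumer crash and retry rather than dead-letter
        # Non-retriable: report the position so the message can be dead-lettered.
        raise InvalidMessage(value.partition, value.offset) from exc
```

A call such as `handle(FakeBrokerValue(b"not json", 3, 42), json.loads, print)` would raise `InvalidMessage` for partition 3, offset 42, while an exception raised inside `process` would surface as `Retriable` and never reach the DLQ branch.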