feat: DLQ unprocessable messages on ingest-events #66236

Merged
merged 13 commits into from Mar 15, 2024
1 change: 1 addition & 0 deletions src/sentry/consumers/__init__.py
@@ -270,6 +270,7 @@ def ingest_events_options() -> list[click.Option]:
         "static_args": {
             "consumer_type": "events",
         },
+        "dlq_topic": settings.KAFKA_INGEST_EVENTS_DLQ,
     },
     "ingest-attachments": {
         "topic": settings.KAFKA_INGEST_ATTACHMENTS,
4 changes: 4 additions & 0 deletions src/sentry/ingest/consumer/processors.py
@@ -32,6 +32,10 @@
 IngestMessage = Mapping[str, Any]


+class Retriable(Exception):
+    pass
+
+
 def trace_func(**span_kwargs):
     def wrapper(f):
         @functools.wraps(f)
60 changes: 39 additions & 21 deletions src/sentry/ingest/consumer/simple_event.py
@@ -2,12 +2,13 @@

 import msgpack
 from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.types import Message
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message

 from sentry.models.project import Project
 from sentry.utils import metrics

-from .processors import IngestMessage, process_event
+from .processors import IngestMessage, Retriable, process_event

 logger = logging.getLogger(__name__)

@@ -29,26 +30,43 @@ def process_simple_event_message(
     `symbolicate_event` or `process_event`.
     """

-    raw_payload = raw_message.payload.value
-    metrics.distribution(
-        "ingest_consumer.payload_size",
-        len(raw_payload),
-        tags={"consumer": consumer_type},
-        unit="byte",
-    )
-    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
+    try:
+        raw_payload = raw_message.payload.value
Member Author

None of this code changed; it's just indented inside a try/except block now.

+        metrics.distribution(
+            "ingest_consumer.payload_size",
+            len(raw_payload),
+            tags={"consumer": consumer_type},
+            unit="byte",
+        )
+        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

-    message_type = message["type"]
-    project_id = message["project_id"]
+        message_type = message["type"]
+        project_id = message["project_id"]

-    if message_type != "event":
-        raise ValueError(f"Unsupported message type: {message_type}")
+        if message_type != "event":
+            raise ValueError(f"Unsupported message type: {message_type}")

-    try:
-        with metrics.timer("ingest_consumer.fetch_project"):
-            project = Project.objects.get_from_cache(id=project_id)
-    except Project.DoesNotExist:
-        logger.exception("Project for ingested event does not exist: %s", project_id)
-        return
+        try:
+            try:
+                with metrics.timer("ingest_consumer.fetch_project"):
+                    project = Project.objects.get_from_cache(id=project_id)
+            except Project.DoesNotExist:
+                logger.exception("Project for ingested event does not exist: %s", project_id)
+                return
+
+            return process_event(message, project, reprocess_only_stuck_events)
+        except Exception as exc:
+            raise Retriable(exc)
Member Author

just to be safe and avoid dlqing anything potentially retriable, we don't dlq any exception from the process_event function


-    return process_event(message, project, reprocess_only_stuck_events)
+    except Exception as exc:
+        # Non retriable exceptions raise InvalidMessage, which Arroyo will DLQ.
Member

@untitaker Mar 4, 2024

not sure this makes a ton of sense. why put retriable errors in the DLQ if nobody can retry those DLQ items by consuming that DLQ?

Member Author

@lynnagara Mar 4, 2024

It's the non-retriable errors, not the retriable ones, that go in the DLQ, so the consumer does not get stuck on them. Retriable errors are things that might be caused by network issues, temporary unavailability of the database, even a deploy, etc., and should not be DLQed.
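
To make the split concrete, here is a minimal, self-contained sketch of the routing described above. It is not the code from this PR: `InvalidMessage` is a local stand-in for `arroyo.dlq.InvalidMessage`, and `handle_message` and its parameters are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Mapping


class Retriable(Exception):
    """Wraps a failure that might succeed on a later attempt."""


class InvalidMessage(Exception):
    """Local stand-in for arroyo.dlq.InvalidMessage (partition + offset)."""

    def __init__(self, partition: int, offset: int) -> None:
        super().__init__(partition, offset)
        self.partition = partition
        self.offset = offset


def handle_message(
    message: Mapping[str, Any],
    partition: int,
    offset: int,
    process: Callable[[Mapping[str, Any]], Any],
) -> Any:
    """Hypothetical wrapper: DLQ malformed messages, crash on everything else."""
    try:
        # Failures while checking the message are treated as non-retriable:
        # the same message will fail the same way on every attempt.
        if message["type"] != "event":
            raise ValueError(f"Unsupported message type: {message['type']}")
        try:
            return process(message)
        except Exception as exc:
            # Anything raised during processing might be transient.
            raise Retriable(exc) from exc
    except Retriable:
        # Propagate so the consumer crashes and retries after a restart.
        raise
    except Exception as exc:
        # Report the message's position so it can be dead-lettered.
        raise InvalidMessage(partition, offset) from exc
```

With this shape, a message of the wrong type surfaces as `InvalidMessage`, while a `ConnectionError` raised inside `process` surfaces as `Retriable` and is never dead-lettered.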

Member

that's what I mean. IMO retriable errors should go into the DLQ as well. the DLQ is meant to be replayed.

Member Author

@lynnagara Mar 4, 2024

I think in an ideal world we'd do that eventually but I'm not sure if we're there yet, and we haven't tried it anywhere. IMO inspection of events and replaying needs to be built so it can be done in a really fast and easy manner before taking this step.

Today, DLQing everything in the case of temporary blips might cause a longer and more manual recovery period, and it comes with its own risks that I haven't fully thought through yet.

Let's do it as a follow up later.

Member

Would INC-660 messages have been marked as Retriable or InvalidMessage in this case?

Member Author

In the meantime, I'm going to work on defining a schema for this topic so we can use that to determine what is valid or not.

Member

No, what you have here seems good to start with, just that we should continue thinking about how to turn it into a general purpose failure handling tool.

Member Author

Yes, agreed, this is not an end state. I just want to get the DLQ in place, and we can tweak what actually gets DLQed later.

Member

During INC-660, we crashed inside process_event, here:

cache_key = event_processing_store.store(data)

See this sentry error.

The kafka message itself was populated, but the event JSON inside was empty.

@lynnagara How often does the consumer currently crash because of temporary network outages? If it's rare enough (say, once per month), I would personally go ahead and DLQ all exceptions.

I do believe this PR is an improvement (any DLQ is better than none), just pointing out that it would not have helped with INC-660.

Member Author

Ok, I realised it's going to be a lot of work to figure out what's retriable and what isn't on a consumer-by-consumer basis. So I rewrote this to check the message against the schema whenever an exception is raised, and to use that to decide whether to DLQ it or not. Since there wasn't a schema for ingest-events, I added one here: getsentry/sentry-kafka-schemas#230
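
A rough sketch of that schema-on-failure idea, under stated assumptions: `REQUIRED_FIELDS` is a made-up, hand-rolled check and not the schema added in getsentry/sentry-kafka-schemas#230, `InvalidMessage` is a local stand-in for `arroyo.dlq.InvalidMessage`, and `process_or_dlq` is a hypothetical name.

```python
from typing import Any, Callable, Mapping


class InvalidMessage(Exception):
    """Local stand-in for arroyo.dlq.InvalidMessage (partition + offset)."""

    def __init__(self, partition: int, offset: int) -> None:
        super().__init__(partition, offset)
        self.partition = partition
        self.offset = offset


# Hypothetical schema: required field name -> expected type.
REQUIRED_FIELDS = {"type": str, "project_id": int, "payload": bytes}


def matches_schema(message: Mapping[str, Any]) -> bool:
    """Return True if every required field is present with the expected type."""
    return all(
        isinstance(message.get(name), expected)
        for name, expected in REQUIRED_FIELDS.items()
    )


def process_or_dlq(
    message: Mapping[str, Any],
    partition: int,
    offset: int,
    process: Callable[[Mapping[str, Any]], Any],
) -> Any:
    """Validate only after a failure, so the happy path pays no extra cost."""
    try:
        return process(message)
    except Exception as exc:
        if not matches_schema(message):
            # The message itself is malformed: dead-letter it.
            raise InvalidMessage(partition, offset) from exc
        # The message looks valid, so the failure may be transient: re-raise
        # the original exception and let the consumer crash.
        raise
```

The design choice here is that no exception type has to be classified in advance: a failure on a schema-invalid message is dead-lettered, and any failure on a schema-valid message is left to crash the consumer.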

+        # The consumer will crash on any potentially retriable exceptions.
+        # A better approach would be to instead validate the schema here and only
+        # DLQ messages that fail schema validation, but this topic has no schema
+        # defined currently.
+        if not isinstance(exc, Retriable):
+            raw_value = raw_message.value
+            assert isinstance(raw_value, BrokerValue)
Contributor

Can it be anything else?

Member Author

no

+            raise InvalidMessage(raw_value.partition, raw_value.offset)
+        else:
+            raise