New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: DLQ unprocessable messages on ingest-events #66236
Changes from all commits
7078a19
9e31d5d
9a784be
e71b3fb
f278266
c93b85c
3926556
6f630d6
ab4b4a0
52351dd
3588f20
2bf612a
6550ee1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,28 @@ | ||
import logging | ||
|
||
import msgpack | ||
import sentry_kafka_schemas | ||
from arroyo.backends.kafka.consumer import KafkaPayload | ||
from arroyo.types import Message | ||
from arroyo.dlq import InvalidMessage | ||
from arroyo.types import BrokerValue, Message | ||
|
||
from sentry.conf.types.kafka_definition import Topic | ||
from sentry.models.project import Project | ||
from sentry.utils import metrics | ||
|
||
from .processors import IngestMessage, process_event | ||
from .processors import IngestMessage, Retriable, process_event | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Maps each ingest consumer type to the Kafka topic it consumes from by
# default; used to look up the schema codec when deciding whether a failed
# message should be routed to the DLQ.
consumer_type_to_default_topic = {
    "events": Topic.INGEST_EVENTS,
    "transactions": Topic.INGEST_TRANSACTIONS,
    "attachments": Topic.INGEST_ATTACHMENTS,
    "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
}
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: This does not cover the attachment topic. Author: Yes. One step at a time. |
||
def process_simple_event_message( | ||
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool | ||
) -> None: | ||
|
@@ -36,19 +47,47 @@ def process_simple_event_message( | |
tags={"consumer": consumer_type}, | ||
unit="byte", | ||
) | ||
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) | ||
|
||
message_type = message["type"] | ||
project_id = message["project_id"] | ||
try: | ||
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) | ||
|
||
message_type = message["type"] | ||
project_id = message["project_id"] | ||
|
||
if message_type != "event": | ||
raise ValueError(f"Unsupported message type: {message_type}") | ||
if message_type != "event": | ||
raise ValueError(f"Unsupported message type: {message_type}") | ||
|
||
try: | ||
with metrics.timer("ingest_consumer.fetch_project"): | ||
project = Project.objects.get_from_cache(id=project_id) | ||
except Project.DoesNotExist: | ||
logger.exception("Project for ingested event does not exist: %s", project_id) | ||
return | ||
try: | ||
with metrics.timer("ingest_consumer.fetch_project"): | ||
project = Project.objects.get_from_cache(id=project_id) | ||
except Project.DoesNotExist: | ||
logger.exception("Project for ingested event does not exist: %s", project_id) | ||
return | ||
|
||
return process_event(message, project, reprocess_only_stuck_events) | ||
|
||
except Exception as exc: | ||
# If the retriable exception was raised, we should not DLQ | ||
if isinstance(exc, Retriable): | ||
raise | ||
|
||
# If no retriable exception was raised, check the schema to decide whether to DLQ | ||
default_topic = consumer_type_to_default_topic[consumer_type].value | ||
|
||
# TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise | ||
# the exception if it's a different topic. This can be removed once attachments and transactions | ||
# have schemas too. | ||
if default_topic != "ingest-events": | ||
raise | ||
|
||
codec = sentry_kafka_schemas.get_codec(default_topic) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this a heavy operation ? Any reason not to instantiate the codec only once ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's cached in the library after the first time |
||
|
||
try: | ||
codec.decode(raw_payload, validate=True) | ||
except Exception: | ||
raw_value = raw_message.value | ||
assert isinstance(raw_value, BrokerValue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can it be anything else ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no |
||
|
||
raise InvalidMessage(raw_value.partition, raw_value.offset) | ||
|
||
return process_event(message, project, reprocess_only_stuck_events) | ||
raise | ||
Comment on lines
+91
to
+93
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: I thought we were discussing inverting the way the DLQ classifies retriable errors: instead of putting the burden on the consumer developer to identify errors that should make us route the message to the DLQ, ask consumer developers to identify errors that should trigger a crash and treat everything else as InvalidMessage. Is that still the case, and are you planning to deal with this in a separate PR/project? Author: This is still the case, but in this specific function it is very difficult because of the specific logic, the external systems depended on, and the things that can fail here. Practically, I think checking for both an exception and then the schema as well is the safest approach in this one case. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import time | ||
from datetime import datetime | ||
from unittest.mock import Mock | ||
|
||
import msgpack | ||
import pytest | ||
from arroyo.backends.kafka import KafkaPayload | ||
from arroyo.dlq import InvalidMessage | ||
from arroyo.types import BrokerValue, Message, Partition, Topic | ||
|
||
from sentry.ingest.consumer.factory import IngestStrategyFactory | ||
from sentry.testutils.pytest.fixtures import django_db_all | ||
|
||
|
||
def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
    """Wrap raw payload bytes in an arroyo ``Message`` carrying broker metadata.

    The payload is packed into an unkeyed ``KafkaPayload`` (no headers) and a
    ``BrokerValue`` stamped with the given partition/offset and the current
    wall-clock time, mirroring what a real Kafka consumer would hand to the
    processing strategy.
    """
    broker_value = BrokerValue(
        KafkaPayload(None, payload, []),
        partition,
        offset,
        datetime.now(),
    )
    return Message(broker_value)
|
||
|
||
@django_db_all
def test_dlq_invalid_messages(factories) -> None:
    """Only schema-invalid messages are marked for the DLQ.

    A well-formed (schema-valid) message whose processing fails must re-raise
    the original error, while a message that fails schema validation must be
    rejected with ``InvalidMessage`` carrying its partition and offset so the
    DLQ can reproduce it.
    """
    org = factories.create_organization()
    project = factories.create_project(organization=org)

    well_formed_payload = msgpack.packb(
        {
            "type": "event",
            "project_id": project.id,
            "payload": b"{}",
            "start_time": int(time.time()),
            "event_id": "aaa",
        }
    )
    malformed_payload = b"bogus message"

    partition = Partition(Topic("ingest-events"), 0)
    offset = 5

    strategy = IngestStrategyFactory(
        "events",
        reprocess_only_stuck_events=False,
        num_processes=1,
        max_batch_size=1,
        max_batch_time=1,
        input_block_size=None,
        output_block_size=None,
    ).create_with_partitions(Mock(), Mock())

    # Schema-valid payload: processing may still fail, but the failure must
    # surface as the original exception, not as an InvalidMessage.
    with pytest.raises(Exception) as exc_info:
        strategy.submit(make_message(well_formed_payload, partition, offset))
    assert not isinstance(exc_info.value, InvalidMessage)

    # Schema-invalid payload: must be flagged for the DLQ with the exact
    # broker coordinates of the offending message.
    with pytest.raises(InvalidMessage) as exc_info:
        strategy.submit(make_message(malformed_payload, partition, offset))

    assert exc_info.value.partition == partition
    assert exc_info.value.offset == offset
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has the topic already been created in all environments ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes