Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DLQ unprocessable messages on ingest-events #66236

Merged
merged 13 commits into from Mar 15, 2024
1 change: 1 addition & 0 deletions src/sentry/consumers/__init__.py
Expand Up @@ -259,6 +259,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"consumer_type": "events",
},
"dlq_topic": Topic.INGEST_EVENTS_DLQ,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has the topic already been created in all environments?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

},
"ingest-attachments": {
"topic": Topic.INGEST_ATTACHMENTS,
Expand Down
62 changes: 49 additions & 13 deletions src/sentry/ingest/consumer/simple_event.py
@@ -1,9 +1,12 @@
import logging

import msgpack
import sentry_kafka_schemas
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import Message
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.utils import metrics

Expand All @@ -12,6 +15,13 @@
logger = logging.getLogger(__name__)


consumer_type_to_default_topic = {
"events": Topic.INGEST_EVENTS,
"transactions": Topic.INGEST_TRANSACTIONS,
"attachments": Topic.INGEST_ATTACHMENTS,
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not cover the attachment topic.
The functions for attachments are process_attachments_and_events and decode_and_process_chunks.
I think we should cover all of them. Whether you want to do attachment in a separate PR that's ok.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. One step at a time.

def process_simple_event_message(
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
) -> None:
Expand All @@ -36,19 +46,45 @@ def process_simple_event_message(
tags={"consumer": consumer_type},
unit="byte",
)
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

message_type = message["type"]
project_id = message["project_id"]
try:
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

message_type = message["type"]
project_id = message["project_id"]

if message_type != "event":
raise ValueError(f"Unsupported message type: {message_type}")
if message_type != "event":
raise ValueError(f"Unsupported message type: {message_type}")

try:
with metrics.timer("ingest_consumer.fetch_project"):
project = Project.objects.get_from_cache(id=project_id)
except Project.DoesNotExist:
logger.exception("Project for ingested event does not exist: %s", project_id)
return
try:
with metrics.timer("ingest_consumer.fetch_project"):
project = Project.objects.get_from_cache(id=project_id)
except Project.DoesNotExist:
logger.exception("Project for ingested event does not exist: %s", project_id)
return

return process_event(message, project, reprocess_only_stuck_events)

except Exception:
# Any exception that fails schema validation will raise InvalidMessage, which Arroyo will DLQ.
# Messages that pass schema validation will not be DLQed as they may be retriable.

default_topic = consumer_type_to_default_topic[consumer_type].value

# TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
# the exception if it's a different topic. This can be removed once attachments and transactions
# have schemas too.
if default_topic != "ingest-events":
raise

codec = sentry_kafka_schemas.get_codec(default_topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a heavy operation? Any reason not to instantiate the codec only once?

Copy link
Member Author

@lynnagara lynnagara Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's cached in the library after the first time


try:
codec.decode(raw_payload, validate=True)
except Exception:
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be anything else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no


raise InvalidMessage(raw_value.partition, raw_value.offset)

return process_event(message, project, reprocess_only_stuck_events)
raise
Comment on lines +91 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were discussing about inverting the way DLQ classifies retriable errors: Instead of putting the burden on the consumer developer to identify errors that should make us route the message to the DLQ, asking the consumer developers to identify errors that should trigger a crash and treat everything else as InvalidMessage.

Is that still the case and are you planning to deal with this as a separate PR/project ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still the case, but in this specific function it's very difficult because of the specific logic and external systems that are depended on, and the number of things that can fail here. Practically, I think checking for both an exception and then the schema as well is the safest way in this one case.

70 changes: 70 additions & 0 deletions tests/sentry/ingest/ingest_consumer/test_dlq.py
@@ -0,0 +1,70 @@
import time
from datetime import datetime
from unittest.mock import Mock

import msgpack
import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.ingest.consumer.factory import IngestStrategyFactory
from sentry.testutils.pytest.fixtures import django_db_all


def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
    """Wrap raw bytes in an arroyo Message carrying broker metadata.

    The payload is placed in a keyless, header-less KafkaPayload and stamped
    with the given partition/offset plus the current timestamp, mimicking a
    message as it would arrive from the broker.
    """
    kafka_payload = KafkaPayload(None, payload, [])
    broker_value = BrokerValue(kafka_payload, partition, offset, datetime.now())
    return Message(broker_value)


@django_db_all
def test_dlq_invalid_messages(factories) -> None:
    """Unparseable payloads are flagged for the DLQ; parseable ones re-raise.

    A structurally valid (msgpack-decodable, schema-passing) message whose
    processing fails must surface its original exception — not InvalidMessage —
    since such failures may be retriable. Garbage bytes must instead raise
    InvalidMessage pointing at the offending partition/offset so Arroyo can
    route the message to the DLQ.
    """
    org = factories.create_organization()
    proj = factories.create_project(organization=org)

    good_payload = msgpack.packb(
        {
            "type": "event",
            "project_id": proj.id,
            "payload": b"{}",
            "start_time": int(time.time()),
            "event_id": "aaa",
        }
    )
    bad_payload = b"bogus message"

    target_partition = Partition(Topic("ingest-events"), 0)
    target_offset = 5

    strategy = IngestStrategyFactory(
        "events",
        reprocess_only_stuck_events=False,
        num_processes=1,
        max_batch_size=1,
        max_batch_time=1,
        input_block_size=None,
        output_block_size=None,
    ).create_with_partitions(Mock(), Mock())

    # Well-formed message: the original processing error propagates, so the
    # message is not considered invalid (i.e. it is not DLQed).
    with pytest.raises(Exception) as exc_info:
        strategy.submit(make_message(good_payload, target_partition, target_offset))
    assert not isinstance(exc_info.value, InvalidMessage)

    # Garbage bytes: classified as invalid, carrying the message's location.
    with pytest.raises(InvalidMessage) as exc_info:
        strategy.submit(make_message(bad_payload, target_partition, target_offset))

    assert exc_info.value.partition == target_partition
    assert exc_info.value.offset == target_offset