From 497d057f2a39b026e75109769457014b45a7ff37 Mon Sep 17 00:00:00 2001 From: Andrew Liu <159852527+aliu3ntry@users.noreply.github.com> Date: Mon, 25 Mar 2024 10:38:46 -0700 Subject: [PATCH] test(ingest): add DLQ tests for txns and attachments and DLQ support for attachments (#67495) --- .../ingest/consumer/attachment_event.py | 26 ++++++++++++++----- .../sentry/ingest/ingest_consumer/test_dlq.py | 18 ++++++++++--- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/sentry/ingest/consumer/attachment_event.py b/src/sentry/ingest/consumer/attachment_event.py index 78a6ccaec7184b..fcaa0bc5ccae89 100644 --- a/src/sentry/ingest/consumer/attachment_event.py +++ b/src/sentry/ingest/consumer/attachment_event.py @@ -4,13 +4,15 @@ import msgpack from arroyo.backends.kafka.consumer import KafkaPayload -from arroyo.types import Message +from arroyo.dlq import InvalidMessage +from arroyo.types import BrokerValue, Message from sentry.models.project import Project from sentry.utils import metrics from .processors import ( IngestMessage, + Retriable, process_attachment_chunk, process_event, process_individual_attachment, @@ -37,14 +39,24 @@ def decode_and_process_chunks( tags={"consumer": consumer_type}, unit="byte", ) - message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) - if message["type"] == "attachment_chunk": - if not reprocess_only_stuck_events: - process_attachment_chunk(message) - return None + try: + message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) + + if message["type"] == "attachment_chunk": + if not reprocess_only_stuck_events: + process_attachment_chunk(message) + return None + + return message + except Exception as exc: + # If the retriable exception was raised, we should not DLQ + if isinstance(exc, Retriable): + raise - return message + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + raise InvalidMessage(raw_value.partition, raw_value.offset) from exc def process_attachments_and_events( diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py index 3f098cdd238c78..aad9266d2d9990 100644 --- a/tests/sentry/ingest/ingest_consumer/test_dlq.py +++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py @@ -8,7 +8,9 @@ from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message, Partition, Topic +from sentry.conf.types.kafka_definition import Topic as TopicNames from sentry.ingest.consumer.factory import IngestStrategyFactory +from sentry.ingest.types import ConsumerType from sentry.testutils.pytest.fixtures import django_db_all @@ -23,8 +25,18 @@ def make_message(payload: bytes, partition: Partition, offset: int) -> Message: ) +@pytest.mark.parametrize( + ("topic_name", "consumer_type"), + [ + (TopicNames.INGEST_EVENTS.value, ConsumerType.Events), + (TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.Attachments), + (TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.Transactions), + ], +) @django_db_all -def test_dlq_invalid_messages(factories) -> None: +def test_dlq_invalid_messages(factories, topic_name, consumer_type) -> None: + # Test is for all consumers that share the IngestStrategyFactory + # Feedback test is located in feedback/consumers organization = factories.create_organization() project = factories.create_project(organization=organization) @@ -40,11 +52,11 @@ def test_dlq_invalid_messages(factories) -> None: bogus_payload = b"bogus message" - partition = Partition(Topic("ingest-events"), 0) + partition = Partition(Topic(topic_name), 0) offset = 5 factory = IngestStrategyFactory( - "events", + consumer_type, reprocess_only_stuck_events=False, num_processes=1, max_batch_size=1,