Skip to content

Commit

Permalink
test(ingest): add DLQ tests for txns and attachments and DLQ support …
Browse files Browse the repository at this point in the history
…for attachments (#67495)
  • Loading branch information
aliu3ntry committed Mar 25, 2024
1 parent aab6555 commit 497d057
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
26 changes: 19 additions & 7 deletions src/sentry/ingest/consumer/attachment_event.py
Expand Up @@ -4,13 +4,15 @@

import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import Message
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.models.project import Project
from sentry.utils import metrics

from .processors import (
IngestMessage,
Retriable,
process_attachment_chunk,
process_event,
process_individual_attachment,
Expand All @@ -37,14 +39,24 @@ def decode_and_process_chunks(
tags={"consumer": consumer_type},
unit="byte",
)
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

if message["type"] == "attachment_chunk":
if not reprocess_only_stuck_events:
process_attachment_chunk(message)
return None
try:
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

if message["type"] == "attachment_chunk":
if not reprocess_only_stuck_events:
process_attachment_chunk(message)
return None

return message
except Exception as exc:
# If the retriable exception was raised, we should not DLQ
if isinstance(exc, Retriable):
raise

return message
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc


def process_attachments_and_events(
Expand Down
18 changes: 15 additions & 3 deletions tests/sentry/ingest/ingest_consumer/test_dlq.py
Expand Up @@ -8,7 +8,9 @@
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.conf.types.kafka_definition import Topic as TopicNames
from sentry.ingest.consumer.factory import IngestStrategyFactory
from sentry.ingest.types import ConsumerType
from sentry.testutils.pytest.fixtures import django_db_all


Expand All @@ -23,8 +25,18 @@ def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
)


@pytest.mark.parametrize(
("topic_name", "consumer_type"),
[
(TopicNames.INGEST_EVENTS.value, ConsumerType.Events),
(TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.Attachments),
(TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.Transactions),
],
)
@django_db_all
def test_dlq_invalid_messages(factories) -> None:
def test_dlq_invalid_messages(factories, topic_name, consumer_type) -> None:
# Test is for all consumers that share the IngestStrategyFactory
# Feedback test is located in feedback/consumers
organization = factories.create_organization()
project = factories.create_project(organization=organization)

Expand All @@ -40,11 +52,11 @@ def test_dlq_invalid_messages(factories) -> None:

bogus_payload = b"bogus message"

partition = Partition(Topic("ingest-events"), 0)
partition = Partition(Topic(topic_name), 0)
offset = 5

factory = IngestStrategyFactory(
"events",
consumer_type,
reprocess_only_stuck_events=False,
num_processes=1,
max_batch_size=1,
Expand Down

0 comments on commit 497d057

Please sign in to comment.