Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ingest): add DLQ tests for txns and attachments and DLQ support for attachments #67495

Merged
merged 5 commits into from Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/sentry/ingest/consumer/attachment_event.py
Expand Up @@ -4,13 +4,15 @@

import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import Message
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.models.project import Project
from sentry.utils import metrics

from .processors import (
IngestMessage,
Retriable,
process_attachment_chunk,
process_event,
process_individual_attachment,
Expand All @@ -37,14 +39,24 @@ def decode_and_process_chunks(
tags={"consumer": consumer_type},
unit="byte",
)
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

if message["type"] == "attachment_chunk":
if not reprocess_only_stuck_events:
process_attachment_chunk(message)
return None
try:
message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)

if message["type"] == "attachment_chunk":
if not reprocess_only_stuck_events:
process_attachment_chunk(message)
return None

return message
except Exception as exc:
# If the retriable exception was raised, we should not DLQ
if isinstance(exc, Retriable):
raise

return message
raw_value = raw_message.value
assert isinstance(raw_value, BrokerValue)
raise InvalidMessage(raw_value.partition, raw_value.offset) from exc


def process_attachments_and_events(
Expand Down
18 changes: 15 additions & 3 deletions tests/sentry/ingest/ingest_consumer/test_dlq.py
Expand Up @@ -8,7 +8,9 @@
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.conf.types.kafka_definition import Topic as TopicNames
from sentry.ingest.consumer.factory import IngestStrategyFactory
from sentry.ingest.types import ConsumerType
from sentry.testutils.pytest.fixtures import django_db_all


Expand All @@ -23,8 +25,18 @@ def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
)


@pytest.mark.parametrize(
("topic_name", "consumer_type"),
[
(TopicNames.INGEST_EVENTS.value, ConsumerType.Events),
(TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.Attachments),
(TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.Transactions),
],
)
@django_db_all
def test_dlq_invalid_messages(factories) -> None:
def test_dlq_invalid_messages(factories, topic_name, consumer_type) -> None:
# Test is for all consumers that share the IngestStrategyFactory
# Feedback test is located in feedback/consumers
organization = factories.create_organization()
project = factories.create_project(organization=organization)

Expand All @@ -40,11 +52,11 @@ def test_dlq_invalid_messages(factories) -> None:

bogus_payload = b"bogus message"

partition = Partition(Topic("ingest-events"), 0)
partition = Partition(Topic(topic_name), 0)
offset = 5

factory = IngestStrategyFactory(
"events",
consumer_type,
reprocess_only_stuck_events=False,
num_processes=1,
max_batch_size=1,
Expand Down