From ecdcd3593459f78e67892e9991cea1469714edee Mon Sep 17 00:00:00 2001 From: Andrew Liu Date: Thu, 21 Mar 2024 16:43:00 -0700 Subject: [PATCH 1/4] test(ingest): add testing for txn and attachment dlqs --- src/sentry/consumers/__init__.py | 8 ++++---- src/sentry/ingest/consumer/factory.py | 2 +- src/sentry/ingest/types.py | 8 ++++---- tests/sentry/ingest/ingest_consumer/test_dlq.py | 14 +++++++++++++- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index b5839fee68ecb1..835ace738b347c 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -258,7 +258,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.Events, + "consumer_type": ConsumerType.EVENTS, }, "dlq_topic": Topic.INGEST_EVENTS_DLQ, }, @@ -267,7 +267,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.Feedback, + "consumer_type": ConsumerType.FEEDBACK, }, "dlq_topic": Topic.INGEST_FEEDBACK_EVENTS_DLQ, }, @@ -276,7 +276,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.Attachments, + "consumer_type": ConsumerType.ATTACHMENTS, }, "dlq_topic": Topic.INGEST_ATTACHMENTS_DLQ, }, @@ -285,7 +285,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.Transactions, + "consumer_type": ConsumerType.TRANSACTIONS, }, "dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ, }, diff --git a/src/sentry/ingest/consumer/factory.py b/src/sentry/ingest/consumer/factory.py index f2c548c6075002..ed5f4c903d4306 100644 --- a/src/sentry/ingest/consumer/factory.py +++ b/src/sentry/ingest/consumer/factory.py @@ -70,7 +70,7 @@ def __init__( output_block_size: int | None, ): self.consumer_type = consumer_type - self.is_attachment_topic = consumer_type == ConsumerType.Attachments + self.is_attachment_topic = consumer_type == ConsumerType.ATTACHMENTS self.reprocess_only_stuck_events = reprocess_only_stuck_events self.multi_process = None diff --git a/src/sentry/ingest/types.py b/src/sentry/ingest/types.py index d9ca4198dbc01b..1735f8ba16cbe3 100644 --- a/src/sentry/ingest/types.py +++ b/src/sentry/ingest/types.py @@ -3,7 +3,7 @@ class ConsumerType: Defines the types of ingestion consumers """ - Events = "events" # consumes simple events ( from the Events topic) - Attachments = "attachments" # consumes events with attachments ( from the Attachments topic) - Transactions = "transactions" # consumes transaction events ( from the Transactions topic) - Feedback = "feedback" # consumes user feedback ( from the ingest-feedback-events topic) + EVENTS = "events" # consumes simple events ( from the Events topic) + ATTACHMENTS = "attachments" # consumes events with attachments ( from the Attachments topic) + TRANSACTIONS = "transactions" # consumes transaction events ( from the Transactions topic) + FEEDBACK = "feedback" # consumes user feedback ( from the ingest-feedback-events topic) diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py index 3f098cdd238c78..12c340d7fd530d 100644 --- a/tests/sentry/ingest/ingest_consumer/test_dlq.py +++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py @@ -8,7 +8,9 @@ from arroyo.dlq import InvalidMessage from arroyo.types import BrokerValue, Message, Partition, Topic +from sentry.conf.types.kafka_definition import Topic as TopicNames from sentry.ingest.consumer.factory import IngestStrategyFactory +from sentry.ingest.types import ConsumerType from sentry.testutils.pytest.fixtures import django_db_all @@ -23,8 +25,18 @@ def make_message(payload: bytes, partition: Partition, offset: int) -> Message: ) +@pytest.mark.parametrize( + ("topic_name", "consumer_type"), + [ + (TopicNames.INGEST_EVENTS.value, ConsumerType.EVENTS), + (TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.ATTACHMENTS), + (TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.TRANSACTIONS), + ], +) @django_db_all -def test_dlq_invalid_messages(factories) -> None: +def test_dlq_invalid_messages(factories, topic_name, consumer_type) -> None: + # Test is for all consumers that share the IngestStrategyFactory + # Feedback test is located in feedback/consumers organization = factories.create_organization() project = factories.create_project(organization=organization) From 5dd1e40e6f89472539741e8ca3579efe19edc5b6 Mon Sep 17 00:00:00 2001 From: Andrew Liu Date: Thu, 21 Mar 2024 16:45:35 -0700 Subject: [PATCH 2/4] stick with uncapitalized ConsumerTypes --- src/sentry/consumers/__init__.py | 8 ++++---- src/sentry/ingest/consumer/factory.py | 2 +- src/sentry/ingest/types.py | 8 ++++---- tests/sentry/ingest/ingest_consumer/test_dlq.py | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 835ace738b347c..b5839fee68ecb1 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -258,7 +258,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.EVENTS, + "consumer_type": ConsumerType.Events, }, "dlq_topic": Topic.INGEST_EVENTS_DLQ, }, @@ -267,7 +267,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.FEEDBACK, + "consumer_type": ConsumerType.Feedback, }, "dlq_topic": Topic.INGEST_FEEDBACK_EVENTS_DLQ, }, @@ -276,7 +276,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.ATTACHMENTS, + "consumer_type": ConsumerType.Attachments, }, "dlq_topic": Topic.INGEST_ATTACHMENTS_DLQ, }, @@ -285,7 +285,7 @@ def ingest_events_options() -> list[click.Option]: "strategy_factory": "sentry.ingest.consumer.factory.IngestStrategyFactory", "click_options": ingest_events_options(), "static_args": { - "consumer_type": ConsumerType.TRANSACTIONS, + "consumer_type": ConsumerType.Transactions, }, "dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ, }, diff --git a/src/sentry/ingest/consumer/factory.py b/src/sentry/ingest/consumer/factory.py index ed5f4c903d4306..f2c548c6075002 100644 --- a/src/sentry/ingest/consumer/factory.py +++ b/src/sentry/ingest/consumer/factory.py @@ -70,7 +70,7 @@ def __init__( output_block_size: int | None, ): self.consumer_type = consumer_type - self.is_attachment_topic = consumer_type == ConsumerType.ATTACHMENTS + self.is_attachment_topic = consumer_type == ConsumerType.Attachments self.reprocess_only_stuck_events = reprocess_only_stuck_events self.multi_process = None diff --git a/src/sentry/ingest/types.py b/src/sentry/ingest/types.py index 1735f8ba16cbe3..d9ca4198dbc01b 100644 --- a/src/sentry/ingest/types.py +++ b/src/sentry/ingest/types.py @@ -3,7 +3,7 @@ class ConsumerType: Defines the types of ingestion consumers """ - EVENTS = "events" # consumes simple events ( from the Events topic) - ATTACHMENTS = "attachments" # consumes events with attachments ( from the Attachments topic) - TRANSACTIONS = "transactions" # consumes transaction events ( from the Transactions topic) - FEEDBACK = "feedback" # consumes user feedback ( from the ingest-feedback-events topic) + Events = "events" # consumes simple events ( from the Events topic) + Attachments = "attachments" # consumes events with attachments ( from the Attachments topic) + Transactions = "transactions" # consumes transaction events ( from the Transactions topic) + Feedback = "feedback" # consumes user feedback ( from the ingest-feedback-events topic) diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py index 12c340d7fd530d..71a649632e0913 100644 --- a/tests/sentry/ingest/ingest_consumer/test_dlq.py +++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py @@ -28,9 +28,9 @@ def make_message(payload: bytes, partition: Partition, offset: int) -> Message: @pytest.mark.parametrize( ("topic_name", "consumer_type"), [ - (TopicNames.INGEST_EVENTS.value, ConsumerType.EVENTS), - (TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.ATTACHMENTS), - (TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.TRANSACTIONS), + (TopicNames.INGEST_EVENTS.value, ConsumerType.Events), + (TopicNames.INGEST_ATTACHMENTS.value, ConsumerType.Attachments), + (TopicNames.INGEST_TRANSACTIONS.value, ConsumerType.Transactions), ], ) @django_db_all From e1111031b18b4fc3a58f0cd3bf5ae31dd10333a4 Mon Sep 17 00:00:00 2001 From: Andrew Liu Date: Thu, 21 Mar 2024 16:51:13 -0700 Subject: [PATCH 3/4] actually use the parameters (oops) --- tests/sentry/ingest/ingest_consumer/test_dlq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py index 71a649632e0913..aad9266d2d9990 100644 --- a/tests/sentry/ingest/ingest_consumer/test_dlq.py +++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py @@ -52,11 +52,11 @@ def test_dlq_invalid_messages(factories, topic_name, consumer_type) -> None: bogus_payload = b"bogus message" - partition = Partition(Topic("ingest-events"), 0) + partition = Partition(Topic(topic_name), 0) offset = 5 factory = IngestStrategyFactory( - "events", + consumer_type, reprocess_only_stuck_events=False, num_processes=1, max_batch_size=1, From bedfad5c90bc615b2b7288496ac9d8120074a796 Mon Sep 17 00:00:00 2001 From: Andrew Liu Date: Thu, 21 Mar 2024 16:56:19 -0700 Subject: [PATCH 4/4] raise InvalidMessage in attachment processing --- .../ingest/consumer/attachment_event.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/sentry/ingest/consumer/attachment_event.py b/src/sentry/ingest/consumer/attachment_event.py index 78a6ccaec7184b..fcaa0bc5ccae89 100644 --- a/src/sentry/ingest/consumer/attachment_event.py +++ b/src/sentry/ingest/consumer/attachment_event.py @@ -4,13 +4,15 @@ import msgpack from arroyo.backends.kafka.consumer import KafkaPayload -from arroyo.types import Message +from arroyo.dlq import InvalidMessage +from arroyo.types import BrokerValue, Message from sentry.models.project import Project from sentry.utils import metrics from .processors import ( IngestMessage, + Retriable, process_attachment_chunk, process_event, process_individual_attachment, @@ -37,14 +39,24 @@ def decode_and_process_chunks( tags={"consumer": consumer_type}, unit="byte", ) - message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) - if message["type"] == "attachment_chunk": - if not reprocess_only_stuck_events: - process_attachment_chunk(message) - return None + try: + message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False) + + if message["type"] == "attachment_chunk": + if not reprocess_only_stuck_events: + process_attachment_chunk(message) + return None + + return message + except Exception as exc: + # If the retriable exception was raised, we should not DLQ + if isinstance(exc, Retriable): + raise - return message + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + raise InvalidMessage(raw_value.partition, raw_value.offset) from exc def process_attachments_and_events(