diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index df9311ea9efc45..c5ec2958406e3a 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -259,6 +259,7 @@ def ingest_events_options() -> list[click.Option]:
         "static_args": {
             "consumer_type": "events",
         },
+        "dlq_topic": Topic.INGEST_EVENTS_DLQ,
     },
     "ingest-feedback-events": {
         "topic": Topic.INGEST_FEEDBACK_EVENTS,
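
For context: this one-line change is what opts the `ingest-events` consumer into dead-letter routing, with `Topic.INGEST_EVENTS_DLQ` naming the Kafka topic that rejected messages are produced to. Below is a rough sketch of how a `dlq_topic` setting is typically wired up at the arroyo level; it illustrates arroyo's `DlqPolicy` API rather than Sentry's actual consumer bootstrap, and the broker config is a placeholder.

```python
from arroyo.backends.kafka import KafkaProducer
from arroyo.dlq import DlqPolicy, KafkaDlqProducer
from arroyo.types import Topic

# Placeholder broker config -- not Sentry's real settings.
producer = KafkaProducer({"bootstrap.servers": "127.0.0.1:9092"})

# Messages rejected as invalid are produced to the dead-letter topic instead
# of halting the consumer; the policy is then handed to arroyo's
# StreamProcessor(..., dlq_policy=dlq_policy).
dlq_policy = DlqPolicy(KafkaDlqProducer(producer, Topic("ingest-events-dlq")))
```
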
diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py
index 5773bca1c11931..42e20cec2d16c3 100644
--- a/src/sentry/ingest/consumer/processors.py
+++ b/src/sentry/ingest/consumer/processors.py
@@ -32,6 +32,10 @@
 IngestMessage = Mapping[str, Any]
 
 
+class Retriable(Exception):
+    pass
+
+
 def trace_func(**span_kwargs):
     def wrapper(f):
         @functools.wraps(f)
@@ -127,78 +131,85 @@ def process_event(
     ):
         return
 
-    # If we only want to reprocess "stuck" events, we check if this event is already in the
-    # `processing_store`. We only continue here if the event *is* present, as that will eventually
-    # process and consume the event from the `processing_store`, whereby getting it "unstuck".
-    if reprocess_only_stuck_events and not event_processing_store.exists(data):
-        return
-
-    with metrics.timer("ingest_consumer._store_event"):
-        cache_key = event_processing_store.store(data)
-
+    # Raise a Retriable exception and skip the DLQ if anything below this point fails,
+    # as the failure may be caused by an intermittent network issue.
     try:
-        # Records rc-processing usage broken down by
-        # event type.
-        event_type = data.get("type")
-        if event_type == "error":
-            app_feature = "errors"
-        elif event_type == "transaction":
-            app_feature = "transactions"
-        else:
-            app_feature = None
-
-        if app_feature is not None:
-            record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
-    except Exception:
-        pass
-
-    if attachments:
-        with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
-            attachment_objects = [
-                CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
-                for attachment in attachments
-            ]
-
-            attachment_cache.set(cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT)
-
-    if data.get("type") == "transaction":
-        # No need for preprocess/process for transactions thus submit
-        # directly transaction specific save_event task.
-        save_event_transaction.delay(
-            cache_key=cache_key,
-            data=None,
-            start_time=start_time,
-            event_id=event_id,
-            project_id=project_id,
-        )
-    elif data.get("type") == "feedback":
-        if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
-            save_event_feedback.delay(
-                cache_key=None,  # no need to cache as volume is low
-                data=data,
-                start_time=start_time,
-                event_id=event_id,
-                project_id=project_id,
-            )
-    else:
-        # Preprocess this event, which spawns either process_event or
-        # save_event. Pass data explicitly to avoid fetching it again from the
-        # cache.
-        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
-            preprocess_event(
+        # If we only want to reprocess "stuck" events, we check if this event is already in the
+        # `processing_store`. We only continue here if the event *is* present, as that will eventually
+        # process and consume the event from the `processing_store`, whereby getting it "unstuck".
+        if reprocess_only_stuck_events and not event_processing_store.exists(data):
+            return
+
+        with metrics.timer("ingest_consumer._store_event"):
+            cache_key = event_processing_store.store(data)
+
+        try:
+            # Records rc-processing usage broken down by
+            # event type.
+            event_type = data.get("type")
+            if event_type == "error":
+                app_feature = "errors"
+            elif event_type == "transaction":
+                app_feature = "transactions"
+            else:
+                app_feature = None
+
+            if app_feature is not None:
+                record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
+        except Exception:
+            pass
+
+        if attachments:
+            with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
+                attachment_objects = [
+                    CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
+                    for attachment in attachments
+                ]
+
+                attachment_cache.set(
+                    cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
+                )
+
+        if data.get("type") == "transaction":
+            # No need for preprocess/process for transactions thus submit
+            # directly transaction specific save_event task.
+            save_event_transaction.delay(
                 cache_key=cache_key,
-                data=data,
+                data=None,
                 start_time=start_time,
                 event_id=event_id,
-                project=project,
-                has_attachments=bool(attachments),
+                project_id=project_id,
             )
-
-    # remember for an 1 hour that we saved this event (deduplication protection)
-    cache.set(deduplication_key, "", CACHE_TIMEOUT)
-
-    # emit event_accepted once everything is done
-    event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+        elif data.get("type") == "feedback":
+            if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
+                save_event_feedback.delay(
+                    cache_key=None,  # no need to cache as volume is low
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project_id=project_id,
+                )
+        else:
+            # Preprocess this event, which spawns either process_event or
+            # save_event. Pass data explicitly to avoid fetching it again from the
+            # cache.
+            with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+                preprocess_event(
+                    cache_key=cache_key,
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project=project,
+                    has_attachments=bool(attachments),
+                )
+
+        # remember for an 1 hour that we saved this event (deduplication protection)
+        cache.set(deduplication_key, "", CACHE_TIMEOUT)
+
+        # emit event_accepted once everything is done
+        event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+    except Exception as exc:
+        raise Retriable(exc)
 
 
 @trace_func(name="ingest_consumer.process_attachment_chunk")
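
The `process_event` change reads noisier than it is: aside from the reformatted `attachment_cache.set` call, the body is unchanged and has only been moved into one big `try` block whose sole handler rewraps any error as `Retriable`. Everything past the idempotency check talks to the processing store, the attachment cache, or Celery, so a failure there is assumed to be transient and should crash-and-retry rather than dead-letter the message. A stripped-down sketch of the pattern, with `parse_payload` and `store_event` as hypothetical stand-ins for the real work:

```python
import json


class Retriable(Exception):
    """Marks an error as transient: crash and retry, never dead-letter."""


def parse_payload(payload: bytes) -> dict:
    # Hypothetical stand-in for msgpack decoding: errors escape unwrapped,
    # so the caller can classify the message itself as bad (a DLQ candidate).
    return json.loads(payload)


def store_event(event: dict) -> None:
    # Hypothetical stand-in for the cache / processing-store writes.
    raise TimeoutError("redis timed out")


def process(payload: bytes) -> None:
    event = parse_payload(payload)
    try:
        store_event(event)  # anything failing past parsing is assumed transient
    except Exception as exc:
        raise Retriable(exc)  # signal "crash and retry" to the consumer loop
```
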
diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py
index d705fc2fa305de..e1278ab8890c06 100644
--- a/src/sentry/ingest/consumer/simple_event.py
+++ b/src/sentry/ingest/consumer/simple_event.py
@@ -1,17 +1,28 @@
 import logging
 
 import msgpack
+import sentry_kafka_schemas
 from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.types import Message
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.models.project import Project
 from sentry.utils import metrics
 
-from .processors import IngestMessage, process_event
+from .processors import IngestMessage, Retriable, process_event
 
 logger = logging.getLogger(__name__)
 
+consumer_type_to_default_topic = {
+    "events": Topic.INGEST_EVENTS,
+    "transactions": Topic.INGEST_TRANSACTIONS,
+    "attachments": Topic.INGEST_ATTACHMENTS,
+    "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
+}
+
 
 def process_simple_event_message(
     raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
 ) -> None:
@@ -36,19 +47,47 @@ def process_simple_event_message(
         tags={"consumer": consumer_type},
         unit="byte",
     )
 
-    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
-    message_type = message["type"]
-    project_id = message["project_id"]
+    try:
+        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
+
+        message_type = message["type"]
+        project_id = message["project_id"]
 
-    if message_type != "event":
-        raise ValueError(f"Unsupported message type: {message_type}")
+        if message_type != "event":
+            raise ValueError(f"Unsupported message type: {message_type}")
 
-    try:
-        with metrics.timer("ingest_consumer.fetch_project"):
-            project = Project.objects.get_from_cache(id=project_id)
-    except Project.DoesNotExist:
-        logger.exception("Project for ingested event does not exist: %s", project_id)
-        return
+        try:
+            with metrics.timer("ingest_consumer.fetch_project"):
+                project = Project.objects.get_from_cache(id=project_id)
+        except Project.DoesNotExist:
+            logger.exception("Project for ingested event does not exist: %s", project_id)
+            return
+
+        return process_event(message, project, reprocess_only_stuck_events)
+
+    except Exception as exc:
+        # If the retriable exception was raised, we should not DLQ
+        if isinstance(exc, Retriable):
+            raise
+
+        # If no retriable exception was raised, check the schema to decide whether to DLQ
+        default_topic = consumer_type_to_default_topic[consumer_type].value
+
+        # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
+        # the exception if it's a different topic. This can be removed once attachments and transactions
+        # have schemas too.
+        if default_topic != "ingest-events":
+            raise
+
+        codec = sentry_kafka_schemas.get_codec(default_topic)
+
+        try:
+            codec.decode(raw_payload, validate=True)
+        except Exception:
+            raw_value = raw_message.value
+            assert isinstance(raw_value, BrokerValue)
+
+            raise InvalidMessage(raw_value.partition, raw_value.offset)
 
-    return process_event(message, project, reprocess_only_stuck_events)
+        raise
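
The `except` branch above is the whole DLQ policy in miniature: a `Retriable` error always re-raises (crash and retry), and anything else is dead-lettered only if the raw payload provably fails validation against the `ingest-events` schema. The helper below restates that decision in isolation; `should_dlq` is a hypothetical name, but `sentry_kafka_schemas.get_codec` and `codec.decode(..., validate=True)` are the same calls the diff itself uses.

```python
import sentry_kafka_schemas


def should_dlq(raw_payload: bytes) -> bool:
    """Return True only when the payload provably violates the topic schema."""
    codec = sentry_kafka_schemas.get_codec("ingest-events")
    try:
        codec.decode(raw_payload, validate=True)
    except Exception:
        return True  # malformed message: safe to dead-letter
    return False  # schema-valid: the failure was elsewhere, so crash and retry
```
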
+ if default_topic != "ingest-events": + raise + + codec = sentry_kafka_schemas.get_codec(default_topic) + + try: + codec.decode(raw_payload, validate=True) + except Exception: + raw_value = raw_message.value + assert isinstance(raw_value, BrokerValue) + + raise InvalidMessage(raw_value.partition, raw_value.offset) - return process_event(message, project, reprocess_only_stuck_events) + raise diff --git a/tests/relay_integration/test_sdk.py b/tests/relay_integration/test_sdk.py index e09817076f5362..b2f78ff032a97e 100644 --- a/tests/relay_integration/test_sdk.py +++ b/tests/relay_integration/test_sdk.py @@ -89,7 +89,7 @@ def test_recursion_breaker(settings, post_event_with_sdk): with mock.patch( "sentry.event_manager.EventManager.save", spec=Event, side_effect=ValueError("oh no!") ) as save: - with pytest.raises(ValueError): + with pytest.raises(Exception): post_event_with_sdk({"message": "internal client test", "event_id": event_id}) assert_mock_called_once_with_partial(save, settings.SENTRY_PROJECT, cache_key=f"e:{event_id}:1") diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py new file mode 100644 index 00000000000000..3f098cdd238c78 --- /dev/null +++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py @@ -0,0 +1,70 @@ +import time +from datetime import datetime +from unittest.mock import Mock + +import msgpack +import pytest +from arroyo.backends.kafka import KafkaPayload +from arroyo.dlq import InvalidMessage +from arroyo.types import BrokerValue, Message, Partition, Topic + +from sentry.ingest.consumer.factory import IngestStrategyFactory +from sentry.testutils.pytest.fixtures import django_db_all + + +def make_message(payload: bytes, partition: Partition, offset: int) -> Message: + return Message( + BrokerValue( + KafkaPayload(None, payload, []), + partition, + offset, + datetime.now(), + ) + ) + + +@django_db_all +def test_dlq_invalid_messages(factories) -> None: + organization = factories.create_organization() + project = factories.create_project(organization=organization) + + valid_payload = msgpack.packb( + { + "type": "event", + "project_id": project.id, + "payload": b"{}", + "start_time": int(time.time()), + "event_id": "aaa", + } + ) + + bogus_payload = b"bogus message" + + partition = Partition(Topic("ingest-events"), 0) + offset = 5 + + factory = IngestStrategyFactory( + "events", + reprocess_only_stuck_events=False, + num_processes=1, + max_batch_size=1, + max_batch_time=1, + input_block_size=None, + output_block_size=None, + ) + strategy = factory.create_with_partitions(Mock(), Mock()) + + # Valid payload raises original error + with pytest.raises(Exception) as exc_info: + message = make_message(valid_payload, partition, offset) + strategy.submit(message) + assert not isinstance(exc_info.value, InvalidMessage) + + # Invalid payload raises InvalidMessage error + + with pytest.raises(InvalidMessage) as exc_info: + message = make_message(bogus_payload, partition, offset) + strategy.submit(message) + + assert exc_info.value.partition == partition + assert exc_info.value.offset == offset