feat: DLQ unprocessable messages on ingest-events (#66236)
This PR depends on getsentry/sentry-kafka-schemas#230

It is deliberately cautious about DLQing: nothing that is even
potentially retriable gets DLQed. This could be tweaked
later.

If a non-retriable exception is raised, the consumer tries to further determine whether the message
is actually bad (rather than failing due to some transient, retriable error) by validating the message against
the schema. The message is DLQed only if that validation also fails.
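
In rough pseudocode, the decision flow looks like this (`handle` and `process` are hypothetical stand-ins for illustration; the real logic lives in src/sentry/ingest/consumer/simple_event.py below):

import sentry_kafka_schemas
from arroyo.dlq import InvalidMessage

def handle(raw_payload: bytes, partition, offset) -> None:
    try:
        process(raw_payload)  # stand-in for the real processing step; raises Retriable on transient failures
    except Retriable:
        raise  # never DLQ a message that might succeed on retry
    except Exception:
        codec = sentry_kafka_schemas.get_codec("ingest-events")
        try:
            codec.decode(raw_payload, validate=True)
        except Exception:
            # The payload itself is invalid: route it to the DLQ.
            raise InvalidMessage(partition, offset)
        raise  # schema-valid payload: treat the failure as potentially transient and retry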
lynnagara committed Mar 15, 2024
1 parent 2646ee1 commit 9ba8562
Showing 5 changed files with 203 additions and 82 deletions.
1 change: 1 addition & 0 deletions src/sentry/consumers/__init__.py
@@ -259,6 +259,7 @@ def ingest_events_options() -> list[click.Option]:
"static_args": {
"consumer_type": "events",
},
"dlq_topic": Topic.INGEST_EVENTS_DLQ,
},
"ingest-feedback-events": {
"topic": Topic.INGEST_FEEDBACK_EVENTS,
145 changes: 78 additions & 67 deletions src/sentry/ingest/consumer/processors.py
@@ -32,6 +32,10 @@
IngestMessage = Mapping[str, Any]


+class Retriable(Exception):
+    pass


def trace_func(**span_kwargs):
def wrapper(f):
@functools.wraps(f)
@@ -127,78 +131,85 @@ def process_event(
):
return

-    # If we only want to reprocess "stuck" events, we check if this event is already in the
-    # `processing_store`. We only continue here if the event *is* present, as that will eventually
-    # process and consume the event from the `processing_store`, whereby getting it "unstuck".
-    if reprocess_only_stuck_events and not event_processing_store.exists(data):
-        return
-
-    with metrics.timer("ingest_consumer._store_event"):
-        cache_key = event_processing_store.store(data)
-
-    try:
-        # Records rc-processing usage broken down by
-        # event type.
-        event_type = data.get("type")
-        if event_type == "error":
-            app_feature = "errors"
-        elif event_type == "transaction":
-            app_feature = "transactions"
-        else:
-            app_feature = None
-
-        if app_feature is not None:
-            record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
-    except Exception:
-        pass
-
-    if attachments:
-        with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
-            attachment_objects = [
-                CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
-                for attachment in attachments
-            ]
-
-            attachment_cache.set(cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT)
-
-    if data.get("type") == "transaction":
-        # No need for preprocess/process for transactions thus submit
-        # directly transaction specific save_event task.
-        save_event_transaction.delay(
-            cache_key=cache_key,
-            data=None,
-            start_time=start_time,
-            event_id=event_id,
-            project_id=project_id,
-        )
-    elif data.get("type") == "feedback":
-        if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
-            save_event_feedback.delay(
-                cache_key=None,  # no need to cache as volume is low
-                data=data,
-                start_time=start_time,
-                event_id=event_id,
-                project_id=project_id,
-            )
-    else:
-        # Preprocess this event, which spawns either process_event or
-        # save_event. Pass data explicitly to avoid fetching it again from the
-        # cache.
-        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
-            preprocess_event(
-                cache_key=cache_key,
-                data=data,
-                start_time=start_time,
-                event_id=event_id,
-                project=project,
-                has_attachments=bool(attachments),
-            )
-
-    # remember for an 1 hour that we saved this event (deduplication protection)
-    cache.set(deduplication_key, "", CACHE_TIMEOUT)
-
-    # emit event_accepted once everything is done
-    event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+    # Raise the retriable exception and skip DLQ if anything below this point fails as it may be caused by
+    # intermittent network issue
+    try:
+        # If we only want to reprocess "stuck" events, we check if this event is already in the
+        # `processing_store`. We only continue here if the event *is* present, as that will eventually
+        # process and consume the event from the `processing_store`, whereby getting it "unstuck".
+        if reprocess_only_stuck_events and not event_processing_store.exists(data):
+            return
+
+        with metrics.timer("ingest_consumer._store_event"):
+            cache_key = event_processing_store.store(data)
+
+        try:
+            # Records rc-processing usage broken down by
+            # event type.
+            event_type = data.get("type")
+            if event_type == "error":
+                app_feature = "errors"
+            elif event_type == "transaction":
+                app_feature = "transactions"
+            else:
+                app_feature = None
+
+            if app_feature is not None:
+                record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
+        except Exception:
+            pass
+
+        if attachments:
+            with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
+                attachment_objects = [
+                    CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
+                    for attachment in attachments
+                ]
+
+                attachment_cache.set(
+                    cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
+                )
+
+        if data.get("type") == "transaction":
+            # No need for preprocess/process for transactions thus submit
+            # directly transaction specific save_event task.
+            save_event_transaction.delay(
+                cache_key=cache_key,
+                data=None,
+                start_time=start_time,
+                event_id=event_id,
+                project_id=project_id,
+            )
+        elif data.get("type") == "feedback":
+            if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
+                save_event_feedback.delay(
+                    cache_key=None,  # no need to cache as volume is low
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project_id=project_id,
+                )
+        else:
+            # Preprocess this event, which spawns either process_event or
+            # save_event. Pass data explicitly to avoid fetching it again from the
+            # cache.
+            with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+                preprocess_event(
+                    cache_key=cache_key,
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project=project,
+                    has_attachments=bool(attachments),
+                )
+
+        # remember for an 1 hour that we saved this event (deduplication protection)
+        cache.set(deduplication_key, "", CACHE_TIMEOUT)
+
+        # emit event_accepted once everything is done
+        event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+    except Exception as exc:
+        raise Retriable(exc)


@trace_func(name="ingest_consumer.process_attachment_chunk")
67 changes: 53 additions & 14 deletions src/sentry/ingest/consumer/simple_event.py
@@ -1,17 +1,28 @@
import logging

import msgpack
+import sentry_kafka_schemas
from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.types import Message
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message

+from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.utils import metrics

-from .processors import IngestMessage, process_event
+from .processors import IngestMessage, Retriable, process_event

logger = logging.getLogger(__name__)


+consumer_type_to_default_topic = {
+    "events": Topic.INGEST_EVENTS,
+    "transactions": Topic.INGEST_TRANSACTIONS,
+    "attachments": Topic.INGEST_ATTACHMENTS,
+    "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
+}


def process_simple_event_message(
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
) -> None:
@@ -36,19 +47,47 @@ def process_simple_event_message(
tags={"consumer": consumer_type},
unit="byte",
)
-    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
-
-    message_type = message["type"]
-    project_id = message["project_id"]
-
-    if message_type != "event":
-        raise ValueError(f"Unsupported message type: {message_type}")
-
-    try:
-        with metrics.timer("ingest_consumer.fetch_project"):
-            project = Project.objects.get_from_cache(id=project_id)
-    except Project.DoesNotExist:
-        logger.exception("Project for ingested event does not exist: %s", project_id)
-        return
-
-    return process_event(message, project, reprocess_only_stuck_events)
+    try:
+        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
+
+        message_type = message["type"]
+        project_id = message["project_id"]
+
+        if message_type != "event":
+            raise ValueError(f"Unsupported message type: {message_type}")
+
+        try:
+            with metrics.timer("ingest_consumer.fetch_project"):
+                project = Project.objects.get_from_cache(id=project_id)
+        except Project.DoesNotExist:
+            logger.exception("Project for ingested event does not exist: %s", project_id)
+            return
+
+        return process_event(message, project, reprocess_only_stuck_events)
+
+    except Exception as exc:
+        # If the retriable exception was raised, we should not DLQ
+        if isinstance(exc, Retriable):
+            raise
+
+        # If no retriable exception was raised, check the schema to decide whether to DLQ
+        default_topic = consumer_type_to_default_topic[consumer_type].value
+
+        # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
+        # the exception if it's a different topic. This can be removed once attachments and transactions
+        # have schemas too.
+        if default_topic != "ingest-events":
+            raise
+
+        codec = sentry_kafka_schemas.get_codec(default_topic)
+
+        try:
+            codec.decode(raw_payload, validate=True)
+        except Exception:
+            raw_value = raw_message.value
+            assert isinstance(raw_value, BrokerValue)
+
+            raise InvalidMessage(raw_value.partition, raw_value.offset)
+
+        raise
2 changes: 1 addition & 1 deletion tests/relay_integration/test_sdk.py
@@ -89,7 +89,7 @@ def test_recursion_breaker(settings, post_event_with_sdk):
with mock.patch(
"sentry.event_manager.EventManager.save", spec=Event, side_effect=ValueError("oh no!")
) as save:
-    with pytest.raises(ValueError):
+    with pytest.raises(Exception):
post_event_with_sdk({"message": "internal client test", "event_id": event_id})

assert_mock_called_once_with_partial(save, settings.SENTRY_PROJECT, cache_key=f"e:{event_id}:1")
70 changes: 70 additions & 0 deletions tests/sentry/ingest/ingest_consumer/test_dlq.py
@@ -0,0 +1,70 @@
import time
from datetime import datetime
from unittest.mock import Mock

import msgpack
import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.ingest.consumer.factory import IngestStrategyFactory
from sentry.testutils.pytest.fixtures import django_db_all


def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
return Message(
BrokerValue(
KafkaPayload(None, payload, []),
partition,
offset,
datetime.now(),
)
)


@django_db_all
def test_dlq_invalid_messages(factories) -> None:
organization = factories.create_organization()
project = factories.create_project(organization=organization)

valid_payload = msgpack.packb(
{
"type": "event",
"project_id": project.id,
"payload": b"{}",
"start_time": int(time.time()),
"event_id": "aaa",
}
)

bogus_payload = b"bogus message"

partition = Partition(Topic("ingest-events"), 0)
offset = 5

factory = IngestStrategyFactory(
"events",
reprocess_only_stuck_events=False,
num_processes=1,
max_batch_size=1,
max_batch_time=1,
input_block_size=None,
output_block_size=None,
)
strategy = factory.create_with_partitions(Mock(), Mock())

# Valid payload raises original error
with pytest.raises(Exception) as exc_info:
message = make_message(valid_payload, partition, offset)
strategy.submit(message)
assert not isinstance(exc_info.value, InvalidMessage)

# Invalid payload raises InvalidMessage error

with pytest.raises(InvalidMessage) as exc_info:
message = make_message(bogus_payload, partition, offset)
strategy.submit(message)

assert exc_info.value.partition == partition
assert exc_info.value.offset == offset
