New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spans): Add a new recombiner consumer #66804
Changes from 8 commits
8967184
c572d4d
fa28176
475de51
4a3e304
c086d65
f223c1b
54058c8
7608e4e
9a0adc7
b9b6bfc
6a14c4d
beeb90c
4c9d463
cf67f96
dabf343
821c8d6
0918a0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import logging | ||
import random | ||
from collections.abc import Mapping | ||
|
||
import sentry_sdk | ||
from arroyo.backends.kafka.consumer import KafkaPayload | ||
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory | ||
from arroyo.processing.strategies.commit import CommitOffsets | ||
from arroyo.processing.strategies.run_task import RunTask | ||
from arroyo.types import BrokerValue, Commit, Message, Partition | ||
|
||
from sentry.spans.consumers.recombine.message import process_segment | ||
from sentry.utils import json | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def process_message(message: Message[KafkaPayload]): | ||
assert isinstance(message.value, BrokerValue) | ||
try: | ||
segments = json.loads(message.payload.value) | ||
shruthilayaj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except Exception: | ||
logger.exception("Failed to process segment payload") | ||
return | ||
|
||
try: | ||
process_segment(segments) | ||
except Exception as e: | ||
if random.random() < 0.05: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why sample this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to log exception instead - had just copied over an old pattern |
||
sentry_sdk.capture_exception(e) | ||
|
||
|
||
class RecombineSegmentStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): | ||
def create_with_partitions( | ||
self, | ||
commit: Commit, | ||
partitions: Mapping[Partition, int], | ||
) -> ProcessingStrategy[KafkaPayload]: | ||
return RunTask( | ||
function=process_message, | ||
next_step=CommitOffsets(commit), | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,9 +37,10 @@ | |
|
||
UNSAFE_FILES = ( | ||
"sentry/event_manager.py", | ||
"/sentry/spans/consumers/process/factory.py", | ||
"/sentry/spans/consumers/recombine/factory.py", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do those have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh good catch! |
||
"sentry/tasks/process_buffer.py", | ||
"sentry/ingest/consumer/processors.py", | ||
"sentry/tasks/spans.py", | ||
# This consumer lives outside of sentry but is just as unsafe. | ||
"outcomes_consumer.py", | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
from datetime import datetime | ||
from unittest import mock | ||
|
||
from arroyo.backends.kafka import KafkaPayload | ||
from arroyo.types import BrokerValue, Message, Partition | ||
from arroyo.types import Topic as ArroyoTopic | ||
|
||
from sentry.conf.types.kafka_definition import Topic | ||
from sentry.spans.consumers.recombine.factory import RecombineSegmentStrategyFactory | ||
from sentry.utils import json | ||
from sentry.utils.kafka_config import get_topic_definition | ||
|
||
|
||
def build_mock_span(**kwargs): | ||
span = { | ||
"duration_ms": 0, | ||
"event_id": "72fcea47d44a444fb132f8d462eeb0b4", | ||
"exclusive_time_ms": 0.006, | ||
"is_segment": False, | ||
"parent_span_id": "93f0e87ad9cc709e", | ||
"profile_id": "7ce060d7ea62432b8355bc9e612676e4", | ||
"project_id": 1, | ||
"received": 1706734067.029479, | ||
"retention_days": 90, | ||
"segment_id": "ace31e54d65652aa", | ||
"sentry_tags": { | ||
"environment": "development", | ||
"op": "relay_fetch_org_options", | ||
"release": "backend@24.2.0.dev0+df7615f2ff7dc3c8802f806477f920bb934bd198", | ||
"transaction": "/api/0/relays/projectconfigs/", | ||
"transaction.method": "POST", | ||
"transaction.op": "http.server", | ||
"user": "ip:127.0.0.1", | ||
}, | ||
"span_id": "95acbe6d30a66717", | ||
"start_timestamp_ms": 1706734066840, | ||
"trace_id": "8e6f22e6169545cc963255d0f29cb76b", | ||
} | ||
|
||
span.update(**kwargs) | ||
return json.dumps(span) | ||
|
||
|
||
def build_mock_message(data, topic=None): | ||
message = mock.Mock() | ||
message.value.return_value = json.dumps(data) | ||
if topic: | ||
message.topic.return_value = topic | ||
return message | ||
|
||
|
||
@mock.patch("sentry.spans.consumers.recombine.factory.process_segment") | ||
def test_consumer_pushes_to_redis(mock_process_segment): | ||
|
||
topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENT)["real_topic_name"]) | ||
partition = Partition(topic, 0) | ||
strategy = RecombineSegmentStrategyFactory().create_with_partitions( | ||
commit=mock.Mock(), | ||
partitions={}, | ||
) | ||
|
||
span_data = build_mock_span() | ||
segment_data = [span_data] | ||
message = build_mock_message(segment_data, topic) | ||
|
||
strategy.submit( | ||
Message( | ||
BrokerValue( | ||
KafkaPayload(b"key", message.value().encode("utf-8"), []), | ||
partition, | ||
1, | ||
datetime.now(), | ||
) | ||
) | ||
) | ||
|
||
strategy.poll() | ||
strategy.join(1) | ||
strategy.terminate() | ||
|
||
mock_process_segment.assert_called_once_with(segment_data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this topic actually exist in prod?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it does not. None of these consumers/topics/redis existing in prod yet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a concern of deploying this when the infrastructure doesn't exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumer code is for sure okay to deploy without the infra in place (since ops actually needs to manually deploy the consumers), and the consumers producing to and reading from this topic aren't actually running in production, so I assumed it would be fine. @phacops do you know if that's the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@evanh I'm going to make an ops ticket to get this topic configured and make sure that goes through before I merge this PR just to be safe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ops ticket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The preference is to merge all the code first and then deploy infra to prod in the right order (details in the ticket), so I'm going to merge this PR cc: @evanh