feat(spans): Add a new recombiner consumer (#66804)
We're moving away from Celery tasks + RabbitMQ and instead
want a second consumer that processes segments. This
PR adds the new topic and consumer and refactors code
away from tasks.

This code doesn't run in prod yet.

In a follow-up, the `process-spans` consumer will fetch
segments that are ready to be processed and push them
to the `buffered-segments` topic.
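
That follow-up producer step is not part of this commit; a minimal sketch of what it might look like, assuming arroyo's KafkaProducer and a local broker. The helper name `publish_segment`, the broker config, and the exact span encoding are assumptions for illustration only.

from typing import Any

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic

from sentry.utils import json

# Illustrative config only; real broker settings would come from the Kafka
# cluster configuration rather than a hard-coded bootstrap server.
producer = KafkaProducer({"bootstrap.servers": "127.0.0.1:9092"})


def publish_segment(spans: list[Any]) -> None:
    # A buffered-segments payload wraps the segment's spans under a "spans" key,
    # matching the BufferedSegment message consumed by the new factory below.
    payload = json.dumps({"spans": spans}).encode("utf-8")
    producer.produce(Topic("buffered-segments"), KafkaPayload(None, payload, []))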
shruthilayaj committed Mar 21, 2024
1 parent a8d8013 commit ff45ba4
Showing 12 changed files with 172 additions and 165 deletions.
3 changes: 1 addition & 2 deletions src/sentry/conf/server.py
@@ -779,7 +779,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.reprocessing2",
"sentry.tasks.sentry_apps",
"sentry.tasks.servicehooks",
"sentry.tasks.spans",
"sentry.tasks.store",
"sentry.tasks.symbolication",
"sentry.tasks.unmerge",
@@ -942,7 +941,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
Queue("nudge.invite_missing_org_members", routing_key="invite_missing_org_members"),
Queue("auto_resolve_issues", routing_key="auto_resolve_issues"),
Queue("on_demand_metrics", routing_key="on_demand_metrics"),
Queue("spans.process_segment", routing_key="spans.process_segment"),
]

from celery.schedules import crontab
@@ -3511,6 +3509,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"group-attributes": "default",
"snuba-spans": "default",
"shared-resources-usage": "default",
"buffered-segments": "default",
}


1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -48,6 +48,7 @@ class Topic(Enum):
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"
BUFFERED_SEGMENTS = "buffered-segments"


class ConsumerDefinition(TypedDict, total=False):
4 changes: 4 additions & 0 deletions src/sentry/consumers/__init__.py
@@ -349,6 +349,10 @@ def ingest_events_options() -> list[click.Option]:
"topic": Topic.SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
"detect-performance-issues": {
"topic": Topic.BUFFERED_SEGMENTS,
"strategy_factory": "sentry.spans.consumers.detect_performance_issues.factory.DetectPerformanceIssuesStrategyFactory",
},
**settings.SENTRY_KAFKA_CONSUMERS,
}

Empty file.
53 changes: 53 additions & 0 deletions src/sentry/spans/consumers/detect_performance_issues/factory.py
@@ -0,0 +1,53 @@
import logging
from collections.abc import Mapping
from typing import Any

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec, ValidationError
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment

from sentry.spans.consumers.detect_performance_issues.message import process_segment

BUFFERED_SEGMENT_SCHEMA: Codec[BufferedSegment] = get_codec("buffered-segments")

logger = logging.getLogger(__name__)


def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
return BUFFERED_SEGMENT_SCHEMA.decode(value)


def process_message(message: Message[KafkaPayload]):
try:
segment = _deserialize_segment(message.payload.value)
except ValidationError:
logger.exception("Failed to deserialize segment payload")
return

process_segment(segment["spans"])


def _process_message(message: Message[KafkaPayload]):
assert isinstance(message.value, BrokerValue)

try:
process_message(message)
except Exception:
logger.exception("Failed to process segment payload")


class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return RunTask(
function=_process_message,
next_step=CommitOffsets(commit),
)
@@ -1,5 +1,4 @@
import logging
import random
from collections.abc import Mapping, Sequence, Set
from copy import deepcopy
from typing import Any
@@ -18,11 +17,8 @@
)
from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.models.project import Project
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.canonical import CanonicalKeyDict
from sentry.utils.sdk import mark_scope_as_unsafe

SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")

@@ -104,64 +100,60 @@ def _update_occurrence_group_type(jobs: Sequence[Job], projects: ProjectsMapping


def transform_spans_to_event_dict(spans):
event: dict[str, Any] = {"type": "transaction", "level": "info", "contexts": {}}
deserialized_spans: list[dict[str, Any]] = []
event: dict[str, Any] = {"type": "transaction", "contexts": {}}
processed_spans: list[dict[str, Any]] = []
for span in spans:
try:
deserialized_span = dict(_deserialize_span(span))
except Exception:
logger.exception("Failed to process span payload")
continue
sentry_tags = span.get("sentry_tags", {})

sentry_tags = deserialized_span.get("sentry_tags", {})

if deserialized_span["is_segment"] is True:
event["event_id"] = deserialized_span.get("event_id")
event["project_id"] = deserialized_span["project_id"]
if span["is_segment"] is True:
event["event_id"] = span.get("event_id")
event["project_id"] = span["project_id"]
event["transaction"] = sentry_tags.get("transaction")
event["release"] = sentry_tags.get("release")
event["environment"] = sentry_tags.get("environment")

event["platform"] = sentry_tags.get("platform")

event["contexts"]["trace"] = {
"trace_id": deserialized_span["trace_id"],
"trace_id": span["trace_id"],
"type": "trace",
"op": sentry_tags.get("transaction.op"),
"span_id": deserialized_span["span_id"],
"span_id": span["span_id"],
}
event["received"] = deserialized_span["received"]
event["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
event["received"] = span["received"]
event["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
event["start_timestamp"] = span["start_timestamp_ms"] / 1000

if (profile_id := deserialized_span.get("profile_id")) is not None:
if (profile_id := span.get("profile_id")) is not None:
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}

if (op := sentry_tags.get("op")) is not None:
deserialized_span["op"] = op
span["op"] = op

deserialized_span["start_timestamp"] = deserialized_span["start_timestamp_ms"] / 1000
deserialized_span["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
span["start_timestamp"] = span["start_timestamp_ms"] / 1000
span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000

deserialized_spans.append(deserialized_span)
processed_spans.append(span)

# The performance detectors expect the span list to be ordered/flattened in the way they
# are structured in the tree. This is an implicit assumption in the performance detectors.
# So we build a tree and flatten it depth first.
# TODO: See if we can update the detectors to work without this assumption so we can
# just pass it a list of spans.
tree, root_span_id = build_tree(deserialized_spans)
tree, root_span_id = build_tree(processed_spans)
flattened_spans = flatten_tree(tree, root_span_id)
event["spans"] = flattened_spans

return event


def _process_segment(project_id, segment_id):
client = RedisSpansBuffer()
spans = client.read_segment(project_id, segment_id)

with sentry_sdk.start_span(op="sentry.tasks.spans.transform_spans_to_event_dict"):
def process_segment(spans: list[dict[str, Any]]):
with sentry_sdk.start_span(
op="sentry.consumers.detect_performance_issues.process_segment.transform_spans_to_event_dict"
):
event = transform_spans_to_event_dict(spans)

project_id = event["project_id"]
with metrics.timer("tasks.spans.project.get_from_cache"):
project = Project.objects.get_from_cache(id=project_id)

@@ -186,17 +178,3 @@ def _process_segment(project_id, segment_id):
_update_occurrence_group_type(jobs, projects)

return jobs


@instrumented_task(
name="sentry.tasks.spans.process_segment",
queue="spans.process_segment",
max_retries=0,
)
def process_segment(project_id, segment_id):
mark_scope_as_unsafe()
try:
_process_segment(project_id, segment_id)
except Exception as err:
if random.random() < 0.05:
sentry_sdk.capture_exception(err)
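
The comment in transform_spans_to_event_dict above notes that the performance detectors implicitly assume the span list is flattened depth-first in tree order, which is why the code builds a tree and flattens it. As an illustration of that ordering only, a minimal sketch of a depth-first flatten keyed on parent_span_id; _flatten_depth_first is a hypothetical helper and not the build_tree/flatten_tree implementation used in this commit.

from collections import defaultdict
from typing import Any


def _flatten_depth_first(spans: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Group spans by parent so children can be visited directly under their parent.
    children: dict[str | None, list[dict[str, Any]]] = defaultdict(list)
    root = None
    span_ids = {span["span_id"] for span in spans}
    for span in spans:
        parent_id = span.get("parent_span_id")
        # A segment has exactly one root (the segment span itself).
        if span.get("is_segment") or parent_id not in span_ids:
            root = span
        else:
            children[parent_id].append(span)

    ordered: list[dict[str, Any]] = []
    stack = [root] if root is not None else []
    while stack:
        span = stack.pop()
        ordered.append(span)
        # Push children in reverse so they are visited in their original order.
        stack.extend(reversed(children[span["span_id"]]))
    return ordered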
9 changes: 1 addition & 8 deletions src/sentry/spans/consumers/process/factory.py
@@ -13,7 +13,6 @@

from sentry import options
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import process_segment

logger = logging.getLogger(__name__)
SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")
@@ -42,13 +41,7 @@ def process_message(message: Message[KafkaPayload]):
return

client = RedisSpansBuffer()
new_segment = client.write_span(project_id, segment_id, message.payload.value)
if new_segment:
# This function currently does nothing.
process_segment.apply_async(
args=[project_id, segment_id],
countdown=PROCESS_SEGMENT_DELAY,
)
client.write_span(project_id, segment_id, message.payload.value)


class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
3 changes: 2 additions & 1 deletion src/sentry/utils/sdk.py
@@ -37,9 +37,10 @@

UNSAFE_FILES = (
"sentry/event_manager.py",
"sentry/spans/consumers/process/factory.py",
"sentry/spans/consumers/detect_performance_issues/factory.py",
"sentry/tasks/process_buffer.py",
"sentry/ingest/consumer/processors.py",
"sentry/tasks/spans.py",
# This consumer lives outside of sentry but is just as unsafe.
"outcomes_consumer.py",
)
Empty file.
@@ -0,0 +1,54 @@
from datetime import datetime
from unittest import mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.detect_performance_issues.factory import (
DetectPerformanceIssuesStrategyFactory,
)
from sentry.utils import json
from sentry.utils.kafka_config import get_topic_definition
from tests.sentry.spans.consumers.process.test_factory import build_mock_span


def build_mock_message(data, topic=None):
message = mock.Mock()
message.value.return_value = json.dumps(data)
if topic:
message.topic.return_value = topic
return message


@mock.patch("sentry.spans.consumers.detect_performance_issues.factory.process_segment")
def test_consumer_processes_segment(mock_process_segment):

topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])
partition = Partition(topic, 0)
strategy = DetectPerformanceIssuesStrategyFactory().create_with_partitions(
commit=mock.Mock(),
partitions={},
)

span_data = build_mock_span(project_id=1)
segment_data = {"spans": [span_data]}
message = build_mock_message(segment_data, topic)

strategy.submit(
Message(
BrokerValue(
KafkaPayload(b"key", message.value().encode("utf-8"), []),
partition,
1,
datetime.now(),
)
)
)

strategy.poll()
strategy.join(1)
strategy.terminate()

mock_process_segment.assert_called_once_with(segment_data["spans"])
@@ -1,51 +1,16 @@
import uuid
from unittest import mock

from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import _process_segment
from sentry.spans.consumers.detect_performance_issues.message import process_segment
from sentry.testutils.cases import TestCase
from sentry.utils import json


def build_mock_span(project_id, span_op=None, **kwargs):
span = {
"description": "OrganizationNPlusOne",
"duration_ms": 107,
"event_id": "61ccae71d70f45bb9b1f2ccb7f7a49ec",
"exclusive_time_ms": 107.359,
"is_segment": True,
"parent_span_id": "b35b839c02985f33",
"profile_id": "dbae2b82559649a1a34a2878134a007b",
"project_id": project_id,
"received": 1707953019.044972,
"retention_days": 90,
"segment_id": "a49b42af9fb69da0",
"sentry_tags": {
"browser.name": "Google Chrome",
"environment": "development",
"op": span_op or "base.dispatch.sleep",
"release": "backend@24.2.0.dev0+699ce0cd1281cc3c7275d0a474a595375c769ae8",
"transaction": "/api/0/organizations/{organization_slug}/n-plus-one/",
"transaction.method": "GET",
"transaction.op": "http.server",
"user": "id:1",
},
"span_id": "a49b42af9fb69da0",
"start_timestamp_ms": 1707953018865,
"trace_id": "94576097f3a64b68b85a59c7d4e3ee2a",
}

span.update(**kwargs)
return json.dumps(span)
from tests.sentry.spans.consumers.process.test_factory import build_mock_span


class TestSpansTask(TestCase):
def setUp(self):
self.project = self.create_project()

@mock.patch.object(RedisSpansBuffer, "read_segment")
def test_n_plus_one_issue_detection(self, mock_read_segment):
def test_n_plus_one_issue_detection(self):
segment_span = build_mock_span(project_id=self.project.id)
child_span = build_mock_span(
project_id=self.project.id,
@@ -80,8 +45,7 @@ def repeating_span():
repeating_spans = [repeating_span() for _ in range(7)]
spans = [segment_span, child_span, cause_span] + repeating_spans

mock_read_segment.return_value = spans
job = _process_segment(self.project.id, "a49b42af9fb69da0")[0]
job = process_segment(spans)[0]

assert (
job["performance_problems"][0].fingerprint
