feat(spans): Add a new recombiner consumer #66804

Merged: 18 commits, Mar 21, 2024
Changes from 17 commits
3 changes: 1 addition & 2 deletions src/sentry/conf/server.py
@@ -776,7 +776,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.reprocessing2",
"sentry.tasks.sentry_apps",
"sentry.tasks.servicehooks",
"sentry.tasks.spans",
"sentry.tasks.store",
"sentry.tasks.symbolication",
"sentry.tasks.unmerge",
@@ -939,7 +938,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
Queue("nudge.invite_missing_org_members", routing_key="invite_missing_org_members"),
Queue("auto_resolve_issues", routing_key="auto_resolve_issues"),
Queue("on_demand_metrics", routing_key="on_demand_metrics"),
Queue("spans.process_segment", routing_key="spans.process_segment"),
]

from celery.schedules import crontab
@@ -3492,6 +3490,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"group-attributes": "default",
"snuba-spans": "default",
"shared-resources-usage": "default",
"buffered-segments": "default",
}


1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -46,6 +46,7 @@ class Topic(Enum):
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"
BUFFERED_SEGMENTS = "buffered-segments"


class ConsumerDefinition(TypedDict, total=False):
4 changes: 4 additions & 0 deletions src/sentry/consumers/__init__.py
@@ -349,6 +349,10 @@ def ingest_events_options() -> list[click.Option]:
"topic": Topic.SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
"detect-performance-issues": {
"topic": Topic.BUFFERED_SEGMENTS,
"strategy_factory": "sentry.spans.consumers.detect_performance_issues.factory.DetectPerformanceIssuesStrategyFactory",
},
**settings.SENTRY_KAFKA_CONSUMERS,
}

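Each entry in the consumer definition mapping above ties a consumer name to a Kafka topic and a dotted path to an Arroyo strategy factory. A minimal sketch of how such an entry could be resolved into a strategy (the actual runner wiring is not part of this diff, so the resolution below is illustrative only):

from importlib import import_module
from unittest import mock

definition = {
    "topic": "buffered-segments",
    "strategy_factory": (
        "sentry.spans.consumers.detect_performance_issues.factory"
        ".DetectPerformanceIssuesStrategyFactory"
    ),
}

# Resolve the dotted path to the factory class, then build the processing
# strategy; a mock commit stands in for Arroyo's real commit callback.
module_path, class_name = definition["strategy_factory"].rsplit(".", 1)
factory_cls = getattr(import_module(module_path), class_name)
strategy = factory_cls().create_with_partitions(commit=mock.Mock(), partitions={})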
Empty file.
53 changes: 53 additions & 0 deletions src/sentry/spans/consumers/detect_performance_issues/factory.py
@@ -0,0 +1,53 @@
import logging
from collections.abc import Mapping
from typing import Any

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec, ValidationError
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment

from sentry.spans.consumers.detect_performance_issues.message import process_segment

BUFFERED_SEGMENT_SCHEMA: Codec[BufferedSegment] = get_codec("buffered-segments")

logger = logging.getLogger(__name__)


def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
return BUFFERED_SEGMENT_SCHEMA.decode(value)


def process_message(message: Message[KafkaPayload]):
try:
segment = _deserialize_segment(message.payload.value)
except ValidationError:
logger.exception("Failed to deserialize segment payload")
return

process_segment(segment["spans"])


def _process_message(message: Message[KafkaPayload]):
assert isinstance(message.value, BrokerValue)

try:
process_message(message)
except Exception:
logger.exception("Failed to process segment payload")


class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return RunTask(
function=_process_message,
next_step=CommitOffsets(commit),
)
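The new strategy decodes each Kafka message with the buffered-segments codec and passes segment["spans"] to process_segment; a ValidationError is logged and the message is skipped rather than failing the consumer. A hedged sketch of the payload shape, based on the mock span used in this PR's tests (illustrative, not the full buffered-segments schema):

import json

# Illustrative buffered-segments payload: a JSON object with a "spans" list.
# Field values are copied from this PR's test fixtures.
segment_payload = {
    "spans": [
        {
            "event_id": "61ccae71d70f45bb9b1f2ccb7f7a49ec",
            "project_id": 1,
            "span_id": "a49b42af9fb69da0",
            "trace_id": "94576097f3a64b68b85a59c7d4e3ee2a",
            "is_segment": True,
            "received": 1707953019.044972,
            "start_timestamp_ms": 1707953018865,
            "duration_ms": 107,
            "sentry_tags": {"transaction.op": "http.server"},
        }
    ]
}

raw = json.dumps(segment_payload).encode("utf-8")
# raw is what arrives as message.payload.value; process_message() validates it
# against BUFFERED_SEGMENT_SCHEMA before calling process_segment().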
@@ -1,5 +1,4 @@
import logging
import random
from collections.abc import Mapping, Sequence, Set
from copy import deepcopy
from typing import Any
@@ -18,11 +17,8 @@
)
from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.models.project import Project
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.canonical import CanonicalKeyDict
from sentry.utils.sdk import mark_scope_as_unsafe

SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")

@@ -104,64 +100,60 @@ def _update_occurrence_group_type(jobs: Sequence[Job], projects: ProjectsMapping


def transform_spans_to_event_dict(spans):
event: dict[str, Any] = {"type": "transaction", "level": "info", "contexts": {}}
deserialized_spans: list[dict[str, Any]] = []
event: dict[str, Any] = {"type": "transaction", "contexts": {}}
processed_spans: list[dict[str, Any]] = []
for span in spans:
try:
deserialized_span = dict(_deserialize_span(span))
except Exception:
logger.exception("Failed to process span payload")
continue
sentry_tags = span.get("sentry_tags", {})

sentry_tags = deserialized_span.get("sentry_tags", {})

if deserialized_span["is_segment"] is True:
event["event_id"] = deserialized_span.get("event_id")
event["project_id"] = deserialized_span["project_id"]
if span["is_segment"] is True:
event["event_id"] = span.get("event_id")
event["project_id"] = span["project_id"]
event["transaction"] = sentry_tags.get("transaction")
event["release"] = sentry_tags.get("release")
event["environment"] = sentry_tags.get("environment")

event["platform"] = sentry_tags.get("platform")

event["contexts"]["trace"] = {
"trace_id": deserialized_span["trace_id"],
"trace_id": span["trace_id"],
"type": "trace",
"op": sentry_tags.get("transaction.op"),
"span_id": deserialized_span["span_id"],
"span_id": span["span_id"],
}
event["received"] = deserialized_span["received"]
event["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
event["received"] = span["received"]
event["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
event["start_timestamp"] = span["start_timestamp_ms"] / 1000

if (profile_id := deserialized_span.get("profile_id")) is not None:
if (profile_id := span.get("profile_id")) is not None:
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}

if (op := sentry_tags.get("op")) is not None:
deserialized_span["op"] = op
span["op"] = op

deserialized_span["start_timestamp"] = deserialized_span["start_timestamp_ms"] / 1000
deserialized_span["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
span["start_timestamp"] = span["start_timestamp_ms"] / 1000
span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000

deserialized_spans.append(deserialized_span)
processed_spans.append(span)

# The performance detectors expect the span list to be ordered/flattened in the way they
# are structured in the tree. This is an implicit assumption in the performance detectors.
# So we build a tree and flatten it depth first.
# TODO: See if we can update the detectors to work without this assumption so we can
# just pass it a list of spans.
tree, root_span_id = build_tree(deserialized_spans)
tree, root_span_id = build_tree(processed_spans)
flattened_spans = flatten_tree(tree, root_span_id)
event["spans"] = flattened_spans

return event


def _process_segment(project_id, segment_id):
client = RedisSpansBuffer()
spans = client.read_segment(project_id, segment_id)

with sentry_sdk.start_span(op="sentry.tasks.spans.transform_spans_to_event_dict"):
def process_segment(spans: list[dict[str, Any]]):
with sentry_sdk.start_span(
op="sentry.consumers.detect_performance_issues.process_segment.transform_spans_to_event_dict"
):
event = transform_spans_to_event_dict(spans)

project_id = event["project_id"]
with metrics.timer("tasks.spans.project.get_from_cache"):
project = Project.objects.get_from_cache(id=project_id)

@@ -186,17 +178,3 @@ def _process_segment(project_id, segment_id):
_update_occurrence_group_type(jobs, projects)

return jobs


@instrumented_task(
name="sentry.tasks.spans.process_segment",
queue="spans.process_segment",
max_retries=0,
)
def process_segment(project_id, segment_id):
mark_scope_as_unsafe()
try:
_process_segment(project_id, segment_id)
except Exception as err:
if random.random() < 0.05:
sentry_sdk.capture_exception(err)
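The TODO in transform_spans_to_event_dict notes that the performance detectors assume the span list is flattened depth-first from the segment root. A minimal sketch of that ordering contract follows; the real build_tree/flatten_tree helpers are unchanged by this PR and not shown in the diff, so the function below is illustrative only:

from typing import Any


def example_flatten_depth_first(spans: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Group spans by parent_span_id and remember the segment (root) span.
    children: dict[str | None, list[dict[str, Any]]] = {}
    root = None
    for span in spans:
        if span.get("is_segment"):
            root = span
        children.setdefault(span.get("parent_span_id"), []).append(span)

    ordered: list[dict[str, Any]] = []

    def visit(span: dict[str, Any]) -> None:
        # Emit the span, then recurse into its children (depth-first order).
        ordered.append(span)
        for child in children.get(span["span_id"], []):
            visit(child)

    if root is not None:
        visit(root)
    return ordered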
9 changes: 1 addition & 8 deletions src/sentry/spans/consumers/process/factory.py
@@ -13,7 +13,6 @@

from sentry import options
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import process_segment

logger = logging.getLogger(__name__)
SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")
@@ -42,13 +41,7 @@ def process_message(message: Message[KafkaPayload]):
return

client = RedisSpansBuffer()
new_segment = client.write_span(project_id, segment_id, message.payload.value)
if new_segment:
# This function currently does nothing.
process_segment.apply_async(
args=[project_id, segment_id],
countdown=PROCESS_SEGMENT_DELAY,
)
client.write_span(project_id, segment_id, message.payload.value)


class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
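With the task dispatch removed, this consumer now only writes spans to the Redis buffer via RedisSpansBuffer.write_span; recombined segments are instead read from the buffered-segments topic by the new detect-performance-issues consumer. A toy illustration of the buffer interface as used in this diff (write_span/read_segment signatures only; the real Redis-backed implementation is not shown here):

class ToySpansBuffer:
    """Illustrative in-memory stand-in for RedisSpansBuffer."""

    def __init__(self) -> None:
        self._segments: dict[tuple[int, str], list[bytes]] = {}

    def write_span(self, project_id: int, segment_id: str, span: bytes) -> None:
        # Append the raw span payload under its (project, segment) key.
        self._segments.setdefault((project_id, segment_id), []).append(span)

    def read_segment(self, project_id: int, segment_id: str) -> list[bytes]:
        # Return every span buffered so far for this segment.
        return self._segments.get((project_id, segment_id), [])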
8 changes: 5 additions & 3 deletions src/sentry/utils/sdk.py
@@ -37,9 +37,10 @@

UNSAFE_FILES = (
"sentry/event_manager.py",
"sentry/spans/consumers/process/factory.py",
"sentry/spans/consumers/detect_performance_issues/factory.py",
"sentry/tasks/process_buffer.py",
"sentry/ingest/consumer/processors.py",
"sentry/tasks/spans.py",
# This consumer lives outside of sentry but is just as unsafe.
"outcomes_consumer.py",
)
@@ -612,8 +613,9 @@ def bind_organization_context(organization: Organization | RpcOrganization) -> N
helper = settings.SENTRY_ORGANIZATION_CONTEXT_HELPER

# XXX(dcramer): this is duplicated in organizationContext.jsx on the frontend
with configure_scope() as scope, sentry_sdk.start_span(
op="other", description="bind_organization_context"
with (
configure_scope() as scope,
sentry_sdk.start_span(op="other", description="bind_organization_context"),
):
# This can be used to find errors that may have been mistagged
check_tag_for_scope_bleed("organization.slug", organization.slug)
Empty file.
@@ -0,0 +1,54 @@
from datetime import datetime
from unittest import mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.detect_performance_issues.factory import (
DetectPerformanceIssuesStrategyFactory,
)
from sentry.utils import json
from sentry.utils.kafka_config import get_topic_definition
from tests.sentry.spans.consumers.process.test_factory import build_mock_span


def build_mock_message(data, topic=None):
message = mock.Mock()
message.value.return_value = json.dumps(data)
if topic:
message.topic.return_value = topic
return message


@mock.patch("sentry.spans.consumers.detect_performance_issues.factory.process_segment")
def test_consumer_processes_segment(mock_process_segment):

topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])
partition = Partition(topic, 0)
strategy = DetectPerformanceIssuesStrategyFactory().create_with_partitions(
commit=mock.Mock(),
partitions={},
)

span_data = build_mock_span(project_id=1)
segment_data = {"spans": [span_data]}
message = build_mock_message(segment_data, topic)

strategy.submit(
Message(
BrokerValue(
KafkaPayload(b"key", message.value().encode("utf-8"), []),
partition,
1,
datetime.now(),
)
)
)

strategy.poll()
strategy.join(1)
strategy.terminate()

mock_process_segment.assert_called_once_with(segment_data["spans"])
@@ -1,51 +1,16 @@
import uuid
from unittest import mock

from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import _process_segment
from sentry.spans.consumers.detect_performance_issues.message import process_segment
from sentry.testutils.cases import TestCase
from sentry.utils import json


def build_mock_span(project_id, span_op=None, **kwargs):
span = {
"description": "OrganizationNPlusOne",
"duration_ms": 107,
"event_id": "61ccae71d70f45bb9b1f2ccb7f7a49ec",
"exclusive_time_ms": 107.359,
"is_segment": True,
"parent_span_id": "b35b839c02985f33",
"profile_id": "dbae2b82559649a1a34a2878134a007b",
"project_id": project_id,
"received": 1707953019.044972,
"retention_days": 90,
"segment_id": "a49b42af9fb69da0",
"sentry_tags": {
"browser.name": "Google Chrome",
"environment": "development",
"op": span_op or "base.dispatch.sleep",
"release": "backend@24.2.0.dev0+699ce0cd1281cc3c7275d0a474a595375c769ae8",
"transaction": "/api/0/organizations/{organization_slug}/n-plus-one/",
"transaction.method": "GET",
"transaction.op": "http.server",
"user": "id:1",
},
"span_id": "a49b42af9fb69da0",
"start_timestamp_ms": 1707953018865,
"trace_id": "94576097f3a64b68b85a59c7d4e3ee2a",
}

span.update(**kwargs)
return json.dumps(span)
from tests.sentry.spans.consumers.process.test_factory import build_mock_span


class TestSpansTask(TestCase):
def setUp(self):
self.project = self.create_project()

@mock.patch.object(RedisSpansBuffer, "read_segment")
def test_n_plus_one_issue_detection(self, mock_read_segment):
def test_n_plus_one_issue_detection(self):
segment_span = build_mock_span(project_id=self.project.id)
child_span = build_mock_span(
project_id=self.project.id,
@@ -80,8 +45,7 @@ def repeating_span():
repeating_spans = [repeating_span() for _ in range(7)]
spans = [segment_span, child_span, cause_span] + repeating_spans

mock_read_segment.return_value = spans
job = _process_segment(self.project.id, "a49b42af9fb69da0")[0]
job = process_segment(spans)[0]

assert (
job["performance_problems"][0].fingerprint