
feat(spans): Add a new recombiner consumer #66804

Merged
merged 18 commits on Mar 21, 2024
Changes from 12 commits
3 changes: 1 addition & 2 deletions src/sentry/conf/server.py
@@ -778,7 +778,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.reprocessing2",
"sentry.tasks.sentry_apps",
"sentry.tasks.servicehooks",
"sentry.tasks.spans",
"sentry.tasks.store",
"sentry.tasks.symbolication",
"sentry.tasks.unmerge",
@@ -942,7 +941,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
Queue("nudge.invite_missing_org_members", routing_key="invite_missing_org_members"),
Queue("auto_resolve_issues", routing_key="auto_resolve_issues"),
Queue("on_demand_metrics", routing_key="on_demand_metrics"),
Queue("spans.process_segment", routing_key="spans.process_segment"),
]

from celery.schedules import crontab
@@ -3515,6 +3513,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
"group-attributes": "default",
"snuba-spans": "default",
"shared-resources-usage": "default",
"buffered-segments": "default",
}


1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -45,6 +45,7 @@ class Topic(Enum):
GROUP_ATTRIBUTES = "group-attributes"
SHARED_RESOURCES_USAGE = "shared-resources-usage"
SNUBA_SPANS = "snuba-spans"
BUFFERED_SEGMENTS = "buffered-segments"


class ConsumerDefinition(TypedDict, total=False):
4 changes: 4 additions & 0 deletions src/sentry/consumers/__init__.py
@@ -347,6 +347,10 @@ def ingest_events_options() -> list[click.Option]:
"topic": Topic.SNUBA_SPANS,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
},
"recombine-segment": {
"topic": Topic.BUFFERED_SEGMENTS,
"strategy_factory": "sentry.spans.consumers.recombine.factory.RecombineSegmentStrategyFactory",
},
**settings.SENTRY_KAFKA_CONSUMERS,
}

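The hunk above registers the new consumer alongside the existing span-processing consumer. As a quick sanity check, the registration could be inspected as in the sketch below; the KAFKA_CONSUMERS mapping name is assumed from the surrounding module and is not visible in this hunk.

# Sketch, not part of this PR: verify the new consumer definition.
# KAFKA_CONSUMERS is assumed to be the module-level mapping being extended above.
from sentry.conf.types.kafka_definition import Topic
from sentry.consumers import KAFKA_CONSUMERS

definition = KAFKA_CONSUMERS["recombine-segment"]
assert definition["topic"] is Topic.BUFFERED_SEGMENTS
assert definition["strategy_factory"].endswith("RecombineSegmentStrategyFactory")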
9 changes: 1 addition & 8 deletions src/sentry/spans/consumers/process/factory.py
@@ -13,7 +13,6 @@

from sentry import options
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import process_segment

logger = logging.getLogger(__name__)
SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")
@@ -42,13 +41,7 @@ def process_message(message: Message[KafkaPayload]):
return

client = RedisSpansBuffer()
new_segment = client.write_span(project_id, segment_id, message.payload.value)
if new_segment:
# This function currently does nothing.
process_segment.apply_async(
args=[project_id, segment_id],
countdown=PROCESS_SEGMENT_DELAY,
)
client.write_span(project_id, segment_id, message.payload.value)


class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
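With the Celery dispatch removed, the process consumer now only buffers spans in Redis. A rough sketch of the round-trip this implies, using only the RedisSpansBuffer methods visible in this PR (write_span here, read_segment in the removed task code further down); how buffered segments are flushed to the buffered-segments topic is outside this diff.

# Sketch, not part of this PR: the buffering round-trip implied by the change.
from sentry.spans.buffer.redis import RedisSpansBuffer

project_id = 1            # placeholder values for illustration
segment_id = "a" * 16     # placeholder segment id
raw_span_bytes = b"{}"    # a serialized span payload

client = RedisSpansBuffer()

# Process consumer side: append the raw span payload to its segment.
client.write_span(project_id, segment_id, raw_span_bytes)

# Whatever flushes the buffer later reads the accumulated spans back before
# publishing the segment to the buffered-segments topic (assumed, not shown here).
spans = client.read_segment(project_id, segment_id)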
Empty file.
53 changes: 53 additions & 0 deletions src/sentry/spans/consumers/recombine/factory.py
@@ -0,0 +1,53 @@
import logging
from collections.abc import Mapping
from typing import Any

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec, ValidationError
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment

from sentry.spans.consumers.recombine.message import process_segment

BUFFERED_SEGMENT_SCHEMA: Codec[BufferedSegment] = get_codec("buffered-segments")

logger = logging.getLogger(__name__)


def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
return BUFFERED_SEGMENT_SCHEMA.decode(value)


def process_message(message: Message[KafkaPayload]):
try:
segment = _deserialize_segment(message.payload.value)
except ValidationError:
logger.exception("Failed to deserialize segment payload")
return

process_segment(segment["spans"])


def _process_message(message: Message[KafkaPayload]):
assert isinstance(message.value, BrokerValue)

try:
process_message(message)
except Exception:
logger.exception("Failed to process segment payload")


class RecombineSegmentStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return RunTask(
function=_process_message,
next_step=CommitOffsets(commit),
)
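As a rough illustration (not a test from this PR), the new entry point could be exercised with a hand-built arroyo message. The payload below is a placeholder and would need to validate against the buffered-segments schema, and process_segment additionally expects a Django environment where the project can be loaded from cache.

# Sketch, not part of this PR: drive process_message with a synthetic message.
import json
from datetime import datetime, timezone

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.spans.consumers.recombine.factory import process_message

# Placeholder payload; a real one must validate against the buffered-segments schema.
payload = json.dumps({"spans": []}).encode("utf-8")
message = Message(
    BrokerValue(
        KafkaPayload(None, payload, []),
        Partition(Topic("buffered-segments"), 0),
        1,
        datetime.now(timezone.utc),
    )
)
process_message(message)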
src/sentry/spans/consumers/recombine/message.py
@@ -1,5 +1,4 @@
import logging
import random
from collections.abc import Mapping, Sequence, Set
from copy import deepcopy
from typing import Any
@@ -18,11 +17,8 @@
)
from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.models.project import Project
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.canonical import CanonicalKeyDict
from sentry.utils.sdk import mark_scope_as_unsafe

SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")

@@ -104,64 +100,60 @@ def _update_occurrence_group_type(jobs: Sequence[Job], projects: ProjectsMapping


def transform_spans_to_event_dict(spans):
event: dict[str, Any] = {"type": "transaction", "level": "info", "contexts": {}}
deserialized_spans: list[dict[str, Any]] = []
event: dict[str, Any] = {"type": "transaction", "contexts": {}}
processed_spans: list[dict[str, Any]] = []
for span in spans:
try:
deserialized_span = dict(_deserialize_span(span))
except Exception:
logger.exception("Failed to process span payload")
continue
sentry_tags = span.get("sentry_tags", {})

sentry_tags = deserialized_span.get("sentry_tags", {})

if deserialized_span["is_segment"] is True:
event["event_id"] = deserialized_span.get("event_id")
event["project_id"] = deserialized_span["project_id"]
if span["is_segment"] is True:
event["event_id"] = span.get("event_id")
event["project_id"] = span["project_id"]
event["transaction"] = sentry_tags.get("transaction")
event["release"] = sentry_tags.get("release")
event["environment"] = sentry_tags.get("environment")

event["platform"] = sentry_tags.get("platform")

event["contexts"]["trace"] = {
"trace_id": deserialized_span["trace_id"],
"trace_id": span["trace_id"],
"type": "trace",
"op": sentry_tags.get("transaction.op"),
"span_id": deserialized_span["span_id"],
"span_id": span["span_id"],
}
event["received"] = deserialized_span["received"]
event["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
event["received"] = span["received"]
event["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
event["start_timestamp"] = span["start_timestamp_ms"] / 1000

if (profile_id := deserialized_span.get("profile_id")) is not None:
if (profile_id := span.get("profile_id")) is not None:
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}

if (op := sentry_tags.get("op")) is not None:
deserialized_span["op"] = op
span["op"] = op

deserialized_span["start_timestamp"] = deserialized_span["start_timestamp_ms"] / 1000
deserialized_span["timestamp"] = (
deserialized_span["start_timestamp_ms"] + deserialized_span["duration_ms"]
) / 1000
span["start_timestamp"] = span["start_timestamp_ms"] / 1000
span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000

deserialized_spans.append(deserialized_span)
processed_spans.append(span)

# The performance detectors expect the span list to be ordered/flattened in the way they
# are structured in the tree. This is an implicit assumption in the performance detectors.
# So we build a tree and flatten it depth first.
# TODO: See if we can update the detectors to work without this assumption so we can
# just pass it a list of spans.
tree, root_span_id = build_tree(deserialized_spans)
tree, root_span_id = build_tree(processed_spans)
flattened_spans = flatten_tree(tree, root_span_id)
event["spans"] = flattened_spans

return event


def _process_segment(project_id, segment_id):
client = RedisSpansBuffer()
spans = client.read_segment(project_id, segment_id)

with sentry_sdk.start_span(op="sentry.tasks.spans.transform_spans_to_event_dict"):
def process_segment(spans: dict[str, Any]):
with sentry_sdk.start_span(
op="sentry.consumers.recombine.process_segment.transform_spans_to_event_dict"
):
event = transform_spans_to_event_dict(spans)

project_id = event["project_id"]
with metrics.timer("tasks.spans.project.get_from_cache"):
project = Project.objects.get_from_cache(id=project_id)

@@ -186,17 +178,3 @@ def _process_segment(project_id, segment_id):
_update_occurrence_group_type(jobs, projects)

return jobs


@instrumented_task(
name="sentry.tasks.spans.process_segment",
queue="spans.process_segment",
max_retries=0,
)
def process_segment(project_id, segment_id):
mark_scope_as_unsafe()
try:
_process_segment(project_id, segment_id)
except Exception as err:
if random.random() < 0.05:
sentry_sdk.capture_exception(err)
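The comment in transform_spans_to_event_dict above notes that the performance detectors implicitly assume the span list is ordered the way the spans are structured in the tree, so the spans are rebuilt into a tree and flattened depth first. The repo's build_tree/flatten_tree helpers are not shown in this diff; the snippet below is only an illustrative stand-in for that ordering, not the actual implementation.

# Sketch, not part of this PR: an illustration of depth-first span ordering.
from collections import defaultdict
from typing import Any


def flatten_depth_first(spans: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Group spans by parent so each node's children can be visited in turn.
    children: dict[str | None, list[dict[str, Any]]] = defaultdict(list)
    for span in spans:
        children[span.get("parent_span_id")].append(span)

    ordered: list[dict[str, Any]] = []

    def visit(span: dict[str, Any]) -> None:
        ordered.append(span)
        for child in children.get(span["span_id"], []):
            visit(child)

    # Roots are spans whose parent is not in the list (e.g. the segment span).
    span_ids = {s["span_id"] for s in spans}
    for span in spans:
        if span.get("parent_span_id") not in span_ids:
            visit(span)

    return ordered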
3 changes: 2 additions & 1 deletion src/sentry/utils/sdk.py
@@ -37,9 +37,10 @@

UNSAFE_FILES = (
"sentry/event_manager.py",
"/sentry/spans/consumers/process/factory.py",
"/sentry/spans/consumers/recombine/factory.py",
Review comment (Member):
Why do those have a / prefixed in contrast to the others?

Reply (Member, PR author):
Oh good catch!

"sentry/tasks/process_buffer.py",
"sentry/ingest/consumer/processors.py",
"sentry/tasks/spans.py",
# This consumer lives outside of sentry but is just as unsafe.
"outcomes_consumer.py",
)