feat(spans): Add a new recombiner consumer #66804

Merged · 18 commits · Mar 21, 2024
Changes from 8 commits
3 changes: 1 addition & 2 deletions src/sentry/conf/server.py
@@ -778,7 +778,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
     "sentry.tasks.reprocessing2",
     "sentry.tasks.sentry_apps",
     "sentry.tasks.servicehooks",
-    "sentry.tasks.spans",
     "sentry.tasks.store",
     "sentry.tasks.symbolication",
     "sentry.tasks.unmerge",
@@ -942,7 +941,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
     Queue("nudge.invite_missing_org_members", routing_key="invite_missing_org_members"),
     Queue("auto_resolve_issues", routing_key="auto_resolve_issues"),
     Queue("on_demand_metrics", routing_key="on_demand_metrics"),
-    Queue("spans.process_segment", routing_key="spans.process_segment"),
 ]

from celery.schedules import crontab
@@ -3515,6 +3513,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
     "group-attributes": "default",
     "snuba-spans": "default",
     "shared-resources-usage": "default",
+    "buffered-segment": "default",
 }


1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -45,6 +45,7 @@ class Topic(Enum):
     GROUP_ATTRIBUTES = "group-attributes"
     SHARED_RESOURCES_USAGE = "shared-resources-usage"
     SNUBA_SPANS = "snuba-spans"
+    BUFFERED_SEGMENT = "buffered-segment"
Member: Does this topic actually exist in prod?

Member Author: No, it does not. None of these consumers/topics/redis exist in prod yet.

Member: Is there a concern of deploying this when the infrastructure doesn't exist?

Member Author: The consumer code is for sure okay to deploy without the infra in place (since ops actually needs to manually deploy the consumers), and the consumers producing to and reading from this topic aren't actually running in production, so I assumed it would be fine. @phacops do you know if that's the case?

Member Author: @evanh I'm going to make an ops ticket to get this topic configured and make sure that goes through before I merge this PR, just to be safe.

Member Author: Ops ticket

Member Author: The preference is to merge all the code first and then deploy infra to prod in the right order (details in the ticket), so I'm going to merge this PR. cc @evanh

class ConsumerDefinition(TypedDict, total=False):
4 changes: 4 additions & 0 deletions src/sentry/consumers/__init__.py
@@ -347,6 +347,10 @@ def ingest_events_options() -> list[click.Option]:
         "topic": Topic.SNUBA_SPANS,
         "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
     },
+    "recombine-segment": {
+        "topic": Topic.BUFFERED_SEGMENT,
+        "strategy_factory": "sentry.spans.consumers.recombine.factory.RecombineSegmentStrategyFactory",
+    },
     **settings.SENTRY_KAFKA_CONSUMERS,
 }

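For context, a registration like the one above is what lets the consumer be launched by name (e.g. via Sentry's `sentry run consumer` command, which resolves the dotted `strategy_factory` path). A minimal sketch of what that wiring amounts to in plain Arroyo, assuming a local broker and an illustrative consumer-group name (neither is specified by this PR):

from arroyo.backends.kafka import KafkaConsumer
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.commit import ONCE_PER_SECOND
from arroyo.processing.processor import StreamProcessor
from arroyo.types import Topic as ArroyoTopic

from sentry.spans.consumers.recombine.factory import RecombineSegmentStrategyFactory

# Consume the buffered-segment topic with the strategy factory registered
# above. Broker address and group id are assumptions for the sketch.
consumer = KafkaConsumer(
    build_kafka_consumer_configuration(
        default_config={"bootstrap.servers": "127.0.0.1:9092"},
        group_id="recombine-segment",
        auto_offset_reset="latest",
    )
)
processor = StreamProcessor(
    consumer=consumer,
    topic=ArroyoTopic("buffered-segment"),
    processor_factory=RecombineSegmentStrategyFactory(),
    commit_policy=ONCE_PER_SECOND,
)
processor.run()  # blocks until shutdown is signalled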
9 changes: 1 addition & 8 deletions src/sentry/spans/consumers/process/factory.py
@@ -13,7 +13,6 @@
 
 from sentry import options
 from sentry.spans.buffer.redis import RedisSpansBuffer
-from sentry.tasks.spans import process_segment
 
 logger = logging.getLogger(__name__)
 SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")
@@ -42,13 +41,7 @@ def process_message(message: Message[KafkaPayload]):
         return
 
     client = RedisSpansBuffer()
-    new_segment = client.write_span(project_id, segment_id, message.payload.value)
-    if new_segment:
-        # This function currently does nothing.
-        process_segment.apply_async(
-            args=[project_id, segment_id],
-            countdown=PROCESS_SEGMENT_DELAY,
-        )
+    client.write_span(project_id, segment_id, message.payload.value)


class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
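With the apply_async call gone, this consumer now only buffers spans in Redis; nothing in this diff publishes a completed segment to the buffered-segment topic yet. A hypothetical sketch of that producer side, reusing the RedisSpansBuffer.read_segment API seen in the test changes below and the topic helpers used elsewhere in this PR — the function name and flush trigger are assumptions, not part of this change:

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic as ArroyoTopic

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition


def flush_segment(project_id: int, segment_id: str) -> None:
    # Read back the raw span payloads buffered for this segment.
    spans = RedisSpansBuffer().read_segment(project_id, segment_id)

    # The recombine consumer json.loads() the message value, so the segment
    # is encoded as a JSON array of the buffered span payloads.
    topic_def = get_topic_definition(Topic.BUFFERED_SEGMENT)
    producer = KafkaProducer(get_kafka_producer_cluster_options(topic_def["cluster"]))
    producer.produce(
        ArroyoTopic(topic_def["real_topic_name"]),
        KafkaPayload(None, json.dumps(spans).encode("utf-8"), []),
    )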
Empty file.
42 changes: 42 additions & 0 deletions src/sentry/spans/consumers/recombine/factory.py
@@ -0,0 +1,42 @@
import logging
import random
from collections.abc import Mapping

import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BrokerValue, Commit, Message, Partition

from sentry.spans.consumers.recombine.message import process_segment
from sentry.utils import json

logger = logging.getLogger(__name__)


def process_message(message: Message[KafkaPayload]):
    assert isinstance(message.value, BrokerValue)
    try:
        segments = json.loads(message.payload.value)
    except Exception:
        logger.exception("Failed to process segment payload")
        return

    try:
        process_segment(segments)
    except Exception as e:
        if random.random() < 0.05:
            sentry_sdk.capture_exception(e)

Member: Why sample this?

Member Author: Updated to log exception instead - had just copied over an old pattern.


class RecombineSegmentStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return RunTask(
            function=process_message,
            next_step=CommitOffsets(commit),
        )
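Per the exchange above, the sampled capture was later swapped for a plain log call; the updated handler presumably looks something like the following (the final version is not visible in this 8-commit view, so this is a guess at the shape):

    try:
        process_segment(segments)
    except Exception:
        # Log every failure rather than sampling 5% of them to Sentry; the
        # message is dropped either way since there is no retry path here.
        logger.exception("Failed to process segment")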
src/sentry/spans/consumers/recombine/message.py
@@ -1,5 +1,4 @@
 import logging
-import random
 from collections.abc import Mapping, Sequence, Set
 from copy import deepcopy
 from typing import Any
@@ -18,11 +17,8 @@
 )
 from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
 from sentry.models.project import Project
-from sentry.spans.buffer.redis import RedisSpansBuffer
-from sentry.tasks.base import instrumented_task
 from sentry.utils import metrics
 from sentry.utils.canonical import CanonicalKeyDict
-from sentry.utils.sdk import mark_scope_as_unsafe

SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")

@@ -155,13 +151,13 @@ def transform_spans_to_event_dict(spans):
     return event
 
 
-def _process_segment(project_id, segment_id):
-    client = RedisSpansBuffer()
-    spans = client.read_segment(project_id, segment_id)
-
-    with sentry_sdk.start_span(op="sentry.tasks.spans.transform_spans_to_event_dict"):
+def process_segment(spans: list[str | bytes]):
+    with sentry_sdk.start_span(
+        op="sentry.consumers.recombine.process_segment.transform_spans_to_event_dict"
+    ):
         event = transform_spans_to_event_dict(spans)
 
+    project_id = event["project_id"]
     with metrics.timer("tasks.spans.project.get_from_cache"):
         project = Project.objects.get_from_cache(id=project_id)

@@ -186,17 +182,3 @@ def _process_segment(project_id, segment_id):
     _update_occurrence_group_type(jobs, projects)
 
     return jobs
-
-
-@instrumented_task(
-    name="sentry.tasks.spans.process_segment",
-    queue="spans.process_segment",
-    max_retries=0,
-)
-def process_segment(project_id, segment_id):
-    mark_scope_as_unsafe()
-    try:
-        _process_segment(project_id, segment_id)
-    except Exception as err:
-        if random.random() < 0.05:
-            sentry_sdk.capture_exception(err)
3 changes: 2 additions & 1 deletion src/sentry/utils/sdk.py
@@ -37,9 +37,10 @@
 
 UNSAFE_FILES = (
     "sentry/event_manager.py",
+    "/sentry/spans/consumers/process/factory.py",
+    "/sentry/spans/consumers/recombine/factory.py",
     "sentry/tasks/process_buffer.py",
     "sentry/ingest/consumer/processors.py",
-    "sentry/tasks/spans.py",
     # This consumer lives outside of sentry but is just as unsafe.
     "outcomes_consumer.py",
 )

Member: Why do those have a / prefixed in contrast to the others?

Member Author: Oh good catch!
8 changes: 2 additions & 6 deletions tests/sentry/spans/consumers/process/test_factory.py
@@ -57,8 +57,7 @@ def build_mock_message(data, topic=None):
         "standalone-spans.process-spans-consumer.project-allowlist": [1],
     }
 )
-@mock.patch("sentry.spans.consumers.process.factory.process_segment")
-def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
+def test_consumer_pushes_to_redis():
     redis_client = get_redis_client()

     topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
@@ -88,7 +87,6 @@ def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
     assert redis_client.lrange("segment:ace31e54d65652aa:1:process-segment", 0, -1) == [
         message.value()
     ]
-    process_segment.apply_async.assert_called_once_with(args=[1, "ace31e54d65652aa"], countdown=120)


@override_options(
@@ -97,8 +95,7 @@ def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
         "standalone-spans.process-spans-consumer.project-allowlist": [1],
     }
 )
-@mock.patch("sentry.spans.consumers.process.factory.process_segment")
-def test_second_span_in_segment_does_not_queue_task(process_segment):
+def test_second_span_in_segment_does_not_queue_task():
     redis_client = get_redis_client()

     topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
@@ -140,7 +137,6 @@ def test_second_span_in_segment_does_not_queue_task(process_segment):
         message.value(),
         message.value(),
     ]
-    process_segment.apply_async.assert_called_once_with(args=[1, "ace31e54d65652aa"], countdown=120)


@override_options(
Empty file.
81 changes: 81 additions & 0 deletions tests/sentry/spans/consumers/recombine/test_factory.py
@@ -0,0 +1,81 @@
from datetime import datetime
from unittest import mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.recombine.factory import RecombineSegmentStrategyFactory
from sentry.utils import json
from sentry.utils.kafka_config import get_topic_definition


def build_mock_span(**kwargs):
    span = {
        "duration_ms": 0,
        "event_id": "72fcea47d44a444fb132f8d462eeb0b4",
        "exclusive_time_ms": 0.006,
        "is_segment": False,
        "parent_span_id": "93f0e87ad9cc709e",
        "profile_id": "7ce060d7ea62432b8355bc9e612676e4",
        "project_id": 1,
        "received": 1706734067.029479,
        "retention_days": 90,
        "segment_id": "ace31e54d65652aa",
        "sentry_tags": {
            "environment": "development",
            "op": "relay_fetch_org_options",
            "release": "backend@24.2.0.dev0+df7615f2ff7dc3c8802f806477f920bb934bd198",
            "transaction": "/api/0/relays/projectconfigs/",
            "transaction.method": "POST",
            "transaction.op": "http.server",
            "user": "ip:127.0.0.1",
        },
        "span_id": "95acbe6d30a66717",
        "start_timestamp_ms": 1706734066840,
        "trace_id": "8e6f22e6169545cc963255d0f29cb76b",
    }

    span.update(**kwargs)
    return json.dumps(span)


def build_mock_message(data, topic=None):
    message = mock.Mock()
    message.value.return_value = json.dumps(data)
    if topic:
        message.topic.return_value = topic
    return message


@mock.patch("sentry.spans.consumers.recombine.factory.process_segment")
def test_consumer_pushes_to_redis(mock_process_segment):
    topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENT)["real_topic_name"])
    partition = Partition(topic, 0)
    strategy = RecombineSegmentStrategyFactory().create_with_partitions(
        commit=mock.Mock(),
        partitions={},
    )

    span_data = build_mock_span()
    segment_data = [span_data]
    message = build_mock_message(segment_data, topic)

    strategy.submit(
        Message(
            BrokerValue(
                KafkaPayload(b"key", message.value().encode("utf-8"), []),
                partition,
                1,
                datetime.now(),
            )
        )
    )

    strategy.poll()
    strategy.join(1)
    strategy.terminate()

    mock_process_segment.assert_called_once_with(segment_data)
@@ -1,9 +1,7 @@
 import uuid
 from unittest import mock
 
 from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
-from sentry.spans.buffer.redis import RedisSpansBuffer
-from sentry.tasks.spans import _process_segment
+from sentry.spans.consumers.recombine.message import process_segment
 from sentry.testutils.cases import TestCase
 from sentry.utils import json

@@ -44,8 +42,7 @@ class TestSpansTask(TestCase):
     def setUp(self):
         self.project = self.create_project()
 
-    @mock.patch.object(RedisSpansBuffer, "read_segment")
-    def test_n_plus_one_issue_detection(self, mock_read_segment):
+    def test_n_plus_one_issue_detection(self):
         segment_span = build_mock_span(project_id=self.project.id)
         child_span = build_mock_span(
             project_id=self.project.id,
@@ -80,8 +77,7 @@ def repeating_span():
         repeating_spans = [repeating_span() for _ in range(7)]
         spans = [segment_span, child_span, cause_span] + repeating_spans
 
-        mock_read_segment.return_value = spans
-        job = _process_segment(self.project.id, "a49b42af9fb69da0")[0]
+        job = process_segment(spans)[0]
 
         assert (
             job["performance_problems"][0].fingerprint