Skip to content

Commit

Permalink
chore(spans): Add options for process spans consumer (#66687)
Browse files Browse the repository at this point in the history
Add options for enabling and rolling out process spans consumer,
these go in `process_message` so the messages are still consumed
but no-op if flag isn't enabled
  • Loading branch information
shruthilayaj committed Mar 13, 2024
1 parent 5193373 commit 9519c29
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/sentry/options/defaults.py
Expand Up @@ -2104,3 +2104,16 @@
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)

# Standalone spans
register(
"standalone-spans.process-spans-consumer.enable",
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"standalone-spans.process-spans-consumer.project-allowlist",
type=Sequence,
default=[],
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
7 changes: 7 additions & 0 deletions src/sentry/spans/consumers/process/factory.py
Expand Up @@ -11,6 +11,7 @@
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent

from sentry import options
from sentry.spans.buffer.redis import RedisSpansBuffer
from sentry.tasks.spans import process_segment

Expand All @@ -25,6 +26,9 @@ def _deserialize_span(value: bytes) -> Mapping[str, Any]:


def process_message(message: Message[KafkaPayload]):
if not options.get("standalone-spans.process-spans-consumer.enable"):
return

assert isinstance(message.value, BrokerValue)
try:
span = _deserialize_span(message.payload.value)
Expand All @@ -34,6 +38,9 @@ def process_message(message: Message[KafkaPayload]):
logger.exception("Failed to process span payload")
return

if project_id not in options.get("standalone-spans.process-spans-consumer.project-allowlist"):
return

client = RedisSpansBuffer()
new_segment = client.write_span(project_id, segment_id, message.payload.value)
if new_segment:
Expand Down
83 changes: 83 additions & 0 deletions tests/sentry/spans/consumers/process/test_factory.py
Expand Up @@ -8,6 +8,7 @@
from sentry.conf.types.kafka_definition import Topic
from sentry.spans.buffer.redis import get_redis_client
from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory
from sentry.testutils.helpers.options import override_options
from sentry.utils import json
from sentry.utils.kafka_config import get_topic_definition

Expand Down Expand Up @@ -50,6 +51,12 @@ def build_mock_message(data, topic=None):
return message


@override_options(
{
"standalone-spans.process-spans-consumer.enable": True,
"standalone-spans.process-spans-consumer.project-allowlist": [1],
}
)
@mock.patch("sentry.spans.consumers.process.factory.process_segment")
def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
redis_client = get_redis_client()
Expand Down Expand Up @@ -84,6 +91,12 @@ def test_consumer_pushes_to_redis_and_schedules_task(process_segment):
process_segment.apply_async.assert_called_once_with(args=[1, "ace31e54d65652aa"], countdown=120)


@override_options(
{
"standalone-spans.process-spans-consumer.enable": True,
"standalone-spans.process-spans-consumer.project-allowlist": [1],
}
)
@mock.patch("sentry.spans.consumers.process.factory.process_segment")
def test_second_span_in_segment_does_not_queue_task(process_segment):
redis_client = get_redis_client()
Expand Down Expand Up @@ -128,3 +141,73 @@ def test_second_span_in_segment_does_not_queue_task(process_segment):
message.value(),
]
process_segment.apply_async.assert_called_once_with(args=[1, "ace31e54d65652aa"], countdown=120)


@override_options(
{
"standalone-spans.process-spans-consumer.enable": False,
"standalone-spans.process-spans-consumer.project-allowlist": [1],
}
)
@mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
def test_option_disabled(mock_buffer):
topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
partition = Partition(topic, 0)
strategy = ProcessSpansStrategyFactory().create_with_partitions(
commit=mock.Mock(),
partitions={},
)

span_data = build_mock_span()
message = build_mock_message(span_data, topic)

strategy.submit(
Message(
BrokerValue(
KafkaPayload(b"key", message.value().encode("utf-8"), []),
partition,
1,
datetime.now(),
)
)
)

strategy.poll()
strategy.join(1)
strategy.terminate()
mock_buffer.assert_not_called()


@override_options(
{
"standalone-spans.process-spans-consumer.enable": True,
"standalone-spans.process-spans-consumer.project-allowlist": [],
}
)
@mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
def test_option_project_rollout(mock_buffer):
topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
partition = Partition(topic, 0)
strategy = ProcessSpansStrategyFactory().create_with_partitions(
commit=mock.Mock(),
partitions={},
)

span_data = build_mock_span()
message = build_mock_message(span_data, topic)

strategy.submit(
Message(
BrokerValue(
KafkaPayload(b"key", message.value().encode("utf-8"), []),
partition,
1,
datetime.now(),
)
)
)

strategy.poll()
strategy.join(1)
strategy.terminate()
mock_buffer.assert_not_called()

0 comments on commit 9519c29

Please sign in to comment.