dev: Fix schema validation in devserver (#66715)
This ensures schema validation is run on every message on topics where a
schema is registered in the devserver. The goal of this change is to
help catch invalid messages in dev before changes are deployed.
lynnagara committed Mar 12, 2024
1 parent 46838b1 commit 825cf9f
Showing 5 changed files with 12 additions and 7 deletions.
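For context, "schema validation on every message" amounts to checking each consumed payload against the schema registered for its topic before it is processed. The sketch below is illustrative only, not the Sentry implementation: it uses jsonschema as a stand-in for the topic codec, and the schema shown is a hypothetical simplification resembling the ingest "event" payload touched in the tests further down.

# Illustrative sketch only -- not the actual ValidateSchemaStrategyFactoryWrapper
# or sentry-kafka-schemas code. jsonschema stands in for the registered codec.
import json
import time

import jsonschema

# Hypothetical schema resembling the ingest "event" payload built in the tests.
EVENT_SCHEMA = {
    "type": "object",
    "required": ["type", "start_time", "event_id", "project_id", "payload"],
    "properties": {
        "type": {"const": "event"},
        "start_time": {"type": "integer"},
        "event_id": {"type": "string"},
        "project_id": {"type": "integer"},
        "payload": {"type": "string"},
    },
}


def validate_message(payload: dict) -> None:
    """Raise jsonschema.ValidationError if the message violates the schema."""
    jsonschema.validate(instance=payload, schema=EVENT_SCHEMA)


validate_message(
    {
        "type": "event",
        "start_time": int(time.time()),  # integer, as in the updated test fixture
        "event_id": "a" * 32,
        "project_id": 1,
        "payload": json.dumps({}),
    }
)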
8 changes: 4 additions & 4 deletions src/sentry/consumers/__init__.py
@@ -366,7 +366,7 @@ def get_stream_processor(
     synchronize_commit_group: str | None = None,
     healthcheck_file_path: str | None = None,
     enable_dlq: bool = False,
-    validate_schema: bool = False,
+    enforce_schema: bool = False,
     group_instance_id: str | None = None,
 ) -> StreamProcessor:
     from sentry.utils import kafka_config
@@ -464,12 +464,12 @@ def build_consumer_config(group_id: str):
             "--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer"
         )
 
-    # Validate schema if "validate_schema" is set
-    validate_schema = consumer_definition.get("validate_schema") or False
+    # Validate schema if enforce_schema is true or "validate_schema" is set
+    validate_schema = enforce_schema or consumer_definition.get("validate_schema") or False
 
     if validate_schema:
         strategy_factory = ValidateSchemaStrategyFactoryWrapper(
-            consumer_topic.value, validate_schema, strategy_factory
+            consumer_topic.value, enforce_schema, strategy_factory
         )
 
     if healthcheck_file_path is not None:
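The diff above threads the flag into ValidateSchemaStrategyFactoryWrapper. A hedged sketch of what such a wrapping factory typically looks like (hypothetical names and signatures, not the actual Arroyo or Sentry classes): it builds the inner strategy unchanged and validates each message before delegating, raising only when enforcement is on, which keeps validation orthogonal to the consumer's own processing logic.

# Hypothetical sketch of a schema-enforcing wrapper factory. Names, signatures,
# and the validator callable are illustrative; the real class is Sentry's
# ValidateSchemaStrategyFactoryWrapper built on Arroyo strategies.
from typing import Any, Callable


class ValidatingStrategy:
    """Validates each message, then hands it to the wrapped strategy."""

    def __init__(self, validate: Callable[[Any], None], enforce: bool, inner: Any) -> None:
        self.validate = validate
        self.enforce = enforce  # raise on invalid messages instead of ignoring them
        self.inner = inner

    def submit(self, message: Any) -> None:
        try:
            self.validate(message)
        except Exception:
            if self.enforce:
                raise
        self.inner.submit(message)


class ValidatingStrategyFactoryWrapper:
    """Wraps an inner factory so every strategy it creates validates messages."""

    def __init__(self, validate: Callable[[Any], None], enforce: bool, inner_factory: Any) -> None:
        self.validate = validate
        self.enforce = enforce
        self.inner_factory = inner_factory

    def create(self, *args: Any, **kwargs: Any) -> Any:
        inner_strategy = self.inner_factory.create(*args, **kwargs)
        return ValidatingStrategy(self.validate, self.enforce, inner_strategy)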
2 changes: 1 addition & 1 deletion src/sentry/runner/commands/run.py
@@ -480,7 +480,7 @@ def dev_consumer(consumer_names):
             synchronize_commit_log_topic=None,
             enable_dlq=False,
             healthcheck_file_path=None,
-            validate_schema=True,
+            enforce_schema=True,
         )
         for consumer_name in consumer_names
     ]
@@ -76,7 +76,7 @@ def inner(type, project=default_project):
         normalized_event = dict(em.get_data())
         message = {
             "type": "event",
-            "start_time": time.time(),
+            "start_time": int(time.time()),
             "event_id": event_id,
             "project_id": int(project_id),
             "payload": json.dumps(normalized_event),
@@ -126,6 +126,7 @@ def test_ingest_consumer_reads_from_topic_and_calls_celery_task(
         group_id=random_group_id,
         auto_offset_reset="earliest",
         strict_offset_reset=False,
+        enforce_schema=True,
     )
 
     with task_runner():
@@ -191,6 +192,7 @@ def test_ingest_consumer_gets_event_unstuck(
         group_id=random_group_id,
         auto_offset_reset="earliest",
         strict_offset_reset=False,
+        enforce_schema=True,
     )
 
     with task_runner():
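The fixture change above (int(time.time())) presumably brings the test payload in line with the registered ingest schema, which the consumers now validate. A standalone, hedged illustration of how enforcement turns such a mismatch into a hard failure in tests, with jsonschema again standing in for the topic codec:

# Standalone illustration, not part of the commit: with enforcement on, a
# payload that violates the registered schema should fail loudly in tests.
import jsonschema
import pytest

START_TIME_SCHEMA = {
    "type": "object",
    "required": ["start_time"],
    "properties": {"start_time": {"type": "integer"}},
}


def test_float_start_time_is_rejected() -> None:
    # 1710201600.5 has a fractional part, so it does not satisfy "integer".
    with pytest.raises(jsonschema.ValidationError):
        jsonschema.validate(instance={"start_time": 1710201600.5}, schema=START_TIME_SCHEMA)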
@@ -35,6 +35,8 @@ def kafka_message_payload() -> Any:
             "organization_id": 1,
             "project_id": 1,
             "primary_hash": "311ee66a5b8e697929804ceb1c456ffe",
+            "data": {"received": time.time()},
+            "message": "hello world",
         },
         {
             "is_new": False,
@@ -95,7 +97,7 @@ def get_test_stream_processor(
             max_poll_interval_ms=None,
             enable_dlq=False,
             healthcheck_file_path=None,
-            validate_schema=False,
+            enforce_schema=True,
         )
 
     def run_post_process_forwarder_streaming_consumer(self, ppf_mode: str) -> None:
1 change: 1 addition & 0 deletions tests/snuba/incidents/test_tasks.py
@@ -192,5 +192,6 @@ def test_arroyo(self):
             group_id="hi",
             strict_offset_reset=True,
             auto_offset_reset="earliest",
+            enforce_schema=True,
         )
         self.run_test(consumer)
