From 17e9b335a0c2b36988076322d71d27ca35ba109f Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 11 Mar 2024 11:43:49 -0700 Subject: [PATCH 1/3] dev: Fix schema validation in devserver --- src/sentry/consumers/__init__.py | 8 ++++---- src/sentry/runner/commands/run.py | 2 +- .../ingest/ingest_consumer/test_ingest_consumer_kafka.py | 2 ++ .../post_process_forwarder/test_post_process_forwarder.py | 2 +- tests/snuba/incidents/test_tasks.py | 1 + 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 60834a223ced6b..689442ea7f2198 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -366,7 +366,7 @@ def get_stream_processor( synchronize_commit_group: str | None = None, healthcheck_file_path: str | None = None, enable_dlq: bool = False, - validate_schema: bool = False, + enforce_schema: bool = False, group_instance_id: str | None = None, ) -> StreamProcessor: from sentry.utils import kafka_config @@ -464,12 +464,12 @@ def build_consumer_config(group_id: str): "--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" ) - # Validate schema if "validate_schema" is set - validate_schema = consumer_definition.get("validate_schema") or False + # Validate schema if enforce_schema is true or "validate_schema" is set + validate_schema = enforce_schema or consumer_definition.get("validate_schema") or False if validate_schema: strategy_factory = ValidateSchemaStrategyFactoryWrapper( - consumer_topic.value, validate_schema, strategy_factory + consumer_topic.value, enforce_schema, strategy_factory ) if healthcheck_file_path is not None: diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index f623257ddf1a1a..e352c45fea3a17 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -480,7 +480,7 @@ def dev_consumer(consumer_names): synchronize_commit_log_topic=None, enable_dlq=False, healthcheck_file_path=None, - validate_schema=True, + enforce_schema=True, ) for consumer_name in consumer_names ] diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py index 38fecef6aa9978..b92c1428004d6b 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py @@ -126,6 +126,7 @@ def test_ingest_consumer_reads_from_topic_and_calls_celery_task( group_id=random_group_id, auto_offset_reset="earliest", strict_offset_reset=False, + enforce_schema=True, ) with task_runner(): @@ -191,6 +192,7 @@ def test_ingest_consumer_gets_event_unstuck( group_id=random_group_id, auto_offset_reset="earliest", strict_offset_reset=False, + enforce_schema=True, ) with task_runner(): diff --git a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py index 19754b4948d3a0..b9f253ddc7ccfd 100644 --- a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py +++ b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py @@ -100,7 +100,7 @@ def get_test_stream_processor( max_poll_interval_ms=None, enable_dlq=False, healthcheck_file_path=None, - validate_schema=False, + enforce_schema=True, ) def run_post_process_forwarder_streaming_consumer(self, ppf_mode: str) -> None: diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index a84207e064c2f5..a3bc4dced68f20 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -192,5 +192,6 @@ def test_arroyo(self): group_id="hi", strict_offset_reset=True, auto_offset_reset="earliest", + enforce_schema=True, ) self.run_test(consumer) From 56ea334fcd894e348993ba5fcc38b2ead08271c1 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 11 Mar 2024 12:31:06 -0700 Subject: [PATCH 2/3] fix test --- .../sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py index b92c1428004d6b..1a78fe5cda0336 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py @@ -76,7 +76,7 @@ def inner(type, project=default_project): normalized_event = dict(em.get_data()) message = { "type": "event", - "start_time": time.time(), + "start_time": int(time.time()), "event_id": event_id, "project_id": int(project_id), "payload": json.dumps(normalized_event), From 72cdc2f9e0697f89eea96861dd0ace765a17a53b Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 12 Mar 2024 13:46:58 -0700 Subject: [PATCH 3/3] make test message valid --- .../post_process_forwarder/test_post_process_forwarder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py index 89fe8aa29efdad..4052ec663a2d10 100644 --- a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py +++ b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py @@ -35,6 +35,8 @@ def kafka_message_payload() -> Any: "organization_id": 1, "project_id": 1, "primary_hash": "311ee66a5b8e697929804ceb1c456ffe", + "data": {"received": time.time()}, + "message": "hello world", }, { "is_new": False,