diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 60834a223ced6b..689442ea7f2198 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -366,7 +366,7 @@ def get_stream_processor( synchronize_commit_group: str | None = None, healthcheck_file_path: str | None = None, enable_dlq: bool = False, - validate_schema: bool = False, + enforce_schema: bool = False, group_instance_id: str | None = None, ) -> StreamProcessor: from sentry.utils import kafka_config @@ -464,12 +464,12 @@ def build_consumer_config(group_id: str): "--synchronize_commit_group and --synchronize_commit_log_topic are required arguments for this consumer" ) - # Validate schema if "validate_schema" is set - validate_schema = consumer_definition.get("validate_schema") or False + # Validate schema if enforce_schema is true or "validate_schema" is set + validate_schema = enforce_schema or consumer_definition.get("validate_schema") or False if validate_schema: strategy_factory = ValidateSchemaStrategyFactoryWrapper( - consumer_topic.value, validate_schema, strategy_factory + consumer_topic.value, enforce_schema, strategy_factory ) if healthcheck_file_path is not None: diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index f623257ddf1a1a..e352c45fea3a17 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -480,7 +480,7 @@ def dev_consumer(consumer_names): synchronize_commit_log_topic=None, enable_dlq=False, healthcheck_file_path=None, - validate_schema=True, + enforce_schema=True, ) for consumer_name in consumer_names ] diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py index 38fecef6aa9978..1a78fe5cda0336 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py @@ -76,7 +76,7 @@ def inner(type, project=default_project): normalized_event = dict(em.get_data()) message = { "type": "event", - "start_time": time.time(), + "start_time": int(time.time()), "event_id": event_id, "project_id": int(project_id), "payload": json.dumps(normalized_event), @@ -126,6 +126,7 @@ def test_ingest_consumer_reads_from_topic_and_calls_celery_task( group_id=random_group_id, auto_offset_reset="earliest", strict_offset_reset=False, + enforce_schema=True, ) with task_runner(): @@ -191,6 +192,7 @@ def test_ingest_consumer_gets_event_unstuck( group_id=random_group_id, auto_offset_reset="earliest", strict_offset_reset=False, + enforce_schema=True, ) with task_runner(): diff --git a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py index 34d204b29bb6ee..4052ec663a2d10 100644 --- a/tests/sentry/post_process_forwarder/test_post_process_forwarder.py +++ b/tests/sentry/post_process_forwarder/test_post_process_forwarder.py @@ -35,6 +35,8 @@ def kafka_message_payload() -> Any: "organization_id": 1, "project_id": 1, "primary_hash": "311ee66a5b8e697929804ceb1c456ffe", + "data": {"received": time.time()}, + "message": "hello world", }, { "is_new": False, @@ -95,7 +97,7 @@ def get_test_stream_processor( max_poll_interval_ms=None, enable_dlq=False, healthcheck_file_path=None, - validate_schema=False, + enforce_schema=True, ) def run_post_process_forwarder_streaming_consumer(self, ppf_mode: str) -> None: diff --git a/tests/snuba/incidents/test_tasks.py b/tests/snuba/incidents/test_tasks.py index a84207e064c2f5..a3bc4dced68f20 100644 --- a/tests/snuba/incidents/test_tasks.py +++ b/tests/snuba/incidents/test_tasks.py @@ -192,5 +192,6 @@ def test_arroyo(self): group_id="hi", strict_offset_reset=True, auto_offset_reset="earliest", + enforce_schema=True, ) self.run_test(consumer)