
Freshness sensor does not run when assets are pending or failed #21949

Closed

mr-mcox opened this issue May 17, 2024 · 3 comments
Labels: area: sensor (Related to Sensors) · type: bug (Something isn't working)


mr-mcox commented May 17, 2024

Dagster version

1.7.5

What's the issue?

I am encountering an issue with Dagster 1.7.5 when defining and running freshness checks. When one of the jobs is running, the freshness check evaluation result is "Running" and the freshness checks sensor fails at every evaluation with the following error:

dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor freshness_checks_sensor

  File "/usr/local/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 388, in get_external_sensor_execution
    with user_code_error_boundary(
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
    raise new_error from e

The above exception was caused by the following exception:
dagster._check.CheckError: Expected non-None value: Expected the dagster event to be an asset check evaluation.

  File "/usr/local/lib/python3.11/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 394, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/sensor_definition.py", line 795, in evaluate_tick
    result = self._evaluation_fn(context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/sensor_definition.py", line 1101, in _wrapped_fn
    raw_evaluation_result = fn(**context_param, **resource_args_populated)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/asset_check_factories/freshness_checks/sensor.py", line 98, in the_sensor
    evaluation = check.not_none(
                 ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1003, in not_none
    raise CheckError(f"Expected non-None value: {additional_message}")

This behavior is undesirable because it means that if a job is running, freshness cannot be checked, which defeats the purpose of checking freshness.

What did you expect to happen?

I expected that the freshness checks would be executed by the sensor, which would inform me if a job was taking too long to run. Additionally, I assumed that one running job (Job A) would not prevent freshness checks from running on another job (Job B).

How to reproduce?

Here is a simplified version of the code I am using to define freshness checks and the corresponding sensor:

from dagster import AssetKey, build_sensor_for_freshness_checks, build_last_update_freshness_checks
from datetime import timedelta

# Define freshness checks
job_a_complete_freshness_check = build_last_update_freshness_checks(
    assets=[AssetKey(["final_asset", "job_a"])],
    lower_bound_delta=timedelta(days=1),
)

job_b_complete_freshness_check = build_last_update_freshness_checks(
    assets=[AssetKey(["final_asset", "job_b"])],
    lower_bound_delta=timedelta(days=1),
)

# build_last_update_freshness_checks returns a sequence of checks,
# so the two results can be concatenated into one list.
all_freshness_checks = (
    job_a_complete_freshness_check +
    job_b_complete_freshness_check  # type: ignore[operator]
)

# Define sensor for freshness checks
freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=all_freshness_checks,
    minimum_interval_seconds=60 * 30,
)

The final_asset asset is at the end of a long-running job. When one of the jobs is running, the freshness check evaluation result shows "Running", and the freshness checks sensor fails with the error mentioned above.

Additionally, if one of the assets fails to materialize in the latest run (e.g., an upstream dependency fails), the sensor fails in exactly the same manner.
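For completeness, here is a minimal sketch of asset definitions that would complete the repro under both scenarios. This is not part of the original report: the asset bodies and sleep duration are illustrative assumptions, chosen only so that the asset keys match the AssetKeys used above.

import time

from dagster import Definitions, asset

# Hypothetical asset whose key is AssetKey(["final_asset", "job_a"]).
@asset(key_prefix=["final_asset"])
def job_a():
    # Simulate a long-running materialization; while this runs, the
    # freshness check evaluation shows "Running".
    time.sleep(60 * 60)

# Hypothetical asset whose key is AssetKey(["final_asset", "job_b"]).
@asset(key_prefix=["final_asset"])
def job_b():
    # Simulate the second failure mode: a materialization that fails.
    raise Exception("simulated upstream failure")

defs = Definitions(
    assets=[job_a, job_b],
    asset_checks=all_freshness_checks,
    sensors=[freshness_checks_sensor],
)

With these definitions loaded, materializing job_a (and letting it run) or job_b (and letting it fail) while the sensor ticks should reproduce the CheckError above.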

Deployment type

Dagster Helm chart

Deployment details

Likely irrelevant deployment details:

  • Jobs are executed using the kubernetes executor
  • Dagster is deployed on Google Kubernetes Engine

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

mr-mcox added the "type: bug (Something isn't working)" label on May 17, 2024.
mr-mcox changed the title from "Freshness Sensor does not run" to "Freshness sensor does not run when assets are pending or failed" on May 17, 2024.
garethbrickman added the "area: sensor (Related to Sensors)" label (and added, then removed, "area: asset-checks (Related to Asset Checks)") on May 20, 2024.
dpeng817 (Contributor) commented May 23, 2024

Should be fixed by this PR: #22013. Once I can get confirmation from a few users that they are no longer running into this, we can close.

mr-mcox (Author) commented May 30, 2024

I can confirm that the undesirable behavior has ceased after deploying this change. From my perspective, this issue is resolved - thank you!

dpeng817 (Contributor) commented

Closing for now!
