Skip to content

Commit

Permalink
chore: refactor trigger name
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Mar 20, 2024
1 parent 8feb0f9 commit 46201d3
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def air_quality_data(
df = pd.DataFrame(
luchtmeetnet_api.request("measurements", context=context, request_params=rp)
)
# We don't want to keep retrying for a station that is rasing code 500
# We don't want to keep retrying for a station that is raising code 500
except HTTPError as e:
if e.response.status_code == 500:
raise Failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@
from luchtmeetnet_ingestion.assets import air_quality_data


# Currently, not supported for partitioned assets
# Currently not supported on a per-partition basis, but this can be done for the entire asset
# https://github.com/dagster-io/dagster/discussions/17194
#
# Issue for adding per-partition checks here:
# https://github.com/dagster-io/dagster/issues/17005
@asset_check(
    asset=air_quality_data,
)
def values_above_zero(air_quality_data: pd.DataFrame):
    """Asset check that passes when no entry in the ``value`` column is negative.

    Args:
        air_quality_data: DataFrame supplied for the checked asset; it must
            contain a ``value`` column.

    Returns:
        An ``AssetCheckResult`` whose ``passed`` flag is True when the smallest
        entry of the ``value`` column is greater than or equal to zero.
    """
    # Reduce only the column under test. Calling ``DataFrame.min()`` on the
    # whole frame reduces every column first, which is wasted work and can
    # raise on unrelated columns whose values are not mutually comparable.
    lowest_value = air_quality_data["value"].min()
    # An empty or all-NaN column yields NaN, and ``NaN >= 0`` is False, so the
    # check fails in that case.
    return AssetCheckResult(passed=bool(lowest_value >= 0))


# Abuse asset checks to add hooks for assets
# https://github.com/dagster-io/dagster/issues/20471
81 changes: 60 additions & 21 deletions shared/dagster_utils/src/dagster_utils/factories/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,50 @@ def gcp_metric_job_success_hook_factory(
)()


class GcpMetricLabels:
    """Chainable collector of metric label key/value pairs."""

    def __init__(self) -> None:
        # Starts empty; entries are only ever added through ``add``.
        self.__labels: typing.Dict[str, str] = {}

    def add(self, key: str, value: str) -> "GcpMetricLabels":
        """Store ``value`` under ``key``, replacing any earlier value for that key.

        Returns:
            This same instance, so that calls can be chained.
        """
        self.__labels.update({key: value})
        return self

    @property
    def labels(self) -> typing.Dict[str, str]:
        """The collected labels (the live internal dictionary, not a copy)."""
        return self.__labels


def parse_tags(tags: typing.Mapping[str, str]) -> typing.Iterator[typing.Dict[str, str]]:
    """Translate selected Dagster run tags into metric label dictionaries.

    Args:
        tags: Mapping of tag name to tag value.

    Yields:
        ``{"partition": value}`` for the ``dagster/partition`` tag, and
        ``{"trigger_type": ..., "trigger_name": value}`` for the schedule,
        sensor and backfill tags, where ``trigger_type`` is the part of the
        tag name after the last ``/``. All other tags are skipped.
    """
    trigger_tags = ("dagster/schedule_name", "dagster/sensor_name", "dagster/backfill")
    for tag_name, tag_value in tags.items():
        if tag_name == "dagster/partition":
            yield {"partition": tag_value}
            continue
        if tag_name in trigger_tags:
            # Keep only the suffix, e.g. "dagster/sensor_name" -> "sensor_name".
            _, _, trigger_type = tag_name.rpartition("/")
            yield {"trigger_type": trigger_type, "trigger_name": tag_value}


def generate_gcp_metric_labels(context: HookContext, labels: GcpMetricLabels) -> GcpMetricLabels:
    """Fill ``labels`` with the code location and tag-derived labels of the current run.

    Args:
        context: Hook context; its instance is queried for the run matching
            ``context.run_id``.
        labels: Collector that receives the labels (mutated in place).

    Returns:
        The same ``labels`` object that was passed in.
    """
    # NOTE(review): this presumably raises AttributeError if the run lookup or
    # its ``external_job_origin`` comes back as None -- confirm against Dagster.
    dagster_run = context.instance.get_run_by_id(context.run_id)
    run_tags = dagster_run.tags
    context.log.debug(run_tags)
    labels.add("location", dagster_run.external_job_origin.location_name)
    for parsed in parse_tags(run_tags):
        for label_name, label_value in parsed.items():
            labels.add(label_name, label_value)
    return labels


def post_metric(
    context: HookContext,
    value: int,
    labels: typing.Dict[str, str],
    gcp_resource_name: str = "gcp_metrics",
) -> None:
    """Post a ``job_success`` data point to GCP, labelled with run metadata.

    Args:
        context: Hook context; used to derive run labels and to reach the
            metrics resource.
        value: Value sent as the ``bool_value`` of the time series point.
        labels: Extra labels. On a key clash these override the labels that
            were derived from the run.
        gcp_resource_name: Attribute name of the metrics resource on
            ``context.resources``. Defaults to ``"gcp_metrics"`` (the name this
            function used to hard-code), so existing callers are unaffected
            while hooks registered under a different resource key can pass
            their own.
    """
    metric_labels = generate_gcp_metric_labels(context, GcpMetricLabels())
    # Caller-supplied labels are applied last so they win over derived ones.
    for label_name, label_value in labels.items():
        metric_labels.add(label_name, label_value)
    # Look the resource up by name instead of a fixed attribute.
    metrics_resource = getattr(context.resources, gcp_resource_name)
    metrics_resource.post_time_series(
        series_type="custom.googleapis.com/dagster/job_success",
        value={"bool_value": value},
        metric_labels=metric_labels.labels,
    )


class GcpMetricJobSuccessHookFactory(DagsterObjectFactory):
def __init__(
self,
Expand All @@ -28,33 +72,21 @@ def __init__(
self.gcp_resource_name = gcp_resource_name

def __call__(self) -> typing.Callable:
def post_metric(context: HookContext, value: int):
run = context.instance.get_run_by_id(context.run_id)
labels = {
"job_name": context.job_name,
"run_id": context.run_id,
"location": run.external_job_origin.location_name,
}
tag_list = ["dagster/backfill", "dagster/partition", "dagster/schedule_name"]
context.log.debug(run.tags)
for tag in tag_list:
tag_name = tag.replace("/", "_")
if run.tags.get(tag) is not None:
labels[tag_name] = run.tags.get(tag)
context.resources.gcp_metrics.post_time_series(
series_type="custom.googleapis.com/dagster/job_success",
value={"bool_value": value},
metric_labels=labels,
)

if self.on_success:

@success_hook(
name=self.name,
required_resource_keys={self.gcp_resource_name},
)
def _function(context: HookContext):
post_metric(context, 1)
post_metric(
context,
1,
{
"job_name": context.job_name,
"run_id": context.run_id,
},
)

else:

Expand All @@ -63,6 +95,13 @@ def _function(context: HookContext):
required_resource_keys={self.gcp_resource_name},
)
def _function(context: HookContext):
post_metric(context, 0)
post_metric(
context,
0,
{
"job_name": context.job_name,
"run_id": context.run_id,
},
)

return _function

0 comments on commit 46201d3

Please sign in to comment.