ref(delayed rules): Add instrumentation (#70693)
Add instrumentation and logging to the delayed rule processor to measure
how long the bulkier functions are taking and how many rules and groups
we're processing.

Closes https://getsentry.atlassian.net/browse/ALRT-19 and
getsentry/team-core-product-foundations#308 (a dupe), as a follow-up to
#69830 (review).
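
For readers skimming the diff below: sentry.utils.metrics exposes a statsd-style API, used here via metrics.timer as a context manager and metrics.incr as a counter. A minimal self-contained sketch of the timer pattern, with a print stand-in for the real metrics backend (this is not Sentry's implementation, only the shape of it):

import time
from contextlib import contextmanager

def emit_timing(key: str, seconds: float) -> None:
    # Stand-in emitter; a real backend would ship this to a metrics service.
    print(f"{key}: {seconds * 1000:.1f}ms")

@contextmanager
def timer(key: str):
    # Statsd-style timer: measure the wrapped block and emit on exit,
    # including early exits via return or an exception.
    start = time.monotonic()
    try:
        yield
    finally:
        emit_timing(key, time.monotonic() - start)

# Usage mirrors the diff below:
with timer("delayed_processing.get_condition_group_results.duration"):
    time.sleep(0.05)  # stand-in for the real work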
ceorourke committed May 13, 2024
1 parent c81dd1a commit 9ff054d
Showing 2 changed files with 37 additions and 32 deletions.
src/sentry/rules/processing/delayed_processing.py (35 additions, 31 deletions)
@@ -346,7 +346,8 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
     condition_groups = get_condition_groups(alert_rules, rules_to_groups)
     # Step 5: Instantiate each unique condition, and evaluate the relevant
     # group_ids that apply for that condition
-    condition_group_results = get_condition_group_results(condition_groups, project)
+    with metrics.timer("delayed_processing.get_condition_group_results.duration"):
+        condition_group_results = get_condition_group_results(condition_groups, project)
     # Step 6: For each rule and group applying to that rule, check if the group
     # meets the conditions of the rule (basically doing BaseEventFrequencyCondition.passes)
     rule_to_slow_conditions = get_rule_to_slow_conditions(alert_rules)

@@ -363,39 +364,42 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
     now = datetime.now(tz=timezone.utc)
     parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data(rulegroup_to_event_data)
 
-    for rule, group_ids in rules_to_fire.items():
-        frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY
-        freq_offset = now - timedelta(minutes=frequency)
-        group_to_groupevent = get_group_to_groupevent(
-            parsed_rulegroup_to_event_data, project.id, group_ids
-        )
-        for group, groupevent in group_to_groupevent.items():
-            rule_statuses = bulk_get_rule_status(alert_rules, group, project)
-            status = rule_statuses[rule.id]
-            if status.last_active and status.last_active > freq_offset:
-                logger.info(
-                    "delayed_processing.last_active",
-                    extra={"last_active": status.last_active, "freq_offset": freq_offset},
-                )
-                return
+    with metrics.timer("delayed_processing.fire_rules.duration"):
+        for rule, group_ids in rules_to_fire.items():
+            frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY
+            freq_offset = now - timedelta(minutes=frequency)
+            group_to_groupevent = get_group_to_groupevent(
+                parsed_rulegroup_to_event_data, project.id, group_ids
+            )
+            for group, groupevent in group_to_groupevent.items():
+                rule_statuses = bulk_get_rule_status(alert_rules, group, project)
+                status = rule_statuses[rule.id]
+                if status.last_active and status.last_active > freq_offset:
+                    logger.info(
+                        "delayed_processing.last_active",
+                        extra={"last_active": status.last_active, "freq_offset": freq_offset},
+                    )
+                    return
 
-            updated = (
-                GroupRuleStatus.objects.filter(id=status.id)
-                .exclude(last_active__gt=freq_offset)
-                .update(last_active=now)
-            )
+                updated = (
+                    GroupRuleStatus.objects.filter(id=status.id)
+                    .exclude(last_active__gt=freq_offset)
+                    .update(last_active=now)
+                )
 
-            if not updated:
-                logger.info("delayed_processing.not_updated", extra={"status_id": status.id})
-                return
+                if not updated:
+                    logger.info("delayed_processing.not_updated", extra={"status_id": status.id})
+                    return
 
-            notification_uuid = str(uuid.uuid4())
-            groupevent = group_to_groupevent[group]
-            rule_fire_history = history.record(rule, group, groupevent.event_id, notification_uuid)
-            for callback, futures in activate_downstream_actions(
-                rule, groupevent, notification_uuid, rule_fire_history
-            ).values():
-                safe_execute(callback, groupevent, futures, _with_transaction=False)
+                notification_uuid = str(uuid.uuid4())
+                groupevent = group_to_groupevent[group]
+                rule_fire_history = history.record(
+                    rule, group, groupevent.event_id, notification_uuid
+                )
+                for callback, futures in activate_downstream_actions(
+                    rule, groupevent, notification_uuid, rule_fire_history
+                ).values():
+                    safe_execute(callback, groupevent, futures, _with_transaction=False)
 
     # Step 8: Clean up Redis buffer data
     hashes_to_delete = [
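A note on the placement of the second timer: wrapping the entire firing loop, rather than each rule, yields one aggregate duration per apply_delayed run. And because metrics.timer is used as a context manager, the elapsed time should still be recorded on the early return paths inside the loop, assuming the timer emits on exit as statsd-style timers typically do.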
src/sentry/rules/processing/processor.py (2 additions, 1 deletion)
@@ -26,7 +26,7 @@
 from sentry.rules.conditions.event_frequency import EventFrequencyConditionData
 from sentry.rules.filters.base import EventFilter
 from sentry.types.rules import RuleFuture
-from sentry.utils import json
+from sentry.utils import json, metrics
 from sentry.utils.hashlib import hash_values
 from sentry.utils.safe import safe_execute
 
@@ -275,6 +275,7 @@ def enqueue_rule(self, rule: Rule) -> None:
             field=f"{rule.id}:{self.group.id}",
             value=value,
         )
+        metrics.incr("delayed_rule.group_added")
 
     def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None:
         """
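
The counter added to enqueue_rule increments once per rule/group pair pushed into the Redis buffer, which is the "how many rules and groups we're processing" half of the commit message. A minimal sketch of a statsd-style counter under the same key, with an in-memory Counter standing in for the real backend:

from collections import Counter

_counts: Counter[str] = Counter()

def incr(key: str, amount: int = 1) -> None:
    # Stand-in for sentry.utils.metrics.incr; a real backend emits to a metrics service.
    _counts[key] += amount

# One increment per rule/group pair enqueued for delayed processing,
# so the rate of this metric tracks delayed-processing volume.
incr("delayed_rule.group_added")
print(_counts["delayed_rule.group_added"])  # 1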
