-
-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref(rules): Fire delayed rules #69830
Changes from 10 commits
724ce1e
cb0ba3f
9607fa0
8f142d4
d3a3e58
292d0f6
1dbc955
5e15399
6684dee
067f155
17fdbb1
22dfa35
3447659
830c3f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,34 @@ | ||
import logging | ||
import uuid | ||
from collections import defaultdict | ||
from datetime import timedelta | ||
from typing import DefaultDict, NamedTuple | ||
|
||
from django.utils import timezone | ||
|
||
from sentry import eventstore | ||
from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry | ||
from sentry.eventstore.models import Event, GroupEvent | ||
from sentry.issues.issue_occurrence import IssueOccurrence | ||
from sentry.models.group import Group | ||
from sentry.models.grouprulestatus import GroupRuleStatus | ||
from sentry.models.project import Project | ||
from sentry.models.rule import Rule | ||
from sentry.rules import rules | ||
from sentry.rules import history, rules | ||
from sentry.rules.conditions.event_frequency import ( | ||
BaseEventFrequencyCondition, | ||
ComparisonType, | ||
EventFrequencyConditionData, | ||
) | ||
from sentry.rules.processing.processor import is_condition_slow, split_conditions_and_filters | ||
from sentry.rules.processing.processor import ( | ||
activate_downstream_actions, | ||
bulk_get_rule_status, | ||
is_condition_slow, | ||
split_conditions_and_filters, | ||
) | ||
from sentry.silo.base import SiloMode | ||
from sentry.tasks.base import instrumented_task | ||
from sentry.utils import metrics | ||
from sentry.utils import json, metrics | ||
from sentry.utils.safe import safe_execute | ||
|
||
logger = logging.getLogger("sentry.rules.delayed_processing") | ||
|
@@ -177,6 +191,41 @@ def get_rules_to_fire( | |
return rules_to_fire | ||
|
||
|
||
def get_group_to_groupevent(
    rulegroup_to_events: dict[str, str], project_id: int, group_ids: set[int]
) -> dict[Group, GroupEvent]:
    """
    Resolve buffered rule/group keys to their Group and reconstructed GroupEvent.

    Args:
        rulegroup_to_events: Mapping of ``"<rule_id>:<group_id>"`` keys to a JSON
            payload containing ``"event_id"`` and optionally ``"occurrence_id"``.
        project_id: The project the groups and events belong to.
        group_ids: Group ids to resolve; keys referencing any other group are
            silently skipped.

    Returns:
        A dict mapping each resolved Group to the GroupEvent rebuilt from the
        buffered event data, with its issue occurrence id attached when one
        was stored and can still be fetched.
    """
    group_to_groupevent: dict[Group, GroupEvent] = {}
    group_id_to_group = {
        group.id: group for group in Group.objects.filter(id__in=group_ids)
    }
    for rule_group, instance_data in rulegroup_to_events.items():
        event_data = json.loads(instance_data)
        event_id = event_data.get("event_id")
        occurrence_id = event_data.get("occurrence_id")
        _, group_id_str = rule_group.split(":")
        # FIX: the previous `group_id in [group.id for group in groups]` check
        # rebuilt the id list on every iteration (O(n) per key) and was fully
        # redundant with this lookup — .get() already handles missing groups.
        group = group_id_to_group.get(int(group_id_str))
        if group is None:
            continue
        # Rebuild a minimal Event; bind_nodes hydrates the full event payload
        # from nodestore before it is converted to a GroupEvent.
        event = Event(
            event_id=event_id,
            project_id=project_id,
            snuba_data={
                "event_id": event_id,
                "group_id": group.id,
                "project_id": project_id,
            },
        )
        eventstore.backend.bind_nodes([event])
        group_event = event.for_group(group)
        if occurrence_id:
            occurrence = IssueOccurrence.fetch(occurrence_id, project_id=project_id)
            if occurrence:
                group_event.occurrence_id = occurrence.id
        group_to_groupevent[group] = group_event
    return group_to_groupevent
|
||
|
||
@redis_buffer_registry.add_handler(BufferHookEvent.FLUSH) | ||
def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: | ||
with metrics.timer("delayed_processing.process_all_conditions.duration"): | ||
|
@@ -195,8 +244,7 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: | |
time_limit=60, # 1 minute | ||
silo_mode=SiloMode.REGION, | ||
) | ||
def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None: | ||
# XXX(CEO) this is a temporary return value! | ||
def apply_delayed(project_id: int) -> None: | ||
""" | ||
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass | ||
""" | ||
|
@@ -226,5 +274,30 @@ def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None: | |
rules_to_fire = get_rules_to_fire( | ||
condition_group_results, rule_to_slow_conditions, rules_to_groups | ||
) | ||
return rules_to_fire | ||
return None | ||
# Step 7: Fire the rule's actions | ||
now = timezone.now() | ||
for rule, group_ids in rules_to_fire.items(): | ||
frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY | ||
freq_offset = now - timedelta(minutes=frequency) | ||
group_to_groupevent = get_group_to_groupevent(rulegroup_to_events, project.id, group_ids) | ||
for group, groupevent in group_to_groupevent.items(): | ||
rule_statuses = bulk_get_rule_status(alert_rules, group, project) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Reviewer: Should this loop be inverted, so that we process each group, fetch all the statuses for it, and then fire it for each rule? I was going to say maybe it doesn't make sense to fetch all the rules at once in this loop, but it looks like we cache them, so this should be OK for now.

Author: I think if I did that, I'd end up fetching events and occurrences for groups belonging to rules that didn't necessarily fire.

Author: Going to merge for now because I need to rebase my other PR against this, and I've made some follow-up tickets for the remaining items — if this needs to be added later too, I can do that.
||
status = rule_statuses[rule.id] | ||
if status.last_active and status.last_active > freq_offset: | ||
return | ||
|
||
updated = ( | ||
GroupRuleStatus.objects.filter(id=status.id) | ||
.exclude(last_active__gt=freq_offset) | ||
.update(last_active=now) | ||
) | ||
|
||
if not updated: | ||
return | ||
|
||
notification_uuid = str(uuid.uuid4()) | ||
groupevent = group_to_groupevent[group] | ||
rule_fire_history = history.record(rule, group, groupevent.event_id, notification_uuid) | ||
safe_execute( | ||
activate_downstream_actions, rule, groupevent, notification_uuid, rule_fire_history | ||
) | ||
ceorourke marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wanna double check this, but do we need to worry about snoozed rules at all? I realize that we only add to the buffer if the rule is not snoozed in the first place, and at worst we have a minute delay so theoretically an action could be fired in a tiny bit >1min after a snooze.
Is it worth considering doing another snooze check in delayed processing?
sentry/src/sentry/rules/processing/processor.py
Lines 340 to 346 in 1730d73
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems pretty minor if we fire a rule slightly after it's snoozed, but probably makes sense to recheck. I'd put that in a separate pr to keep this simple