-
-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref(rules): Fire delayed rules #69830
Changes from all commits
724ce1e
cb0ba3f
9607fa0
8f142d4
d3a3e58
292d0f6
1dbc955
5e15399
6684dee
067f155
17fdbb1
22dfa35
3447659
830c3f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,34 @@ | ||
import logging | ||
import uuid | ||
from collections import defaultdict | ||
from datetime import timedelta | ||
from typing import DefaultDict, NamedTuple | ||
|
||
from django.utils import timezone | ||
|
||
from sentry import eventstore | ||
from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry | ||
from sentry.eventstore.models import Event, GroupEvent | ||
from sentry.issues.issue_occurrence import IssueOccurrence | ||
from sentry.models.group import Group | ||
from sentry.models.grouprulestatus import GroupRuleStatus | ||
from sentry.models.project import Project | ||
from sentry.models.rule import Rule | ||
from sentry.rules import rules | ||
from sentry.rules import history, rules | ||
from sentry.rules.conditions.event_frequency import ( | ||
BaseEventFrequencyCondition, | ||
ComparisonType, | ||
EventFrequencyConditionData, | ||
) | ||
from sentry.rules.processing.processor import is_condition_slow, split_conditions_and_filters | ||
from sentry.rules.processing.processor import ( | ||
activate_downstream_actions, | ||
bulk_get_rule_status, | ||
is_condition_slow, | ||
split_conditions_and_filters, | ||
) | ||
from sentry.silo.base import SiloMode | ||
from sentry.tasks.base import instrumented_task | ||
from sentry.utils import metrics | ||
from sentry.utils import json, metrics | ||
from sentry.utils.safe import safe_execute | ||
|
||
logger = logging.getLogger("sentry.rules.delayed_processing") | ||
|
@@ -53,9 +67,9 @@ def get_slow_conditions(rule: Rule) -> list[EventFrequencyConditionData]: | |
return slow_conditions # type: ignore[return-value] | ||
|
||
|
||
def get_rules_to_groups(rulegroup_to_events: dict[str, str]) -> DefaultDict[int, set[int]]: | ||
def get_rules_to_groups(rulegroup_to_event_data: dict[str, str]) -> DefaultDict[int, set[int]]: | ||
rules_to_groups: DefaultDict[int, set[int]] = defaultdict(set) | ||
for rule_group in rulegroup_to_events.keys(): | ||
for rule_group in rulegroup_to_event_data.keys(): | ||
rule_id, group_id = rule_group.split(":") | ||
rules_to_groups[int(rule_id)].add(int(group_id)) | ||
|
||
|
@@ -177,6 +191,53 @@ def get_rules_to_fire( | |
return rules_to_fire | ||
|
||
|
||
def parse_rulegroup_to_event_data( | ||
rulegroup_to_event_data: dict[str, str] | ||
) -> dict[tuple[str, str], dict[str, str]]: | ||
Comment on lines
+194
to
+196
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be nice to define a dict type here for the value, but could be a follow up. |
||
parsed_rulegroup_to_event_data: dict[tuple[str, str], dict[str, str]] = {} | ||
|
||
for rule_group, instance_data in rulegroup_to_event_data.items(): | ||
event_data = json.loads(instance_data) | ||
rule_id, group_id = rule_group.split(":") | ||
parsed_rulegroup_to_event_data[(rule_id, group_id)] = event_data | ||
return parsed_rulegroup_to_event_data | ||
|
||
|
||
def get_group_to_groupevent( | ||
parsed_rulegroup_to_event_data: dict[tuple[str, str], dict[str, str]], | ||
project_id: int, | ||
group_ids: set[int], | ||
) -> dict[Group, GroupEvent]: | ||
group_to_groupevent: dict[Group, GroupEvent] = {} | ||
groups = Group.objects.filter(id__in=group_ids) | ||
group_id_to_group = {group.id: group for group in groups} | ||
for rule_group, instance_data in parsed_rulegroup_to_event_data.items(): | ||
event_id = instance_data.get("event_id") | ||
occurrence_id = instance_data.get("occurrence_id") | ||
group_id = rule_group[1] | ||
group = group_id_to_group.get(int(group_id)) | ||
if group and event_id: | ||
# TODO: fetch events and occurrences in batches | ||
event = Event( | ||
event_id=event_id, | ||
project_id=project_id, | ||
snuba_data={ | ||
"event_id": event_id, | ||
"group_id": group.id, | ||
"project_id": project_id, | ||
}, | ||
) | ||
eventstore.backend.bind_nodes([event]) | ||
group_event = event.for_group(group) | ||
if occurrence_id: | ||
occurrence = IssueOccurrence.fetch(occurrence_id, project_id=project_id) | ||
if occurrence: | ||
group_event.occurrence = occurrence | ||
|
||
group_to_groupevent[group] = group_event | ||
return group_to_groupevent | ||
|
||
|
||
@redis_buffer_registry.add_handler(BufferHookEvent.FLUSH) | ||
def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: | ||
with metrics.timer("delayed_processing.process_all_conditions.duration"): | ||
|
@@ -195,17 +256,16 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: | |
time_limit=60, # 1 minute | ||
silo_mode=SiloMode.REGION, | ||
) | ||
def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None: | ||
# XXX(CEO) this is a temporary return value! | ||
def apply_delayed(project_id: int) -> None: | ||
""" | ||
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass | ||
""" | ||
# STEP 1: Fetch the rulegroup_to_events mapping for the project from redis | ||
# STEP 1: Fetch the rulegroup_to_event_data mapping for the project from redis | ||
project = Project.objects.get_from_cache(id=project_id) | ||
buffer = RedisBuffer() | ||
rulegroup_to_events = buffer.get_hash(model=Project, field={"project_id": project.id}) | ||
rulegroup_to_event_data = buffer.get_hash(model=Project, field={"project_id": project.id}) | ||
# STEP 2: Map each rule to the groups that must be checked for that rule. | ||
rules_to_groups = get_rules_to_groups(rulegroup_to_events) | ||
rules_to_groups = get_rules_to_groups(rulegroup_to_event_data) | ||
|
||
# STEP 3: Fetch the Rule models we need to check | ||
alert_rules = Rule.objects.filter(id__in=list(rules_to_groups.keys())) | ||
|
@@ -226,5 +286,36 @@ def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None: | |
rules_to_fire = get_rules_to_fire( | ||
condition_group_results, rule_to_slow_conditions, rules_to_groups | ||
) | ||
return rules_to_fire | ||
return None | ||
# Step 7: Fire the rule's actions | ||
now = timezone.now() | ||
# TODO: check rulesnooze table again before firing | ||
parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data(rulegroup_to_event_data) | ||
|
||
for rule, group_ids in rules_to_fire.items(): | ||
frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY | ||
freq_offset = now - timedelta(minutes=frequency) | ||
group_to_groupevent = get_group_to_groupevent( | ||
parsed_rulegroup_to_event_data, project.id, group_ids | ||
) | ||
for group, groupevent in group_to_groupevent.items(): | ||
rule_statuses = bulk_get_rule_status(alert_rules, group, project) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this loop be inverted, so that we process each group, fetch all the statuses for it, and then fire it for each rule? I was going to say maybe it doesn't make sense to fetch all the rules at once in this loop, but it looks like we cache them, so this should be ok for now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if I did that I'd end up fetching events and occurrences for groups for rules that didn't necessarily fire There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gonna merge for now cause I need to rebase my other PR against this and I've made some follow up tickets for the dangling stuff - if this needs to be added later too I can. |
||
status = rule_statuses[rule.id] | ||
if status.last_active and status.last_active > freq_offset: | ||
return | ||
|
||
updated = ( | ||
GroupRuleStatus.objects.filter(id=status.id) | ||
.exclude(last_active__gt=freq_offset) | ||
.update(last_active=now) | ||
) | ||
|
||
if not updated: | ||
return | ||
|
||
notification_uuid = str(uuid.uuid4()) | ||
groupevent = group_to_groupevent[group] | ||
rule_fire_history = history.record(rule, group, groupevent.event_id, notification_uuid) | ||
for callback, futures in activate_downstream_actions( | ||
rule, groupevent, notification_uuid, rule_fire_history | ||
).values(): | ||
safe_execute(callback, groupevent, futures, _with_transaction=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wanna double check this, but do we need to worry about snoozed rules at all? I realize that we only add to the buffer if the rule is not snoozed in the first place, and at worst we have a minute delay so theoretically an action could be fired in a tiny bit >1min after a snooze.
Is it worth considering doing another snooze check in delayed processing?
sentry/src/sentry/rules/processing/processor.py
Lines 340 to 346 in 1730d73
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems pretty minor if we fire a rule slightly after it's snoozed, but probably makes sense to recheck. I'd put that in a separate pr to keep this simple