ref(rules): Fire delayed rules #69830

Merged
merged 14 commits on May 2, 2024
2 changes: 1 addition & 1 deletion src/sentry/buffer/redis.py
@@ -308,7 +308,7 @@ def push_to_hash(
model: type[models.Model],
filters: dict[str, models.Model | str | int],
field: str,
value: int,
value: str,
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)
115 changes: 103 additions & 12 deletions src/sentry/rules/processing/delayed_processing.py
@schew2381 (Member), May 1, 2024:

Just wanna double check this, but do we need to worry about snoozed rules at all? I realize that we only add to the buffer if the rule is not snoozed in the first place, and at worst we have a minute delay, so theoretically an action could be fired a little more than a minute after a snooze.

Is it worth considering doing another snooze check in delayed processing?

snoozed_rules = RuleSnooze.objects.filter(rule__in=rules, user_id=None).values_list(
    "rule", flat=True
)
rule_statuses = bulk_get_rule_status(rules, self.group)
for rule in rules:
    if rule.id not in snoozed_rules:
        self.apply_rule(rule, rule_statuses[rule.id])
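
A minimal sketch of what such a recheck could look like inside apply_delayed, filtering rules_to_fire just before the firing loop; this is hypothetical and not part of the PR, and it assumes the rules_to_fire mapping and the RuleSnooze query shown elsewhere in this diff:

from sentry.models.rulesnooze import RuleSnooze

# Drop any rule that was snoozed for all users after it was enqueued in the buffer.
snoozed_rule_ids = set(
    RuleSnooze.objects.filter(
        rule__in=list(rules_to_fire.keys()), user_id=None
    ).values_list("rule", flat=True)
)
rules_to_fire = {
    rule: group_ids
    for rule, group_ids in rules_to_fire.items()
    if rule.id not in snoozed_rule_ids
}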

Member

Seems pretty minor if we fire a rule slightly after it's snoozed, but it probably makes sense to recheck. I'd put that in a separate PR to keep this one simple.

@@ -1,20 +1,34 @@
import logging
import uuid
from collections import defaultdict
from datetime import timedelta
from typing import DefaultDict, NamedTuple

from django.utils import timezone

from sentry import eventstore
from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry
from sentry.eventstore.models import Event, GroupEvent
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.group import Group
from sentry.models.grouprulestatus import GroupRuleStatus
from sentry.models.project import Project
from sentry.models.rule import Rule
from sentry.rules import rules
from sentry.rules import history, rules
from sentry.rules.conditions.event_frequency import (
BaseEventFrequencyCondition,
ComparisonType,
EventFrequencyConditionData,
)
from sentry.rules.processing.processor import is_condition_slow, split_conditions_and_filters
from sentry.rules.processing.processor import (
activate_downstream_actions,
bulk_get_rule_status,
is_condition_slow,
split_conditions_and_filters,
)
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils import json, metrics
from sentry.utils.safe import safe_execute

logger = logging.getLogger("sentry.rules.delayed_processing")
@@ -53,9 +67,9 @@ def get_slow_conditions(rule: Rule) -> list[EventFrequencyConditionData]:
return slow_conditions # type: ignore[return-value]


def get_rules_to_groups(rulegroup_to_events: dict[str, str]) -> DefaultDict[int, set[int]]:
def get_rules_to_groups(rulegroup_to_event_data: dict[str, str]) -> DefaultDict[int, set[int]]:
rules_to_groups: DefaultDict[int, set[int]] = defaultdict(set)
for rule_group in rulegroup_to_events.keys():
for rule_group in rulegroup_to_event_data.keys():
rule_id, group_id = rule_group.split(":")
rules_to_groups[int(rule_id)].add(int(group_id))

@@ -177,6 +191,53 @@ def get_rules_to_fire(
return rules_to_fire


def parse_rulegroup_to_event_data(
    rulegroup_to_event_data: dict[str, str]
) -> dict[tuple[str, str], dict[str, str]]:
Comment on lines +194 to +196
Member

Might be nice to define a dict type here for the value, but could be a follow up.
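
A possible shape for that value type, sketched as a TypedDict; the field names mirror the event_id/occurrence_id payload used in this PR, but the type itself is hypothetical and not part of the change:

from typing import TypedDict


class RuleGroupEventData(TypedDict, total=False):
    # Hypothetical value type; the PR currently treats this payload as dict[str, str].
    event_id: str
    occurrence_id: str | None

parse_rulegroup_to_event_data would then return dict[tuple[str, str], RuleGroupEventData] instead of dict[tuple[str, str], dict[str, str]].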

    parsed_rulegroup_to_event_data: dict[tuple[str, str], dict[str, str]] = {}

    for rule_group, instance_data in rulegroup_to_event_data.items():
        event_data = json.loads(instance_data)
        rule_id, group_id = rule_group.split(":")
        parsed_rulegroup_to_event_data[(rule_id, group_id)] = event_data
    return parsed_rulegroup_to_event_data


def get_group_to_groupevent(
    parsed_rulegroup_to_event_data: dict[tuple[str, str], dict[str, str]],
    project_id: int,
    group_ids: set[int],
) -> dict[Group, GroupEvent]:
    group_to_groupevent: dict[Group, GroupEvent] = {}
    groups = Group.objects.filter(id__in=group_ids)
    group_id_to_group = {group.id: group for group in groups}
    for rule_group, instance_data in parsed_rulegroup_to_event_data.items():
        event_id = instance_data.get("event_id")
        occurrence_id = instance_data.get("occurrence_id")
        group_id = rule_group[1]
        group = group_id_to_group.get(int(group_id))
        if group and event_id:
            # TODO: fetch events and occurrences in batches
            event = Event(
                event_id=event_id,
                project_id=project_id,
                snuba_data={
                    "event_id": event_id,
                    "group_id": group.id,
                    "project_id": project_id,
                },
            )
            eventstore.backend.bind_nodes([event])
            group_event = event.for_group(group)
            if occurrence_id:
                occurrence = IssueOccurrence.fetch(occurrence_id, project_id=project_id)
                if occurrence:
                    group_event.occurrence = occurrence

            group_to_groupevent[group] = group_event
    return group_to_groupevent


@redis_buffer_registry.add_handler(BufferHookEvent.FLUSH)
def process_delayed_alert_conditions(buffer: RedisBuffer) -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
@@ -195,17 +256,16 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None:
time_limit=60, # 1 minute
silo_mode=SiloMode.REGION,
)
def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None:
# XXX(CEO) this is a temporary return value!
def apply_delayed(project_id: int) -> None:
"""
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
"""
# STEP 1: Fetch the rulegroup_to_events mapping for the project from redis
# STEP 1: Fetch the rulegroup_to_event_data mapping for the project from redis
project = Project.objects.get_from_cache(id=project_id)
buffer = RedisBuffer()
rulegroup_to_events = buffer.get_hash(model=Project, field={"project_id": project.id})
rulegroup_to_event_data = buffer.get_hash(model=Project, field={"project_id": project.id})
# STEP 2: Map each rule to the groups that must be checked for that rule.
rules_to_groups = get_rules_to_groups(rulegroup_to_events)
rules_to_groups = get_rules_to_groups(rulegroup_to_event_data)

# STEP 3: Fetch the Rule models we need to check
alert_rules = Rule.objects.filter(id__in=list(rules_to_groups.keys()))
@@ -226,5 +286,36 @@ def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None:
rules_to_fire = get_rules_to_fire(
condition_group_results, rule_to_slow_conditions, rules_to_groups
)
return rules_to_fire
return None
# Step 7: Fire the rule's actions
now = timezone.now()
# TODO: check rulesnooze table again before firing
parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data(rulegroup_to_event_data)

for rule, group_ids in rules_to_fire.items():
    frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY
    freq_offset = now - timedelta(minutes=frequency)
    group_to_groupevent = get_group_to_groupevent(
        parsed_rulegroup_to_event_data, project.id, group_ids
    )
    for group, groupevent in group_to_groupevent.items():
        rule_statuses = bulk_get_rule_status(alert_rules, group, project)
Member

Should this loop be inverted, so that we process each group, fetch all the statuses for it, and then fire it for each rule?

I was going to say maybe it doesn't make sense to fetch all the rules at once in this loop, but it looks like we cache them, so this should be ok for now

Member Author

I think if I did that, I'd end up fetching events and occurrences for groups whose rules didn't necessarily fire.
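
To make the trade-off concrete, a toy sketch of the two loop orderings using plain dicts in place of the Sentry models; this is purely illustrative and not part of the PR:

rules_to_fire = {"rule-1": {"group-a", "group-b"}, "rule-2": {"group-a"}}

# Ordering in this PR: outer loop over rules, events fetched per firing rule's groups.
for rule, group_ids in rules_to_fire.items():
    events = {g: f"event-for-{g}" for g in group_ids}  # stands in for get_group_to_groupevent
    for group, event in events.items():
        print(f"fire {rule} for {group} with {event}")

# Suggested inversion: outer loop over groups, rule statuses fetched once per group.
groups_to_rules: dict[str, set[str]] = {}
for rule, group_ids in rules_to_fire.items():
    for group in group_ids:
        groups_to_rules.setdefault(group, set()).add(rule)
for group, rule_ids in groups_to_rules.items():
    event = f"event-for-{group}"  # one fetch per group
    for rule in rule_ids:
        print(f"fire {rule} for {group} with {event}")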

Member Author

Gonna merge for now because I need to rebase my other PR against this, and I've made some follow-up tickets for the dangling stuff; if this needs to be added later too, I can.

        status = rule_statuses[rule.id]
        if status.last_active and status.last_active > freq_offset:
            return

        updated = (
            GroupRuleStatus.objects.filter(id=status.id)
            .exclude(last_active__gt=freq_offset)
            .update(last_active=now)
        )

        if not updated:
            return

        notification_uuid = str(uuid.uuid4())
        groupevent = group_to_groupevent[group]
        rule_fire_history = history.record(rule, group, groupevent.event_id, notification_uuid)
        for callback, futures in activate_downstream_actions(
            rule, groupevent, notification_uuid, rule_fire_history
        ).values():
            safe_execute(callback, groupevent, futures, _with_transaction=False)
9 changes: 6 additions & 3 deletions src/sentry/rules/processing/processor.py
@@ -15,6 +15,7 @@
from sentry.models.environment import Environment
from sentry.models.group import Group
from sentry.models.grouprulestatus import GroupRuleStatus
from sentry.models.project import Project
from sentry.models.rule import Rule
from sentry.models.rulefirehistory import RuleFireHistory
from sentry.models.rulesnooze import RuleSnooze
@@ -82,7 +83,9 @@ def build_rule_status_cache_key(rule_id: int, group_id: int) -> str:
return "grouprulestatus:1:%s" % hash_values([group_id, rule_id])


def bulk_get_rule_status(rules: Sequence[Rule], group: Group) -> Mapping[int, GroupRuleStatus]:
def bulk_get_rule_status(
rules: Sequence[Rule], group: Group, project: Project
) -> Mapping[int, GroupRuleStatus]:
keys = [build_rule_status_cache_key(rule.id, group.id) for rule in rules]
cache_results: Mapping[str, GroupRuleStatus] = cache.get_many(keys)
missing_rule_ids: set[int] = set()
@@ -109,7 +112,7 @@ def bulk_get_rule_status(rules: Sequence[Rule], group: Group) -> Mapping[int, Gr
# might be created between when we queried above and attempt to create the rows now.
GroupRuleStatus.objects.bulk_create(
[
GroupRuleStatus(rule_id=rule_id, group=group, project=group.project)
GroupRuleStatus(rule_id=rule_id, group=group, project=project)
for rule_id in missing_rule_ids
],
ignore_conflicts=True,
@@ -340,7 +343,7 @@ def apply(
snoozed_rules = RuleSnooze.objects.filter(rule__in=rules, user_id=None).values_list(
"rule", flat=True
)
rule_statuses = bulk_get_rule_status(rules, self.group)
rule_statuses = bulk_get_rule_status(rules, self.group, self.project)
for rule in rules:
if rule.id not in snoozed_rules:
self.apply_rule(rule, rule_statuses[rule.id])
30 changes: 14 additions & 16 deletions tests/sentry/buffer/test_redis.py
@@ -286,46 +286,44 @@ def test_enqueue(self):
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group_id}",
value=event_id,
value=json.dumps({"event_id": event_id, "occurrence_id": None}),
)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group2_id}",
value=event2_id,
value=json.dumps({"event_id": event2_id, "occurrence_id": None}),
)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id2},
field=f"{rule2_id}:{group3_id}",
value=event3_id,
value=json.dumps({"event_id": event3_id, "occurrence_id": None}),
)

project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
assert project_ids
project_ids_to_rule_data = self.group_rule_data_by_project_id(self.buf, project_ids)
assert project_ids_to_rule_data[project_id][0].get(f"{rule_id}:{group_id}") == str(event_id)
assert project_ids_to_rule_data[project_id][1].get(f"{rule_id}:{group2_id}") == str(
event2_id
)
assert project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}") == str(
event3_id
)
result = json.loads(project_ids_to_rule_data[project_id][0].get(f"{rule_id}:{group_id}"))
assert result.get("event_id") == event_id
result = json.loads(project_ids_to_rule_data[project_id][1].get(f"{rule_id}:{group2_id}"))
assert result.get("event_id") == event2_id
result = json.loads(project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}"))
assert result.get("event_id") == event3_id

# overwrite the value to event4_id
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id2},
field=f"{rule2_id}:{group3_id}",
value=event4_id,
value=json.dumps({"event_id": event4_id, "occurrence_id": None}),
)

project_ids_to_rule_data = project_ids_to_rule_data = self.group_rule_data_by_project_id(
self.buf, project_ids
)
assert project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}") == str(
event4_id
)
result = json.loads(project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}"))
assert result.get("event_id") == event4_id

def test_buffer_hook_registry(self):
"""Test that we can add an event to the registry and that the callback is invoked"""
@@ -366,15 +364,15 @@ def test_delete_batch(self):
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group_id}",
value=event_id,
value=json.dumps({"event_id": event_id, "occurrence_id": None}),
)
with freeze_time(one_minute_from_now):
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project2_id)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project2_id},
field=f"{rule2_id}:{group2_id}",
value=event2_id,
value=json.dumps({"event_id": event2_id, "occurrence_id": None}),
)

# retrieve them