Skip to content

Commit

Permalink
pass json blob with occurrence id to buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ceorourke committed May 1, 2024
1 parent 46bd8ee commit 9cc9050
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/sentry/buffer/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def push_to_hash(
model: type[models.Model],
filters: dict[str, models.Model | str | int],
field: str,
value: int,
value: str,
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)
Expand Down
67 changes: 40 additions & 27 deletions src/sentry/rules/processing/delayed_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from sentry import eventstore
from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry
from sentry.eventstore.models import Event
from sentry.eventstore.models import Event, GroupEvent
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.group import Group
from sentry.models.grouprulestatus import GroupRuleStatus
from sentry.models.project import Project
Expand All @@ -24,7 +25,7 @@
)
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils import json, metrics
from sentry.utils.safe import safe_execute

logger = logging.getLogger("sentry.rules.delayed_processing")
Expand Down Expand Up @@ -183,25 +184,39 @@ def get_rules_to_fire(
return rules_to_fire


def get_group_to_groupevent(
    rulegroup_to_events: dict[str, str], project: Project, group_ids: set[int]
) -> dict[Group, GroupEvent]:
    """Rebuild a GroupEvent for each buffered rule/group pair.

    ``rulegroup_to_events`` maps ``"rule_id:group_id"`` keys to JSON blobs of
    the form ``{"event_id": ..., "occurrence_id": ...}`` (as written by
    ``push_to_hash``). Groups whose id is not found by the ``group_ids``
    query are skipped silently.

    Returns a mapping of Group -> GroupEvent with the event payload bound
    from the node store and, when an occurrence id was buffered and still
    resolves, the occurrence attached.
    """
    group_to_groupevent: dict[Group, GroupEvent] = {}
    # One query for all groups, indexed by id for O(1) lookup per entry.
    group_id_to_group = {
        group.id: group for group in Group.objects.filter(id__in=group_ids)
    }
    for rule_group, instance_id in rulegroup_to_events.items():
        event_data = json.loads(instance_id)
        event_id = event_data.get("event_id")
        occurrence_id = event_data.get("occurrence_id")
        _, group_id = rule_group.split(":")
        group = group_id_to_group.get(int(group_id))
        if group is None:
            # Group was deleted (or never queried) since the event was buffered.
            continue
        event = Event(
            event_id=event_id,
            project_id=project.id,
            snuba_data={
                "event_id": event_id,
                "group_id": group.id,
                "project_id": project.id,
            },
        )
        eventstore.backend.bind_nodes([event])
        group_event = event.for_group(group)
        if occurrence_id:
            # Occurrence fetch is best-effort: a missing occurrence leaves
            # the group event without one rather than failing the batch.
            occurrence = IssueOccurrence.fetch(occurrence_id, project_id=project.id)
            if occurrence:
                group_event.occurrence_id = occurrence.id
        group_to_groupevent[group] = group_event
    return group_to_groupevent


@redis_buffer_registry.add_handler(BufferHookEvent.FLUSH)
Expand Down Expand Up @@ -253,14 +268,12 @@ def apply_delayed(project_id: int) -> None:
condition_group_results, rule_to_slow_conditions, rules_to_groups
)
# Step 7: Ready, aim, fire!!
group_id_to_event = get_group_id_to_event(rulegroup_to_events, project)

now = timezone.now()
for rule, group_ids in rules_to_fire.items():
frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY
freq_offset = now - timedelta(minutes=frequency)
groups = Group.objects.filter(id__in=group_ids)
for group in groups:
group_to_groupevent = get_group_to_groupevent(rulegroup_to_events, project, group_ids)
for group, groupevent in group_to_groupevent.items():
rule_statuses = bulk_get_rule_status(alert_rules, group, project)
status = rule_statuses[rule.id]
if status.last_active and status.last_active > freq_offset:
Expand All @@ -275,9 +288,9 @@ def apply_delayed(project_id: int) -> None:
if not updated:
return

event = group_id_to_event[group.id]
notification_uuid = str(uuid.uuid4())
rule_fire_history = history.record(rule, group, event.event_id, notification_uuid)
activate_downstream_actions(
rule, event.for_group(group), notification_uuid, rule_fire_history
groupevent = group_to_groupevent[group]
rule_fire_history = history.record(rule, group, groupevent.event_id, notification_uuid)
safe_execute(
activate_downstream_actions, rule, groupevent, notification_uuid, rule_fire_history
)
30 changes: 14 additions & 16 deletions tests/sentry/buffer/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,46 +286,44 @@ def test_enqueue(self):
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group_id}",
value=event_id,
value=json.dumps({"event_id": event_id, "occurrence_id": None}),
)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group2_id}",
value=event2_id,
value=json.dumps({"event_id": event2_id, "occurrence_id": None}),
)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id2},
field=f"{rule2_id}:{group3_id}",
value=event3_id,
value=json.dumps({"event_id": event3_id, "occurrence_id": None}),
)

project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
assert project_ids
project_ids_to_rule_data = self.group_rule_data_by_project_id(self.buf, project_ids)
assert project_ids_to_rule_data[project_id][0].get(f"{rule_id}:{group_id}") == str(event_id)
assert project_ids_to_rule_data[project_id][1].get(f"{rule_id}:{group2_id}") == str(
event2_id
)
assert project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}") == str(
event3_id
)
result = json.loads(project_ids_to_rule_data[project_id][0].get(f"{rule_id}:{group_id}"))
assert result.get("event_id") == event_id
result = json.loads(project_ids_to_rule_data[project_id][1].get(f"{rule_id}:{group2_id}"))
assert result.get("event_id") == event2_id
result = json.loads(project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}"))
assert result.get("event_id") == event3_id

# overwrite the value to event4_id
self.buf.push_to_hash(
model=Project,
filters={"project_id": project_id2},
field=f"{rule2_id}:{group3_id}",
value=event4_id,
value=json.dumps({"event_id": event4_id, "occurrence_id": None}),
)

project_ids_to_rule_data = project_ids_to_rule_data = self.group_rule_data_by_project_id(
self.buf, project_ids
)
assert project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}") == str(
event4_id
)
result = json.loads(project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}"))
assert result.get("event_id") == event4_id

def test_buffer_hook_registry(self):
"""Test that we can add an event to the registry and that the callback is invoked"""
Expand Down Expand Up @@ -366,15 +364,15 @@ def test_delete_batch(self):
model=Project,
filters={"project_id": project_id},
field=f"{rule_id}:{group_id}",
value=event_id,
value=json.dumps({"event_id": event_id, "occurrence_id": None}),
)
with freeze_time(one_minute_from_now):
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project2_id)
self.buf.push_to_hash(
model=Project,
filters={"project_id": project2_id},
field=f"{rule2_id}:{group2_id}",
value=event2_id,
value=json.dumps({"event_id": event2_id, "occurrence_id": None}),
)

# retrieve them
Expand Down
38 changes: 20 additions & 18 deletions tests/sentry/rules/processing/test_delayed_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sentry.testutils.cases import APITestCase, TestCase
from sentry.testutils.factories import DEFAULT_EVENT_DATA
from sentry.testutils.helpers.datetime import iso_format
from sentry.utils import json
from tests.snuba.rules.conditions.test_event_frequency import BaseEventFrequencyPercentTest

pytestmark = pytest.mark.sentry_metrics
Expand Down Expand Up @@ -57,12 +58,13 @@ def create_event_frequency_condition(
condition_id = f"sentry.rules.conditions.event_frequency.{id}"
return {"interval": interval, "id": condition_id, "value": value}

def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None):
    """Buffer a JSON blob of event/occurrence ids under the ``rule_id:group_id`` hash field.

    Mirrors the production write path: delayed processing later json.loads
    this value to recover both ids.
    """
    value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id})
    self.redis_buffer.push_to_hash(
        model=Project,
        filters={"project_id": project_id},
        field=f"{rule_id}:{group_id}",
        value=value,
    )

def setUp(self):
Expand Down Expand Up @@ -179,8 +181,8 @@ def test_apply_delayed_rules_to_fire(self):
assert self.group1
assert self.group2
assert len(rule_fire_histories) == 2
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (self.rule2.id, self.group2.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (self.rule2.id, self.group2.id) in rule_fire_histories

apply_delayed(self.project_two.id)
rule_fire_histories = RuleFireHistory.objects.filter(
Expand All @@ -192,8 +194,8 @@ def test_apply_delayed_rules_to_fire(self):
assert len(rule_fire_histories) == 2
assert self.group3
assert self.group4
assert rule_fire_histories[0] == (self.rule3.id, self.group3.id)
assert rule_fire_histories[1] == (self.rule4.id, self.group4.id)
assert (self.rule3.id, self.group3.id) in rule_fire_histories
assert (self.rule4.id, self.group4.id) in rule_fire_histories

def test_apply_delayed_same_condition_diff_value(self):
"""
Expand Down Expand Up @@ -221,9 +223,9 @@ def test_apply_delayed_same_condition_diff_value(self):
project=self.project,
).values_list("rule", "group")
assert len(rule_fire_histories) == 3
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (self.rule1.id, group5.id)
assert rule_fire_histories[2] == (rule5.id, group5.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (self.rule1.id, group5.id) in rule_fire_histories
assert (rule5.id, group5.id) in rule_fire_histories

def test_apply_delayed_same_condition_diff_interval(self):
"""
Expand All @@ -250,8 +252,8 @@ def test_apply_delayed_same_condition_diff_interval(self):
project=self.project,
).values_list("rule", "group")
assert len(rule_fire_histories) == 2
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (diff_interval_rule.id, group5.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (diff_interval_rule.id, group5.id) in rule_fire_histories

def test_apply_delayed_same_condition_diff_env(self):
"""
Expand Down Expand Up @@ -279,8 +281,8 @@ def test_apply_delayed_same_condition_diff_env(self):
project=self.project,
).values_list("rule", "group")
assert len(rule_fire_histories) == 2
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (diff_env_rule.id, group5.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (diff_env_rule.id, group5.id) in rule_fire_histories

def test_apply_delayed_two_rules_one_fires(self):
"""
Expand Down Expand Up @@ -313,8 +315,8 @@ def test_apply_delayed_two_rules_one_fires(self):
project=self.project,
).values_list("rule", "group")
assert len(rule_fire_histories) == 2
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (self.rule1.id, group5.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (self.rule1.id, group5.id) in rule_fire_histories

def test_apply_delayed_action_match_all(self):
"""
Expand Down Expand Up @@ -356,6 +358,6 @@ def test_apply_delayed_action_match_all(self):
project=self.project,
).values_list("rule", "group")
assert len(rule_fire_histories) == 3
assert rule_fire_histories[0] == (self.rule1.id, self.group1.id)
assert rule_fire_histories[1] == (self.rule1.id, group5.id)
assert rule_fire_histories[2] == (two_conditions_match_all_rule.id, group5.id)
assert (self.rule1.id, self.group1.id) in rule_fire_histories
assert (self.rule1.id, group5.id) in rule_fire_histories
assert (two_conditions_match_all_rule.id, group5.id) in rule_fire_histories

0 comments on commit 9cc9050

Please sign in to comment.