-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
test_tasks.py
197 lines (170 loc) · 6.87 KB
/
test_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from copy import deepcopy
from functools import cached_property
from uuid import uuid4
from arroyo.utils import metrics
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from django.conf import settings
from django.core import mail
from sentry.conf.types.kafka_definition import Topic
from sentry.incidents.action_handlers import (
EmailActionHandler,
generate_incident_trigger_email_context,
)
from sentry.incidents.logic import (
CRITICAL_TRIGGER_LABEL,
create_alert_rule_trigger,
create_alert_rule_trigger_action,
)
from sentry.incidents.models.alert_rule import AlertRuleTriggerAction
from sentry.incidents.models.incident import (
Incident,
IncidentActivity,
IncidentStatus,
IncidentType,
TriggerStatus,
)
from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
from sentry.snuba.query_subscriptions.consumer import subscriber_registry
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.datetime import freeze_time
from sentry.testutils.skips import requires_kafka
from sentry.utils import json, kafka_config
from sentry.utils.batching_kafka_consumer import create_topics
pytestmark = [requires_kafka]
@freeze_time()
class HandleSnubaQueryUpdateTest(TestCase):
def setUp(self):
super().setUp()
self.topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS
self.orig_registry = deepcopy(subscriber_registry)
cluster_options = kafka_config.get_kafka_admin_cluster_options(
"default", {"allow.auto.create.topics": "true"}
)
self.admin_client = AdminClient(cluster_options)
topic_defn = kafka_config.get_topic_definition(self.topic)
self.real_topic = topic_defn["real_topic_name"]
self.cluster = topic_defn["cluster"]
create_topics(self.cluster, [self.real_topic])
def tearDown(self):
super().tearDown()
subscriber_registry.clear()
subscriber_registry.update(self.orig_registry)
self.admin_client.delete_topics([self.real_topic])
metrics._metrics_backend = None
@cached_property
def subscription(self):
return self.rule.snuba_query.subscriptions.get()
@cached_property
def rule(self):
with self.tasks():
rule = self.create_alert_rule(
name="some rule",
query="",
aggregate="count()",
time_window=1,
threshold_period=1,
resolve_threshold=10,
)
trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100)
create_alert_rule_trigger_action(
trigger,
AlertRuleTriggerAction.Type.EMAIL,
AlertRuleTriggerAction.TargetType.USER,
str(self.user.id),
)
return rule
@cached_property
def trigger(self):
return self.rule.alertruletrigger_set.get()
@cached_property
def action(self):
return self.trigger.alertruletriggeraction_set.get()
@cached_property
def producer(self):
conf = {
"bootstrap.servers": settings.KAFKA_CLUSTERS[self.cluster]["common"][
"bootstrap.servers"
],
"session.timeout.ms": 6000,
}
return Producer(conf)
@cached_property
def topic(self):
return uuid4().hex
def run_test(self, consumer):
# Full integration test to ensure that when a subscription receives an update
# the `QuerySubscriptionConsumer` successfully retries the subscription and
# calls the correct callback, which should result in an incident being created.
message = {
"version": 3,
"payload": {
"subscription_id": self.subscription.subscription_id,
"result": {
"data": [{"some_col": 101}],
"meta": [{"name": "count", "type": "UInt64"}],
},
"request": {
"some": "data",
"query": """MATCH (metrics_counters) SELECT sum(value) AS value BY
tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16
AND tags[3] IN tuple(13, 4)""",
},
"entity": "metrics_counters",
"timestamp": "2020-01-01T01:23:45.1234",
},
}
self.producer.produce(self.real_topic, json.dumps(message))
self.producer.flush()
def active_incident():
return Incident.objects.filter(
type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule
).exclude(status=IncidentStatus.CLOSED.value)
original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]
def shutdown_callback(*args, **kwargs):
# We want to just exit after the callback so that we can see the result of
# processing.
original_callback(*args, **kwargs)
consumer.signal_shutdown()
subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback
with self.feature(["organizations:incidents", "organizations:performance-view"]):
assert not active_incident().exists()
with self.tasks(), self.capture_on_commit_callbacks(execute=True):
consumer.run()
assert active_incident().exists()
assert len(mail.outbox) == 1
handler = EmailActionHandler(self.action, active_incident().get(), self.project)
incident_activity = (
IncidentActivity.objects.filter(incident=handler.incident).order_by("-id").first()
)
message_builder = handler.build_message(
generate_incident_trigger_email_context(
handler.project,
handler.incident,
handler.action.alert_rule_trigger,
TriggerStatus.ACTIVE,
IncidentStatus.CRITICAL,
notification_uuid=str(incident_activity.notification_uuid),
),
TriggerStatus.ACTIVE,
self.user.id,
)
out = mail.outbox[0]
assert isinstance(out, mail.EmailMultiAlternatives)
assert out.to == [self.user.email]
assert out.subject == message_builder.subject
built_message = message_builder.build(self.user.email)
assert out.body == built_message.body
def test_arroyo(self):
from sentry.consumers import get_stream_processor
consumer = get_stream_processor(
"metrics-subscription-results",
consumer_args=["--max-batch-size=1", "--max-batch-time-ms=1000", "--processes=1"],
topic=None,
cluster=None,
group_id="hi",
strict_offset_reset=True,
auto_offset_reset="earliest",
enforce_schema=True,
)
self.run_test(consumer)