/
configuration.py
189 lines (152 loc) · 7.46 KB
/
configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# Note: It must be possible to import this module directly without having to
# initialize Sentry. I.e., opening a bare python shell and typing `import
# sentry.sentry_metrics.configuration` should work.
#
# If not, the parallel indexer breaks.
from collections.abc import Mapping, MutableMapping
from dataclasses import dataclass
from enum import Enum
from typing import Any
import sentry_sdk
from sentry.conf.types.kafka_definition import Topic
# The maximum length of a column that is indexed in postgres. This value must be
# kept in sync between the consumers and the models defined in
# src/sentry/sentry_metrics/models.py.
MAX_INDEXED_COLUMN_LENGTH = 200
class UseCaseKey(Enum):
    """
    Use cases an ingest configuration can be registered for.

    A member of this enum is the first half of the key under which a
    ``MetricsIngestConfiguration`` is stored (the second half being the
    ``IndexerStorage`` backend).
    """

    RELEASE_HEALTH = "release-health"
    PERFORMANCE = "performance"
# Rate limiter namespaces.
#
# The postgres (PG) values are kept stable for backwards compatibility.
# NOTE(review): "performance" equals UseCaseKey.PERFORMANCE.value, but
# "releasehealth" is not the same string as UseCaseKey.RELEASE_HEALTH.value
# ("release-health"). Presumably it mirrors an earlier value of the enum;
# confirm before touching either side.
RELEASE_HEALTH_PG_NAMESPACE = "releasehealth"
PERFORMANCE_PG_NAMESPACE = "performance"

# NOTE(review): the ".cs" namespaces are not referenced by the code visible in
# this module; presumably they are consumed elsewhere.
RELEASE_HEALTH_CS_NAMESPACE = "releasehealth.cs"
PERFORMANCE_CS_NAMESPACE = "performance.cs"

# Option names that the ingest configurations carry in their
# `schema_validation_rule_option_name` field.
RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME = (
    "sentry-metrics.indexer.release-health.schema-validation-rules"
)
GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME = (
    "sentry-metrics.indexer.generic-metrics.schema-validation-rules"
)
class IndexerStorage(Enum):
    """
    Storage backends an ingest configuration can be registered for.

    ``POSTGRES`` configurations are built from Django settings on first use of
    ``get_ingest_config``; ``MOCK`` configurations are built from fixed
    placeholder values when they are requested.
    """

    POSTGRES = "postgres"
    MOCK = "mock"
@dataclass(frozen=True)
class MetricsIngestConfiguration:
    """
    Immutable bundle of settings for one (use case, storage backend) pair.

    Instances are stored in the module-level registry under the key
    ``(use_case_id, db_backend)`` and handed out by ``get_ingest_config``.
    """

    # Storage backend; second half of the registry key.
    db_backend: IndexerStorage
    # Backend-specific options. Every configuration built in this module
    # passes an empty mapping.
    db_backend_options: Mapping[str, Any]
    # Name of the topic to read from; taken from Django settings for the
    # Postgres configurations.
    input_topic: str
    # Topic to write to.
    output_topic: Topic
    # Use case; first half of the registry key.
    use_case_id: UseCaseKey
    # Value used for the global "pipeline" metrics tag by
    # initialize_main_process_state (an empty string is used when None).
    internal_metrics_tag: str | None
    # Cluster options and namespace for the writes limiter.
    writes_limiter_cluster_options: Mapping[str, Any]
    writes_limiter_namespace: str
    # Cluster options and namespace for the cardinality limiter.
    cardinality_limiter_cluster_options: Mapping[str, Any]
    cardinality_limiter_namespace: str
    # True for the release-health configurations built in this module, False
    # for the performance ones.
    should_index_tag_values: bool
    # Presumably the name of the option holding schema validation rules (see
    # the *_SCHEMA_VALIDATION_RULES_OPTION_NAME constants) -- TODO confirm.
    schema_validation_rule_option_name: str | None = None
    # Only set explicitly for the Postgres performance configuration, from
    # settings.SENTRY_METRICS_INDEXER_ENABLE_SLICED_PRODUCER.
    is_output_sliced: bool | None = False
# Registry of ingest configurations, keyed by (use case, storage backend).
# Starts out empty; entries are added through _register_ingest_config().
_METRICS_INGEST_CONFIG_BY_USE_CASE: MutableMapping[
    tuple[UseCaseKey, IndexerStorage], MetricsIngestConfiguration
] = {}
def _register_ingest_config(config: MetricsIngestConfiguration) -> None:
    """
    Store ``config`` in the module-level registry.

    The entry is keyed by ``(config.use_case_id, config.db_backend)``. An entry
    already present under that key is overwritten.
    """
    registry_key = (config.use_case_id, config.db_backend)
    _METRICS_INGEST_CONFIG_BY_USE_CASE[registry_key] = config
def get_ingest_config(
    use_case_key: UseCaseKey, db_backend: IndexerStorage
) -> MetricsIngestConfiguration:
    """
    Return the ingest configuration registered for a use case and backend.

    While the registry is still empty, the two Postgres-backed configurations
    (release health and performance) are built from Django settings and
    registered. A request for a ``MOCK`` backend registers the matching mock
    configuration on every call, overwriting any previous entry, before the
    lookup is performed.

    Raises ``KeyError`` when nothing is registered for the requested
    ``(use_case_key, db_backend)`` combination.
    """
    requested = (use_case_key, db_backend)

    if not _METRICS_INGEST_CONFIG_BY_USE_CASE:
        # Deferred import: per the note at the top of this file, the module
        # itself has to be importable without initializing Sentry.
        from django.conf import settings

        release_health_postgres = MetricsIngestConfiguration(
            db_backend=IndexerStorage.POSTGRES,
            db_backend_options={},
            input_topic=settings.KAFKA_INGEST_METRICS,
            output_topic=Topic.SNUBA_METRICS,
            use_case_id=UseCaseKey.RELEASE_HEALTH,
            internal_metrics_tag="release-health",
            writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
            writes_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
            cardinality_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_CARDINALITY_LIMITER_OPTIONS,
            cardinality_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
            should_index_tag_values=True,
            schema_validation_rule_option_name=RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME,
        )
        # Registered before the next configuration is built, so it stays in the
        # registry even if building the performance configuration fails.
        _register_ingest_config(release_health_postgres)

        performance_postgres = MetricsIngestConfiguration(
            db_backend=IndexerStorage.POSTGRES,
            db_backend_options={},
            input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
            output_topic=Topic.SNUBA_GENERIC_METRICS,
            use_case_id=UseCaseKey.PERFORMANCE,
            internal_metrics_tag="perf",
            writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
            writes_limiter_namespace=PERFORMANCE_PG_NAMESPACE,
            cardinality_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_CARDINALITY_LIMITER_OPTIONS_PERFORMANCE,
            cardinality_limiter_namespace=PERFORMANCE_PG_NAMESPACE,
            is_output_sliced=settings.SENTRY_METRICS_INDEXER_ENABLE_SLICED_PRODUCER,
            should_index_tag_values=False,
            schema_validation_rule_option_name=GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME,
        )
        _register_ingest_config(performance_postgres)

    # Mock configurations do not read Django settings.
    # NOTE(review): they pass a plain string as `output_topic` although the
    # field is annotated as `Topic`.
    if requested == (UseCaseKey.RELEASE_HEALTH, IndexerStorage.MOCK):
        release_health_mock = MetricsIngestConfiguration(
            db_backend=IndexerStorage.MOCK,
            db_backend_options={},
            input_topic="topic",
            output_topic="output-topic",
            use_case_id=use_case_key,
            internal_metrics_tag="release-health",
            writes_limiter_cluster_options={},
            writes_limiter_namespace="test-namespace-rh",
            cardinality_limiter_cluster_options={},
            cardinality_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
            should_index_tag_values=True,
            schema_validation_rule_option_name=RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME,
        )
        _register_ingest_config(release_health_mock)
    elif requested == (UseCaseKey.PERFORMANCE, IndexerStorage.MOCK):
        performance_mock = MetricsIngestConfiguration(
            db_backend=IndexerStorage.MOCK,
            db_backend_options={},
            input_topic="topic",
            output_topic="output-topic",
            use_case_id=use_case_key,
            internal_metrics_tag="perf",
            writes_limiter_cluster_options={},
            writes_limiter_namespace="test-namespace-perf",
            cardinality_limiter_cluster_options={},
            cardinality_limiter_namespace=PERFORMANCE_PG_NAMESPACE,
            should_index_tag_values=False,
            schema_validation_rule_option_name=GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME,
        )
        _register_ingest_config(performance_mock)

    return _METRICS_INGEST_CONFIG_BY_USE_CASE[requested]
def initialize_subprocess_state(config: MetricsIngestConfiguration) -> None:
    """
    Initialization function for the subprocesses of the metrics indexer.

    It tags the Sentry SDK scope with the use case of ``config``.

    `config` is pickleable, and this module can be imported without any
    upfront initialization of the Django app. As a result, a
    `functools.partial` of this function bound to `config` is pickleable too,
    and that partial is what gets passed to arroyo as the initialization
    callback.

    This function should ideally be kept minimal and not contain too much
    logic. Commonly reusable bits should be added to
    sentry.utils.arroyo.RunTaskWithMultiprocessing.

    We already rely on sentry.utils.arroyo.RunTaskWithMultiprocessing to copy
    statsd tags into the subprocess; eventually the same should be done for
    Sentry tags.
    """
    use_case_value = config.use_case_id.value
    sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case_value)
def initialize_main_process_state(config: MetricsIngestConfiguration) -> None:
    """
    Initialization function for the main process of the metrics indexer.

    Sets global tags for instrumentation: the use case key on the Sentry SDK,
    and a "pipeline" tag (from ``config.internal_metrics_tag``, or an empty
    string when that is unset) for our statsd/metrics usage.
    """
    sentry_sdk.set_tag("sentry_metrics.use_case_key", config.use_case_id.value)

    # Imported inside the function rather than at module level; presumably to
    # keep this module importable without initializing Sentry (see the note at
    # the top of the file).
    from sentry.utils.metrics import add_global_tags

    pipeline_tag = config.internal_metrics_tag or ""
    add_global_tags(_all_threads=True, pipeline=pipeline_tag)