Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): Add functions to query metrics meta tables #69803

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
183 changes: 181 additions & 2 deletions src/sentry/snuba/metrics_layer/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@
import logging
from collections.abc import Mapping
from dataclasses import replace
from datetime import datetime
from datetime import UTC, datetime, timedelta
from typing import Any, Union, cast

from snuba_sdk import (
BooleanCondition,
Column,
Condition,
CurriedFunction,
Direction,
Entity,
Formula,
Metric,
MetricsQuery,
Op,
OrderBy,
Query,
Request,
Timeseries,
)
Expand All @@ -22,7 +27,13 @@

from sentry.exceptions import InvalidParams
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.sentry_metrics.utils import resolve_weak, reverse_resolve_weak, string_to_use_case_id
from sentry.sentry_metrics.utils import (
bulk_reverse_resolve,
resolve_many_weak,
resolve_weak,
reverse_resolve_weak,
string_to_use_case_id,
)
from sentry.snuba.dataset import Dataset
from sentry.snuba.metrics.naming_layer.mapping import get_mri
from sentry.snuba.metrics.naming_layer.mri import parse_mri
Expand Down Expand Up @@ -519,3 +530,171 @@ def convert_snuba_result(
if reverse_resolve:
data_point[key] = reverse_resolve
return snuba_result


def fetch_metric_mris(
    org_id: int, project_ids: list[int], use_case_id: UseCaseID, app_id: str = ""
) -> dict[int, list[str]]:
    """
    Fetch all the metric MRIs for a set of projects and use case.

    Queries the meta tables across every metric type and reverse resolves the
    indexed metric IDs back into MRI strings.

    :param org_id: The organization to query.
    :param project_ids: Projects to query; results are grouped by project ID.
    :param use_case_id: The metrics use case to search within.
    :param app_id: Optional app ID for the Snuba requests; when empty the use
        case value is used instead.
    :return: Mapping of project ID -> list of metric MRIs.
    """
    return _query_meta_table(org_id, project_ids, use_case_id, app_id=app_id)


def fetch_metric_tag_keys(
    org_id: int, project_ids: list[int], use_case_id: UseCaseID, mri: str, app_id: str = ""
) -> dict[int, list[str]]:
    """
    Return the tag keys recorded for a single metric MRI, grouped by project.
    Indexed tag keys are reverse resolved back into their string form.
    """
    return _query_meta_table(org_id, project_ids, use_case_id, mri=mri, app_id=app_id)


def _query_meta_table(
    org_id: int,
    project_ids: list[int],
    use_case_id: UseCaseID,
    mri: str | None = None,
    app_id: str = "",
) -> dict[int, list[str]]:
    """
    Helper function for querying the meta tables. This queries across all four
    metric types (counters, sets, gauges, distributions) and reverse resolves
    the resulting indexed values back into strings.

    :param org_id: The organization to query.
    :param project_ids: Projects to query; results are grouped by project ID.
    :param use_case_id: The metrics use case to search within.
    :param mri: If provided, find the unique tag keys for that MRI instead of
        the metric IDs themselves.
    :param app_id: Optional app ID for the Snuba requests; when empty the use
        case value is used instead.
    :return: Mapping of project ID -> list of resolved strings (MRIs, or tag
        keys when ``mri`` is given).
    :raises InvalidParams: If ``mri`` is given but cannot be resolved.
    """

    if mri:
        column_name = "tag_key"
        metric_id = resolve_weak(use_case_id, org_id, mri)
        if metric_id == -1:
            raise InvalidParams(f"Unknown metric: {mri}")
        extra_condition = Condition(Column("metric_id"), Op.EQ, metric_id)
    else:
        column_name = "metric_id"
        extra_condition = None

    # Snapshot the clock once so both timestamp bounds use the same instant.
    now = datetime.now(UTC)
    conditions = [
        Condition(Column("org_id"), Op.EQ, org_id),
        Condition(Column("project_id"), Op.IN, project_ids),
        Condition(Column("use_case_id"), Op.EQ, use_case_id.value),
        Condition(Column("timestamp"), Op.GTE, now - timedelta(days=90)),
        Condition(Column("timestamp"), Op.LT, now + timedelta(days=1)),
    ]
    if extra_condition:
        conditions.append(extra_condition)

    counters_query = (
        Query(Entity("generic_metrics_counters_meta"))
        .set_select([Column("project_id"), Column(column_name)])
        .set_groupby([Column("project_id"), Column(column_name)])
        .set_where(conditions)
        .set_orderby(
            [
                OrderBy(Column("project_id"), Direction.ASC),
                OrderBy(Column(column_name), Direction.ASC),
            ]
        )
        .set_limit(1000)
    )

    def build_request(query: Query) -> Request:
        return Request(
            dataset="generic_metrics",
            app_id=use_case_id.value if app_id == "" else app_id,
            query=query,
            tenant_ids={
                "organization_id": org_id,
                "project_id": project_ids[0],
                "referrer": f"generic_metrics_meta_{column_name}",
            },
        )

    # The counters query is the template; swap the entity for the other types.
    requests = [build_request(counters_query)]
    for mtype in ["sets", "gauges", "distributions"]:
        new_query = counters_query.set_match(Entity(f"generic_metrics_{mtype}_meta"))
        requests.append(build_request(new_query))

    results = bulk_snuba_queries(requests, f"generic_metrics_meta_{column_name}")

    # Reverse resolve all the indexed IDs in one bulk call.
    indexed_ids = []
    for result in results:
        indexed_ids.extend([row[column_name] for row in result["data"]])
    resolved_ids = bulk_reverse_resolve(use_case_id, org_id, indexed_ids)

    # Group by project ID. Note: do not reuse the ``mri`` parameter name for
    # the resolved value here — in the tag-key path it is a tag key, not an MRI.
    grouped_results: dict[int, list[str]] = {}
    for result in results:
        for row in result["data"]:
            resolved = resolved_ids[row[column_name]]
            grouped_results.setdefault(row["project_id"], []).append(resolved)

    return grouped_results


def fetch_metric_tag_values(
    org_id: int,
    project_id: int,
    use_case_id: UseCaseID,
    mri: str,
    tag_key: str,
    tag_value_prefix: str = "",
    app_id: str = "",
) -> list[str]:
    """
    Find all the unique tag values for a given MRI and tag key. This will
    reverse resolve all the values.

    :param org_id: The organization to query.
    :param project_id: The single project to query.
    :param use_case_id: The metrics use case to search within.
    :param mri: The metric MRI whose tag values should be listed.
    :param tag_key: The tag key whose values should be listed.
    :param tag_value_prefix: If non-empty, only return values starting with
        this prefix (implemented as a SQL ``LIKE prefix%``).
    :param app_id: Optional app ID for the Snuba request; when empty the use
        case value is used instead.
    :return: Sorted list of distinct tag values.
    :raises InvalidParams: If the MRI is invalid/unsupported, or the metric or
        tag key cannot be resolved.
    """
    parsed_mri = parse_mri(mri)
    if parsed_mri is None:
        raise InvalidParams(f"'{mri}' is not a valid MRI")

    entity_types = {
        "c": "counters",
        "d": "distributions",
        "g": "gauges",
        "s": "sets",
    }
    entity = entity_types.get(parsed_mri.entity)
    if entity is None:
        # Raise InvalidParams instead of a bare KeyError for MRIs whose type
        # has no meta tag-values table, consistent with the checks above.
        raise InvalidParams(f"Unsupported metric type in MRI: '{mri}'")

    resolved = resolve_many_weak(use_case_id, org_id, [mri, tag_key])
    if len(resolved) != 2:
        raise InvalidParams("Unknown metric or tag key")
    metric_id, tag_key_id = resolved

    # Snapshot the clock once so both timestamp bounds use the same instant.
    now = datetime.now(UTC)
    conditions = [
        Condition(Column("project_id"), Op.EQ, project_id),
        Condition(Column("metric_id"), Op.EQ, metric_id),
        Condition(Column("tag_key"), Op.EQ, tag_key_id),
        Condition(Column("timestamp"), Op.GTE, now - timedelta(days=90)),
        Condition(Column("timestamp"), Op.LT, now + timedelta(days=1)),
    ]

    if tag_value_prefix:
        conditions.append(Condition(Column("tag_value"), Op.LIKE, f"{tag_value_prefix}%"))

    tag_values_query = (
        Query(Entity(f"generic_metrics_{entity}_meta_tag_values"))
        .set_select([Column("tag_value")])
        .set_groupby([Column("tag_value")])
        .set_where(conditions)
        .set_orderby([OrderBy(Column("tag_value"), Direction.ASC)])
        .set_limit(1000)
    )

    request = Request(
        dataset="generic_metrics",
        app_id=use_case_id.value if app_id == "" else app_id,
        query=tag_values_query,
        tenant_ids={
            "organization_id": org_id,
            "project_id": project_id,
            "referrer": "generic_metrics_meta_tag_values",
        },
    )

    results = bulk_snuba_queries([request], "generic_metrics_meta_tag_values")
    values = []
    for result in results:
        values.extend([row["tag_value"] for row in result["data"]])

    return values
96 changes: 95 additions & 1 deletion tests/snuba/test_metrics_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.snuba.metrics.naming_layer import SessionMRI, TransactionMRI
from sentry.snuba.metrics.naming_layer.public import TransactionStatusTagValue, TransactionTagsKey
from sentry.snuba.metrics_layer.query import bulk_run_query, run_query
from sentry.snuba.metrics_layer.query import (
bulk_run_query,
fetch_metric_mris,
fetch_metric_tag_keys,
fetch_metric_tag_values,
run_query,
)
from sentry.testutils.cases import BaseMetricsTestCase, TestCase

pytestmark = pytest.mark.sentry_metrics
Expand Down Expand Up @@ -872,3 +878,91 @@ def test_formulas_with_scalar_formulas(self) -> None:
assert len(result["data"]) == 10
for row in result["data"]:
assert row["aggregate_value"] >= 86400


class MQLMetaTest(TestCase, BaseMetricsTestCase):
    """Integration tests for the metrics meta-table helpers
    (fetch_metric_mris / fetch_metric_tag_keys / fetch_metric_tag_values)."""

    def ts(self, dt: datetime) -> int:
        # Convert a datetime to the integer Unix timestamp store_metric expects.
        return int(dt.timestamp())

    def setUp(self) -> None:
        super().setUp()

        # One metric of each generic-metrics type, keyed by MRI.
        self.generic_metrics: Mapping[str, Literal["counter", "set", "distribution", "gauge"]] = {
            TransactionMRI.DURATION.value: "distribution",
            TransactionMRI.USER.value: "set",
            TransactionMRI.COUNT_PER_ROOT_PROJECT.value: "counter",
            "g:transactions/test_gauge@none": "gauge",
        }
        self.now = datetime.now(tz=timezone.utc).replace(microsecond=0)
        self.hour_ago = self.now - timedelta(hours=1)
        self.org_id = self.project.organization_id
        # Store two data points per metric, alternating the tag values so each
        # of the three tag keys ends up with two distinct values.
        for mri, metric_type in self.generic_metrics.items():
            assert metric_type in {"counter", "distribution", "set", "gauge"}
            for i in range(2):
                value: int | dict[str, int]
                if metric_type == "gauge":
                    # Gauges take a structured value rather than a scalar.
                    value = {
                        "min": i,
                        "max": i,
                        "sum": i,
                        "count": i,
                        "last": i,
                    }
                else:
                    value = i
                self.store_metric(
                    self.org_id,
                    self.project.id,
                    metric_type,
                    mri,
                    {
                        "transaction": f"transaction_{i % 2}",
                        "status_code": "500" if i % 2 == 0 else "200",
                        "device": "BlackBerry" if i % 2 == 0 else "Nokia",
                    },
                    self.ts(self.hour_ago + timedelta(minutes=1 * i)),
                    value,
                    UseCaseID.TRANSACTIONS,
                )

    def test_fetch_metric_mris(self) -> None:
        # All four stored metrics come back, grouped under the single project.
        metric_mris = fetch_metric_mris(self.org_id, [self.project.id], UseCaseID.TRANSACTIONS)
        assert len(metric_mris) == 1
        assert len(metric_mris[self.project.id]) == 4
        assert metric_mris[self.project.id] == [
            "c:transactions/count_per_root_project@none",
            "s:transactions/user@none",
            "g:transactions/test_gauge@none",
            "d:transactions/duration@millisecond",
        ]

    def test_fetch_metric_tag_keys(self) -> None:
        # Every stored data point carries the same three tag keys.
        tag_keys = fetch_metric_tag_keys(
            self.org_id, [self.project.id], UseCaseID.TRANSACTIONS, "g:transactions/test_gauge@none"
        )
        assert len(tag_keys) == 1
        assert len(tag_keys[self.project.id]) == 3
        assert tag_keys[self.project.id] == ["status_code", "device", "transaction"]

    def test_fetch_metric_tag_values(self) -> None:
        # Both distinct "transaction" tag values are returned.
        tag_values = fetch_metric_tag_values(
            self.org_id,
            self.project.id,
            UseCaseID.TRANSACTIONS,
            "g:transactions/test_gauge@none",
            "transaction",
        )
        assert len(tag_values) == 2
        assert tag_values == ["transaction_0", "transaction_1"]

    def test_fetch_metric_tag_values_with_prefix(self) -> None:
        # The prefix filter narrows status_code values to those starting "5".
        tag_values = fetch_metric_tag_values(
            self.org_id,
            self.project.id,
            UseCaseID.TRANSACTIONS,
            "g:transactions/test_gauge@none",
            "status_code",
            "5",
        )
        assert len(tag_values) == 1
        assert tag_values == ["500"]