Skip to content

Commit

Permalink
fix(insight): query support for InsightCachingState mechanism (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes committed May 15, 2024
1 parent fc9aea5 commit 5b80840
Show file tree
Hide file tree
Showing 17 changed files with 529 additions and 251 deletions.
247 changes: 112 additions & 135 deletions ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiments.ambr

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,7 @@ export interface BreakdownFilter {
breakdown_hide_other_aggregation?: boolean | null // hides the "other" field for trends
}

// TODO: Rename to `DashboardFilters` for consistency with `HogQLFilters`
export interface DashboardFilter {
date_from?: string | null
date_to?: string | null
Expand Down
63 changes: 20 additions & 43 deletions posthog/api/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from functools import lru_cache
from typing import Any, Optional, Union, cast

from sentry_sdk import capture_exception, set_tag
import structlog
from django.db import transaction
from django.db.models import Count, Prefetch, QuerySet
Expand Down Expand Up @@ -55,11 +54,8 @@
from posthog.hogql.errors import ExposedHogQLError
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.apply_dashboard_filters import WRAPPER_NODE_KINDS
from posthog.hogql_queries.legacy_compatibility.feature_flag import (
hogql_insights_replace_filters,
should_use_hogql_backend_in_insight_serialization,
)
from posthog.hogql_queries.legacy_compatibility.filter_to_query import filter_to_query
from posthog.hogql_queries.legacy_compatibility.feature_flag import hogql_insights_replace_filters
from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import flagged_conversion_to_query_based
from posthog.kafka_client.topics import KAFKA_METRICS_TIME_TO_SEE_DATA
from posthog.models import DashboardTile, Filter, Insight, User
from posthog.models.activity_logging.activity_log import (
Expand Down Expand Up @@ -545,30 +541,11 @@ def to_representation(self, instance: Insight):
def insight_result(self, insight: Insight) -> InsightResult:
from posthog.caching.calculate_results import calculate_for_query_based_insight

dashboard = self.context.get("dashboard", None)
dashboard_tile = self.dashboard_tile_from_context(insight, dashboard)

if insight.query:
try:
return calculate_for_query_based_insight(
insight, dashboard=dashboard, refresh_requested=refresh_requested_by_client(self.context["request"])
)
except ExposedHogQLError as e:
raise ValidationError(str(e))
dashboard: Optional[Dashboard] = self.context.get("dashboard")

if not self.context["request"].user.is_anonymous and should_use_hogql_backend_in_insight_serialization(
self.context["request"].user
):
# TRICKY: As running `filters`-based insights on the HogQL-based engine is a transitional mechanism,
# we fake the insight being properly `query`-based.
# To prevent the lie from accidentally being saved to Postgres, we roll it back in the `finally` branch.
try:
insight.query = filter_to_query(insight.filters).model_dump()
except:
# If `filter_to_query` failed, let's capture this and proceed with legacy filters
set_tag("filter_to_query_todo", True)
capture_exception()
else:
with flagged_conversion_to_query_based(insight):
if insight.query:
# Uses query
try:
return calculate_for_query_based_insight(
insight,
Expand All @@ -577,21 +554,21 @@ def insight_result(self, insight: Insight) -> InsightResult:
)
except ExposedHogQLError as e:
raise ValidationError(str(e))
finally:
insight.query = None

is_shared = self.context.get("is_shared", False)
refresh_insight_now, refresh_frequency = should_refresh_insight(
insight,
dashboard_tile,
request=self.context["request"],
is_shared=is_shared,
)
if refresh_insight_now:
INSIGHT_REFRESH_INITIATED_COUNTER.labels(is_shared=is_shared).inc()
return synchronously_update_cache(insight, dashboard, refresh_frequency=refresh_frequency)
else:
# Uses legacy filters
dashboard_tile = self.dashboard_tile_from_context(insight, dashboard)
is_shared = self.context.get("is_shared", False)
refresh_insight_now, refresh_frequency = should_refresh_insight(
insight,
dashboard_tile,
request=self.context["request"],
is_shared=is_shared,
)
if refresh_insight_now:
INSIGHT_REFRESH_INITIATED_COUNTER.labels(is_shared=is_shared).inc()
return synchronously_update_cache(insight, dashboard, refresh_frequency=refresh_frequency)

return fetch_cached_insight_result(dashboard_tile or insight, refresh_frequency)
return fetch_cached_insight_result(dashboard_tile or insight, refresh_frequency)

@lru_cache(maxsize=1) # each serializer instance should only deal with one insight/tile combo
def dashboard_tile_from_context(self, insight: Insight, dashboard: Optional[Dashboard]) -> Optional[DashboardTile]:
Expand Down
7 changes: 7 additions & 0 deletions posthog/api/services/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from posthog.queries.time_to_see_data.serializers import SessionEventsQuerySerializer, SessionsQuerySerializer
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.schema import (
DashboardFilter,
HogQLAutocomplete,
HogQLMetadata,
QuerySchemaRoot,
Expand All @@ -31,14 +32,17 @@ def process_query(
team: Team,
query_json: dict,
*,
dashboard_filters_json: Optional[dict] = None,
limit_context: Optional[LimitContext] = None,
execution_mode: ExecutionMode = ExecutionMode.RECENT_CACHE_CALCULATE_IF_STALE,
) -> dict:
model = QuerySchemaRoot.model_validate(query_json)
tag_queries(query=query_json)
dashboard_filters = DashboardFilter.model_validate(dashboard_filters_json) if dashboard_filters_json else None
return process_query_model(
team,
model.root,
dashboard_filters=dashboard_filters,
limit_context=limit_context,
execution_mode=execution_mode,
)
Expand All @@ -48,6 +52,7 @@ def process_query_model(
team: Team,
query: BaseModel, # mypy has problems with unions and isinstance
*,
dashboard_filters: Optional[DashboardFilter] = None,
limit_context: Optional[LimitContext] = None,
execution_mode: ExecutionMode = ExecutionMode.RECENT_CACHE_CALCULATE_IF_STALE,
) -> dict:
Expand Down Expand Up @@ -89,6 +94,8 @@ def process_query_model(
else:
raise ValidationError(f"Unsupported query kind: {query.__class__.__name__}")
else: # Query runner available - it will handle execution as well as caching
if dashboard_filters:
query_runner.apply_dashboard_filters(dashboard_filters)
result = query_runner.run(execution_mode=execution_mode)

if isinstance(result, BaseModel):
Expand Down
158 changes: 158 additions & 0 deletions posthog/api/test/test_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from django.test import override_settings
from django.utils import timezone
from freezegun import freeze_time
from posthog.caching.insight_cache import update_cache
from posthog.caching.insight_caching_state import TargetCacheAge
from posthog.hogql.query import execute_hogql_query
from rest_framework import status
from parameterized import parameterized
Expand All @@ -29,6 +31,7 @@
OrganizationMembership,
Text,
)
from posthog.models.insight_caching_state import InsightCachingState
from posthog.schema import (
DataTableNode,
DataVisualizationNode,
Expand Down Expand Up @@ -1309,6 +1312,161 @@ def test_insight_refreshing_query(self, properties_filter, spy_execute_hogql_que
],
)

@patch("posthog.api.insight.synchronously_update_cache", wraps=synchronously_update_cache)
@patch(
    "posthog.caching.insight_caching_state.calculate_target_age_insight",
    # The tested insight normally wouldn't satisfy the criteria for being refreshed in the background,
    # this patch means it will be treated as if it did satisfy them
    return_value=TargetCacheAge.MID_PRIORITY,
)
def test_insight_refreshing_legacy_with_background_update(
    self, spy_synchronously_update_cache, spy_calculate_target_age_insight
) -> None:
    """Legacy (`filters`-based) insight: a background `update_cache` run should refresh the cached
    result, so a later `refresh=false` read is served from cache without another synchronous
    recalculation (`synchronously_update_cache` call count stays at 1).
    """
    # Seed three $pageview events; the one tagged "never_return_this" is excluded
    # by the insight's `is_not` property filter below, leaving 2 matching events.
    with freeze_time("2012-01-14T03:21:34.000Z"):
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="1",
            properties={"prop": "val"},
        )
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="2",
            properties={"prop": "another_val"},
        )
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="2",
            properties={"prop": "val", "another": "never_return_this"},
        )
        flush_persons_and_events()

    with freeze_time("2012-01-15T04:01:34.000Z"):
        # Create the insight via the API so it goes through the serializer under test.
        response = self.client.post(
            f"/api/projects/{self.team.id}/insights",
            data={
                "filters": {
                    "events": [{"id": "$pageview"}],
                    "properties": [
                        {
                            "key": "another",
                            "value": "never_return_this",
                            "operator": "is_not",
                        }
                    ],
                },
            },
        ).json()
        self.assertNotIn("code", response)  # Watching out for an error code
        self.assertEqual(response["last_refresh"], None)
        insight_id = response["id"]

        # First read with refresh=true triggers exactly one synchronous cache update.
        response = self.client.get(f"/api/projects/{self.team.id}/insights/{insight_id}/?refresh=true").json()
        self.assertNotIn("code", response)
        self.assertEqual(spy_synchronously_update_cache.call_count, 1)
        self.assertEqual(response["result"][0]["data"], [0, 0, 0, 0, 0, 0, 2, 0])
        self.assertEqual(response["last_refresh"], "2012-01-15T04:01:34Z")
        self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
        self.assertFalse(response["is_cached"])

    with freeze_time("2012-01-17T05:01:34.000Z"):
        # Simulate the background refresh mechanism recalculating this insight's cache entry.
        update_cache(InsightCachingState.objects.get(insight_id=insight_id).id)

    with freeze_time("2012-01-17T06:01:34.000Z"):
        # refresh=false must now be served from the background-updated cache:
        # no second synchronous update, and `last_refresh` reflects the update_cache run.
        response = self.client.get(f"/api/projects/{self.team.id}/insights/{insight_id}/?refresh=false").json()
        self.assertNotIn("code", response)
        self.assertEqual(spy_synchronously_update_cache.call_count, 1)
        self.assertEqual(response["result"][0]["data"], [0, 0, 0, 0, 2, 0, 0, 0])
        self.assertEqual(response["last_refresh"], "2012-01-17T05:01:34Z")  # Got refreshed with `update_cache`!
        self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
        self.assertTrue(response["is_cached"])

@parameterized.expand(
    [
        [  # Property group filter, which is what's actually used these days
            PropertyGroupFilter(
                type=FilterLogicalOperator.AND,
                values=[
                    PropertyGroupFilterValue(
                        type=FilterLogicalOperator.OR,
                        values=[EventPropertyFilter(key="another", value="never_return_this", operator="is_not")],
                    )
                ],
            )
        ],
        [  # Classic list of filters
            [EventPropertyFilter(key="another", value="never_return_this", operator="is_not")]
        ],
    ]
)
@patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query", wraps=execute_hogql_query)
@patch(
    "posthog.caching.insight_caching_state.calculate_target_age_insight",
    # The tested insight normally wouldn't satisfy the criteria for being refreshed in the background,
    # this patch means it will be treated as if it did satisfy them
    return_value=TargetCacheAge.MID_PRIORITY,
)
def test_insight_refreshing_query_with_background_update(
    self, properties_filter, spy_execute_hogql_query, spy_calculate_target_age_insight
) -> None:
    """Query-based (TrendsQuery) insight: a background `update_cache` run should refresh the
    cached result, so a later `refresh=false` read is served from cache without another HogQL
    execution (`execute_hogql_query` call count stays at 1).

    Parameterized over both supported property-filter shapes (group filter and plain list).
    """
    # Seed three $pageview events; the one tagged "never_return_this" is excluded
    # by `properties_filter`, leaving 2 matching events.
    with freeze_time("2012-01-14T03:21:34.000Z"):
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="1",
            properties={"prop": "val"},
        )
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="2",
            properties={"prop": "another_val"},
        )
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id="2",
            properties={"prop": "val", "another": "never_return_this"},
        )
        flush_persons_and_events()

    query_dict = TrendsQuery(
        series=[
            EventsNode(
                event="$pageview",
            )
        ],
        properties=properties_filter,
    ).model_dump()

    with freeze_time("2012-01-15T04:01:34.000Z"):
        # Create the insight via the API so it goes through the serializer under test.
        response = self.client.post(f"/api/projects/{self.team.id}/insights", data={"query": query_dict}).json()
        self.assertNotIn("code", response)  # Watching out for an error code
        self.assertEqual(response["last_refresh"], None)
        insight_id = response["id"]

        # First read with refresh=true triggers exactly one HogQL query execution.
        response = self.client.get(f"/api/projects/{self.team.id}/insights/{insight_id}/?refresh=true").json()
        self.assertNotIn("code", response)
        self.assertEqual(spy_execute_hogql_query.call_count, 1)
        self.assertEqual(response["result"][0]["data"], [0, 0, 0, 0, 0, 0, 2, 0])
        self.assertEqual(response["last_refresh"], "2012-01-15T04:01:34Z")
        self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
        self.assertFalse(response["is_cached"])

    with freeze_time("2012-01-17T05:01:34.000Z"):
        # Simulate the background refresh mechanism recalculating this insight's cache entry.
        update_cache(InsightCachingState.objects.get(insight_id=insight_id).id)

    with freeze_time("2012-01-17T06:01:34.000Z"):
        # refresh=false must now be served from the background-updated cache:
        # no second HogQL execution, and `last_refresh` reflects the update_cache run.
        response = self.client.get(f"/api/projects/{self.team.id}/insights/{insight_id}/?refresh=false").json()
        self.assertNotIn("code", response)
        self.assertEqual(spy_execute_hogql_query.call_count, 1)
        self.assertEqual(response["result"][0]["data"], [0, 0, 0, 0, 2, 0, 0, 0])
        self.assertEqual(response["last_refresh"], "2012-01-17T05:01:34Z")  # Got refreshed with `update_cache`!
        self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
        self.assertTrue(response["is_cached"])

def test_dashboard_filters_applied_to_sql_data_table_node(self):
dashboard_id, _ = self.dashboard_api.create_dashboard(
{"name": "the dashboard", "filters": {"date_from": "-180d"}}
Expand Down
38 changes: 38 additions & 0 deletions posthog/api/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,44 @@ def test_full_hogql_query_values(self):

self.assertEqual(response.get("results", [])[0][0], 20)

def test_dashboard_filters_applied(self):
    """`process_query` with `dashboard_filters_json` narrows a HogQL query's `{filters}`
    placeholder: without dashboard filters both seeded events are counted, with a
    date_from/date_to window only the event inside the window is.
    """
    random_uuid = f"RANDOM_TEST_ID::{UUIDT()}"
    # One event outside the 2020-01-09..2020-01-11 dashboard date window, one inside it.
    with freeze_time("2020-01-07 12:00:00"):
        _create_event(
            team=self.team,
            event="sign up",
            distinct_id=random_uuid,
            properties={"key": "test_val1"},
        )
    with freeze_time("2020-01-10 15:00:00"):
        _create_event(
            team=self.team,
            event="$pageview",
            distinct_id=random_uuid,
            properties={"key": "test_val1"},
        )
    flush_persons_and_events()

    with freeze_time("2020-01-10 19:00:00"):
        response_without_dashboard_filters = process_query(
            team=self.team,
            query_json={
                "kind": "HogQLQuery",
                "query": "select count() from events where {filters}",
            },
        )
        response_with_dashboard_filters = process_query(
            team=self.team,
            query_json={
                "kind": "HogQLQuery",
                "query": "select count() from events where {filters}",
            },
            dashboard_filters_json={"date_from": "2020-01-09", "date_to": "2020-01-11"},
        )

    # No dashboard filters: both events counted; with the date window: only the 2020-01-10 one.
    self.assertEqual(response_without_dashboard_filters.get("results", []), [(2,)])
    self.assertEqual(response_with_dashboard_filters.get("results", []), [(1,)])


class TestQueryRetrieve(APIBaseTest):
def setUp(self):
Expand Down

0 comments on commit 5b80840

Please sign in to comment.