Skip to content

Commit

Permalink
[Metrics V2] Query processor handle no join strategy (#42666)
Browse files Browse the repository at this point in the history
* [Metrics V2] Query processor handle no join strategy

Fixes #42462

Make QP preprocessor handle queries referencing metrics in the style of
legacy metrics. Enforce various compatibility checks.

* Make sure to adjust the actual stages being walked

* Address PR comments

* Add tests for recursion

* Fix e2e tests by setting fields on adjusted metric stage

* Fix more e2e tests by recurring properly
  • Loading branch information
snoe committed May 16, 2024
1 parent 608ac13 commit a4e2903
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 139 deletions.
232 changes: 147 additions & 85 deletions src/metabase/query_processor/middleware/metrics.clj
Original file line number Diff line number Diff line change
@@ -1,120 +1,182 @@
(ns metabase.query-processor.middleware.metrics
(:require
[medley.core :as m]
[metabase.lib.convert :as lib.convert]
[metabase.lib.core :as lib]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.util :as lib.util]
[metabase.lib.util.match :as lib.util.match]
[metabase.lib.walk :as lib.walk]
[metabase.util :as u]
[metabase.util.log :as log]))
[metabase.util.log :as log]
[metabase.util.malli :as mu]))

;; NOTE(review): this span is rendered from a diff (see the `@@` hunk header above it).
;; Lines of two variants of this function are interleaved without +/- markers, so the
;; text below is not a single well-formed form. Presumably the variant keyed by
;; `[join-alias metric-id]` is the removed one and the variant that throws is the added
;; one -- confirm against the repository.
(defn- replace-metric-aggregation-refs [x lookup]
(lib.util.match/replace
x
;; Variant A: matches a `:metric` clause only when its `[join-alias metric-id]` pair is a key of `lookup`.
[:metric & (_ :guard (fn [[{:keys [join-alias]} metric-id]]
(contains? lookup [join-alias metric-id])))]
(let [[_ {:keys [join-alias]} metric-id] &match
{replacement :aggregation metric-name :name} (get lookup [join-alias metric-id])]
;; Variant B: matches every `[:metric _ metric-id]` clause and looks the bare id up in `lookup`.
(lib.util.match/replace x
[:metric _ metric-id]
(if-let [{replacement :aggregation metric-name :name} (get lookup metric-id)]
;; Both variants: take a fresh-uuid copy of the stored aggregation and merge into its
;; element at index 1 a `:name`, then the matched clause's own `:lib/uuid`/`:name`
;; (merged last, so values present on the referencing clause win).
(update (lib.util/fresh-uuids replacement)
1
merge
{:name (u/slugify (or join-alias metric-name))}
(select-keys (get &match 1) [:lib/uuid :name])))))
{:name metric-name}
(select-keys (get &match 1) [:lib/uuid :name]))
;; Variant B only: a metric id that is not a key of `lookup` raises an ex-info.
(throw (ex-info "Incompatible metric" {:match &match
:lookup lookup})))))

(defn- find-metric-ids
  "Scans `x` with `lib.util.match/match` for `[:metric <opts> <id>]` clauses whose
  id is a positive integer, yielding the id of each matching clause."
  [x]
  (lib.util.match/match x
    [:metric _ (metric-id :guard pos-int?)]
    metric-id))

(defn- fetch-referenced-metrics
  "Builds a lookup of the metric cards referenced inside `stage`.

  Each referenced card's `:dataset-query` is turned into a query, preprocessed and
  converted to pMBQL. The result maps card id to
  `{:query <pMBQL query> :aggregation <its first aggregation> :name <card name>}`.
  Returns nil instead of an empty map."
  [query stage]
  (letfn [(card->lookup-entry [card]
            ;; The preprocessor is resolved at call time, per card, exactly where it is needed.
            (let [preprocess (requiring-resolve 'metabase.query-processor.preprocess/preprocess)
                  expanded   (lib.convert/->pMBQL
                              (preprocess (lib/query query (:dataset-query card))))]
              [(:id card)
               {:query       expanded
                :aggregation (first (lib/aggregations expanded))
                :name        (:name card)}]))]
    (let [cards (lib.metadata/bulk-metadata-or-throw query
                                                     :metadata/card
                                                     (find-metric-ids stage))]
      (not-empty (into {} (map card->lookup-entry) cards)))))

(defn- expression-with-name-from-source
  "Reducing step: adds `expression` to stage 0 of `query` via `lib/expression`,
  named by the `:lib/expression-name` found in the clause's second element."
  [query expression]
  (let [clause-opts     (nth expression 1 nil)
        expression-name (:lib/expression-name clause-opts)]
    (lib/expression query 0 expression-name expression)))

(defn- temp-query-at-stage-path
  "With a truthy `stage-path`, returns the `:query` entry of
  `(lib.walk/query-for-stage-at-path query stage-path)`.
  With a nil/false `stage-path`, returns `query` untouched."
  [query stage-path]
  (if stage-path
    (:query (lib.walk/query-for-stage-at-path query stage-path))
    query))

(defn splice-compatible-metrics
  "Splices in metric definitions that are compatible with the query.

  Metrics referenced from the `:aggregation` of the first of `expanded-stages` are
  fetched. When there are none, `expanded-stages` is returned as is. Otherwise each
  metric must share the query's source table and consist of exactly one stage; its
  expressions and filters are added to the query and the `:metric` references in the
  first stage's `:aggregation` are replaced. Returns the `:stages` of the result.

  Throws `ex-info` \"Incompatible metric\" for a metric that fails those checks."
  [query path expanded-stages]
  (let [lookup (fetch-referenced-metrics query (:aggregation (first expanded-stages)))]
    (if-not lookup
      expanded-stages
      (letfn [(splice-metric [acc {metric-query :query}]
                ;; Guard clause: reject metrics on another table or with several stages.
                (when-not (and (= (lib.util/source-table-id acc)
                                  (lib.util/source-table-id metric-query))
                               (= 1 (lib/stage-count metric-query)))
                  (throw (ex-info "Incompatible metric" {:query  acc
                                                         :metric metric-query})))
                (let [{:keys [expressions filters]} (lib.util/query-stage metric-query 0)
                      with-expressions (reduce expression-with-name-from-source acc expressions)
                      with-filters     (reduce lib/filter with-expressions filters)]
                  (update-in with-filters
                             [:stages 0 :aggregation]
                             replace-metric-aggregation-refs
                             lookup)))]
        (:stages (reduce splice-metric
                         (temp-query-at-stage-path query path)
                         (vals lookup)))))))

(defn- find-metric-transition
  "Looks through consecutive pairs of `expanded-stages` for the first stage that
  - is marked with `:qp/stage-is-from-source-card`,
  - is followed by a stage with a different marker (or by no stage at all),
  - belongs to a card whose `:type` is `:metric`, and
  - still carries an `:aggregation`.

  Returns `[stage-index card-metadata]` for that stage, or nil when none qualifies."
  [query expanded-stages]
  (letfn [(transition-at [[[idx stage] [_next-idx next-stage]]]
            (let [card-id (:qp/stage-is-from-source-card stage)
                  card    (when (some? card-id)
                            (lib.metadata/card query card-id))]
              (when (and card-id
                         (not= card-id (:qp/stage-is-from-source-card next-stage))
                         (= :metric (:type card))
                         ;; Metrics must have aggregations, so a stage that still has one
                         ;; has not been processed yet; processing removes it.
                         (:aggregation stage))
                [idx card])))]
    (some transition-at
          (partition-all 2 1 (m/indexed expanded-stages)))))

(defn- update-metric-transition-stages
  "Rewrites the pair of stages at a metric transition found at index `idx`.

  The metric's last stage loses `:aggregation`, `:breakout`, `:order-by`, `:fields`
  and `:lib/stage-metadata`; `:fields` is then set explicitly from the stage's
  fieldable columns so the previous-stage fields are passed on to the following stage.
  In the stage right after it, `[:metric {} id]` clauses are replaced with the
  metric's actual aggregation. Returns the full, updated vector of stages."
  [query stage-path expanded-stages idx metric-metadata]
  (mu/disable-enforcement
    (let [[stages-before [metric-stage next-stage & stages-after]] (split-at idx expanded-stages)
          metric-aggregation (first (:aggregation metric-stage))
          stripped-query     (update-in (temp-query-at-stage-path query stage-path)
                                        [:stages idx]
                                        dissoc
                                        :breakout :order-by :aggregation :fields :lib/stage-metadata)
          ;; Needed for field references to resolve further in the pipeline
          fielded-query      (lib/with-fields stripped-query
                                              idx
                                              (lib/fieldable-columns stripped-query idx))
          rewritten-metric   (lib.util/query-stage fielded-query idx)
          lookup             {(:id metric-metadata) {:name        (:name metric-metadata)
                                                     :aggregation metric-aggregation}}
          rewritten-next     (replace-metric-aggregation-refs next-stage lookup)]
      (into []
            (remove nil?)
            (concat stages-before
                    [rewritten-metric rewritten-next]
                    stages-after)))))

;; NOTE(review): this span is rendered from a diff; lines of two variants of the function
;; are interleaved without +/- markers, so the text is not a single well-formed form.
;; One variant takes `[query expanded-stages metric-ref-lookup]` and recurs with those
;; three arguments; the other takes `[query path expanded-stages]` and recurs with a path.
(defn- adjust-metric-stages
"`expanded-stages` are the result of :stages from fetch-source-query.
All source-card stages have been spliced in already.
`metric-ref-lookup` this is a volatile that holds references to the original aggregation clause (count, sum etc...)
it is used to replace `[:metric {} id]` clauses. This depends on the order of `walk` as each join is touched depth-first,
a ref-lookup will be added for any metrics found during the stage.
To adjust:
We look for the transition between the last stage of a metric and the next stage following it.
We adjust those two stages - as explained in `expand`.
"
;; Parameter vector of the volatile-lookup variant, followed by that of the path-based variant.
[query expanded-stages metric-ref-lookup]
[query path expanded-stages]
;; Find a transition point, if it exists
;; Volatile-lookup variant: the transition search is written inline and yields the index
;; of the stage AFTER the metric stage (`idx-b`).
(let [[idx metric-metadata] (some (fn [[[_idx-a stage-a] [idx-b stage-b]]]
(let [stage-a-source (:qp/stage-is-from-source-card stage-a)
metric-metadata (some->> stage-a-source (lib.metadata/card query))]
(when (and
stage-a-source
(not= stage-a-source (:qp/stage-is-from-source-card stage-b))
(= (:type metric-metadata) :metric)
;; This indicates this stage has not been processed
;; because metrics must have aggregations
;; if it is missing, then it has been removed in this process
(:aggregation stage-a))
[idx-b metric-metadata])))
(partition-all 2 1 (m/indexed expanded-stages)))]
(if idx
(let [[pre-transition-stages following-stages] (split-at idx expanded-stages)
metric-name (:name metric-metadata)
last-metric-stage (last pre-transition-stages)
metric-aggregation (-> last-metric-stage :aggregation first)
new-metric-stage (cond-> last-metric-stage
:always (dissoc :aggregation :fields :lib/stage-metadata)
(seq following-stages) (dissoc :breakout :order-by :limit))
;; Store lookup for metric references created in this set of stages.
;; These will be adjusted later if these stages are in a join
_ (vswap! metric-ref-lookup assoc [nil (:id metric-metadata)] {:name metric-name :aggregation metric-aggregation})
new-following-stages (replace-metric-aggregation-refs
following-stages
@metric-ref-lookup)
combined-stages (vec (remove nil? (concat (butlast pre-transition-stages) [new-metric-stage] new-following-stages)))]
(recur query combined-stages metric-ref-lookup))
;; Path-based variant: the transition is located by `find-metric-transition`, then dispatched on.
(let [[idx metric-metadata] (find-metric-transition query expanded-stages)
[first-stage] expanded-stages]
(cond
;; A transition exists: rewrite it, store the new stages back into `query` under
;; `path`, and loop again to pick up any further transition.
idx
(let [new-stages (update-metric-transition-stages query path expanded-stages idx metric-metadata)]
(recur (assoc-in query (conj path :stages) new-stages) path new-stages))

;; No transition and the first stage has a `:source-table`: splice in compatible metrics.
(:source-table first-stage)
(splice-compatible-metrics query path expanded-stages)

;; Otherwise the stages are returned unchanged.
:else
expanded-stages)))

;; NOTE(review): this span is rendered from a diff; lines of two variants of the function
;; (including two docstrings) are interleaved without +/- markers, so the text is not a
;; single well-formed form. One variant threads a `metric-ref-lookup` volatile through
;; the walk; the other passes each join's `path` to `adjust-metric-stages`.
(defn adjust
"Adjusts the final and following stages of `:source-card` of `:type` `:metric`.
"Looks for `[:metric {} id]` clause references and adjusts the query accordingly.
Expects stages to have been processed by `fetch-source-query/resolve-source-cards`
such that source card stages have been spliced in across the query.
The final stage of metric is adjusted by:
```
:aggregation - always removed
:breakout - removed if there are following-stages
:order-by - removed if there are following-stages
:limit - removed if there are following-stages
:fields - always removed
```
Stages following this, and stages further up the query hierarchy will have
`[:metric {} id]` clauses replaced with the actual aggregation of the metric.
"
Metrics can be referenced in two scenarios:
1. Compatible source table metrics.
Multiple metrics can be referenced in the first stage of a query that references a `:source-table`
Those metrics must:
- Be single stage metrics.
- Have the same `:source-table` as the query
2. Metric source cards can reference themselves.
A query built from a `:source-card` of `:type :metric` can reference itself."
[query]
;; Volatile-lookup variant: joins are walked and, after each join's stages are adjusted,
;; lookup entries without a join alias are re-keyed under that join's `:alias`.
(let [;; Once the stages are processed any ref-lookup missing a join alias must have
;; come from this join's stages, so further references must include the join alias.
metric-ref-lookup (volatile! {})
query (lib.walk/walk
;; Path-based variant: the walk only rewrites joins, handing each join's `path` to
;; `adjust-metric-stages`; for any other path type the callback returns nil.
(let [query (lib.walk/walk
query
(fn [_query path-type _path stage-or-join]
(case path-type
:lib.walk/join
(let [result (update stage-or-join :stages #(adjust-metric-stages query % metric-ref-lookup))]
;; Once the stages are processed any ref-lookup missing a join alias must have
;; come from this join's stages, so further references must include the join alias.
(vswap! metric-ref-lookup #(m/map-kv (fn [[lookup-alias lookup-card] v]
[(if-not lookup-alias
[(:alias stage-or-join) lookup-card]
[lookup-alias lookup-card])
(lib.util.match/replace
v
[:field (_ :guard (complement :join-alias)) _]
(update &match 1 assoc :join-alias (:alias stage-or-join)))])
%))
result)
stage-or-join)))
new-stages (adjust-metric-stages query (:stages query) metric-ref-lookup)]
(fn [_query path-type path stage-or-join]
(when (= path-type :lib.walk/join)
(update stage-or-join :stages #(adjust-metric-stages query path %)))))]
(u/prog1
(replace-metric-aggregation-refs
(assoc query :stages new-stages)
@metric-ref-lookup)
(when-let [match (lib.util.match/match-one <>
[:metric {} _] &match)]
;; Path-based variant: the top-level stages are adjusted with a nil path.
(update query :stages #(adjust-metric-stages query nil %))
;; Both variants: a `:metric` clause still present in the result is only logged as a
;; warning here, not thrown.
(when-let [metric (lib.util.match/match-one <>
[:metric _ _] &match)]
(log/warn "Failed to replace metric"
(pr-str {:match match
:lookup @metric-ref-lookup}))))))
(pr-str {:metric metric}))))))

0 comments on commit a4e2903

Please sign in to comment.