Skip to content

Commit

Permalink
[Metrics V2] Query processor handle no join strategy
Browse files Browse the repository at this point in the history
Fixes #42462

Make QP preprocessor handle queries referencing metrics in the style of
legacy metrics. Enforce various compatibility checks.
  • Loading branch information
snoe committed May 14, 2024
1 parent e1a7fba commit 725bd48
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 122 deletions.
197 changes: 124 additions & 73 deletions src/metabase/query_processor/middleware/metrics.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(ns metabase.query-processor.middleware.metrics
(:require
[medley.core :as m]
[metabase.lib.convert :as lib.convert]
[metabase.lib.core :as lib]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.util :as lib.util]
[metabase.lib.util.match :as lib.util.match]
Expand All @@ -10,33 +12,68 @@

(defn- replace-metric-aggregation-refs [x lookup]
(lib.util.match/replace
x
[:metric & (_ :guard (fn [[{:keys [join-alias]} metric-id]]
(contains? lookup [join-alias metric-id])))]
(let [[_ {:keys [join-alias]} metric-id] &match
{replacement :aggregation metric-name :name} (get lookup [join-alias metric-id])]
x
[:metric _ (metric-id :guard #(contains? lookup %))]
(let [{replacement :aggregation metric-name :name} (get lookup metric-id)]
(update (lib.util/fresh-uuids replacement)
1
merge
{:name (u/slugify (or join-alias metric-name))}
(select-keys (get &match 1) [:lib/uuid :name])))))
{:name metric-name}
(select-keys (get &match 1) [:lib/uuid :name])))
[:metric _ (metric-id :guard #(not (contains? lookup %)))]
(throw (ex-info "Incompatible metric" {:match &match
:lookup lookup}))))

(defn- adjust-metric-stages
"`expanded-stages` are the result of :stages from fetch-source-query.
All source-card stages have been spliced in already.
(defn- find-metric-ids
[x]
(lib.util.match/match x
[:metric _ (id :guard pos-int?)]
id))

`metric-ref-lookup` this is a volatile that holds references to the original aggragation clause (count, sum etc...)
it is used to replace `[:metric {} id]` clauses. This depends on the order of `walk` as each join is touched depth-first,
a ref-lookup will be added for any metrics found during the stage.
(defn- fetch-referenced-metrics
[query stage]
(let [metric-ids (find-metric-ids stage)]
(->> metric-ids
(lib.metadata/bulk-metadata-or-throw query :metadata/card)
(into {}
(map (juxt :id
(fn [card-metadata]
(let [metric-query (lib.convert/->pMBQL
((requiring-resolve 'metabase.query-processor.preprocess/preprocess)
(lib/query query (:dataset-query card-metadata))))]
{:query metric-query
:aggregation (first (lib/aggregations metric-query))
:name (:name card-metadata)})))))
not-empty)))

To adjust:
(defn- expression-with-name-from-source
[query [_ {:lib/keys [expression-name]} :as expression]]
(lib/expression query 0 expression-name expression))

We look for the transition between the last stage of a metric and the next stage following it.
We adjust those two stages - as explained in `expand`.
"
[query expanded-stages metric-ref-lookup]
;; Find a transition point, if it exists
(let [[idx metric-metadata] (some (fn [[[_idx-a stage-a] [idx-b stage-b]]]
(defn splice-compatible-metrics
"Splices in metric definitions that are compatible with the query."
[query]
(if-let [lookup (fetch-referenced-metrics query (lib/aggregations query 0))]
(let [new-query (reduce
(fn [query [_metric-id {metric-query :query}]]
(if (and (= (lib.util/source-table-id query) (lib.util/source-table-id metric-query))
(= 1 (lib/stage-count metric-query)))
(let [{:keys [expressions filters]} (lib.util/query-stage metric-query 0)]
(as-> query $q
(reduce expression-with-name-from-source $q expressions)
(reduce lib/filter $q filters)
(update-in $q [:stages 0 :aggregation] replace-metric-aggregation-refs lookup)))
(throw (ex-info "Incompatible metric" {:query query
:metric metric-query}))))
query
lookup)]
(:stages new-query))
(:stages query)))

(defn- find-metric-transition
"Finds an unadjusted transition between a metric source-card and the next stage."
[query expanded-stages]
(some (fn [[[idx-a stage-a] [_idx-b stage-b]]]
(let [stage-a-source (:qp/stage-is-from-source-card stage-a)
metric-metadata (some->> stage-a-source (lib.metadata/card query))]
(when (and
Expand All @@ -47,74 +84,88 @@
;; because metrics must have aggregations
;; if it is missing, then it has been removed in this process
(:aggregation stage-a))
[idx-b metric-metadata])))
(partition-all 2 1 (m/indexed expanded-stages)))]
(if idx
(let [[pre-transition-stages following-stages] (split-at idx expanded-stages)
metric-name (:name metric-metadata)
last-metric-stage (last pre-transition-stages)
metric-aggregation (-> last-metric-stage :aggregation first)
new-metric-stage (cond-> last-metric-stage
:always (dissoc :aggregation :fields :lib/stage-metadata)
(seq following-stages) (dissoc :breakout :order-by :limit))
;; Store lookup for metric references created in this set of stages.
;; These will be adjusted later if these stages are in a join
_ (vswap! metric-ref-lookup assoc [nil (:id metric-metadata)] {:name metric-name :aggregation metric-aggregation})
new-following-stages (replace-metric-aggregation-refs
following-stages
@metric-ref-lookup)
combined-stages (vec (remove nil? (concat (butlast pre-transition-stages) [new-metric-stage] new-following-stages)))]
(recur query combined-stages metric-ref-lookup))
expanded-stages)))

(defn adjust
"Adjusts the final and following stages of `:source-card` of `:type` `:metric`.
[idx-a metric-metadata])))
(partition-all 2 1 (m/indexed expanded-stages))))

Expects stages to have been processed by `fetch-source-query/resolve-source-cards`
such that source card stages have been spliced in across the query.
(defn- update-metric-transition-stages
"Adjusts source-card metrics referencing themselves in the next stage.
The final stage of metric is adjusted by:
The final stage of the metric is adjusted by:
```
:aggregation - always removed
:breakout - removed if there are following-stages
:order-by - removed if there are following-stages
:limit - removed if there are following-stages
:fields - always removed
```
Stages following this, and stages further up the query hierarchy will have
`[:metric {} id]` clauses replaced with the actual aggregation of the metric.
The following stages will have `[:metric {} id]` clauses
replaced with the actual aggregation of the metric."
[expanded-stages idx metric-metadata]
(let [[pre-transition-stages [last-metric-stage following-stage & following-stages]] (split-at idx expanded-stages)
metric-name (:name metric-metadata)
metric-aggregation (-> last-metric-stage :aggregation first)
new-metric-stage (cond-> last-metric-stage
:always (dissoc :aggregation :fields :lib/stage-metadata)
following-stage (dissoc :breakout :order-by))
lookup {(:id metric-metadata)
{:name metric-name :aggregation metric-aggregation}}
new-following-stage (replace-metric-aggregation-refs
following-stage
lookup)
combined-stages (vec (remove nil? (concat pre-transition-stages
[new-metric-stage new-following-stage]
following-stages)))]
combined-stages))

(defn- adjust-metric-stages
"`expanded-stages` are the result of :stages from fetch-source-query.
All source-card stages have been spliced in already.
To adjust:
We look for the transition between the last stage of a metric and the next stage following it.
We adjust those two stages - as explained in `expand`.
"
[query expanded-stages]
;; Find a transition point, if it exists
(let [[idx metric-metadata] (find-metric-transition query expanded-stages)
[first-stage] expanded-stages]
(cond
idx
(recur query (update-metric-transition-stages expanded-stages idx metric-metadata))

(:source-table first-stage)
(splice-compatible-metrics query)

:else
expanded-stages)))

(defn adjust
"Looks for `[:metric {} id]` clause references and adjusts the query accordingly.
Expects stages to have been processed by `fetch-source-query/resolve-source-cards`
such that source card stages have been spliced in across the query.
Metrics can be referenced in two scenarios:
1. Compatible source table metrics.
Multiple metrics can be referenced in the first stage of a query that references a `:source-table`
Those metrics must:
- Be single stage metrics.
- Have the same `:source-table` as the query
2. Metric source cards can reference themselves.
A query built from a `:source-card` of `:type :metric` can reference itself."
[query]
(let [;; Once the stages are processed any ref-lookup missing a join alias must have
;; come from this join's stages, so further references must include the join alias.
metric-ref-lookup (volatile! {})
query (lib.walk/walk
(let [query (lib.walk/walk
query
(fn [_query path-type _path stage-or-join]
(case path-type
:lib.walk/join
(let [result (update stage-or-join :stages #(adjust-metric-stages query % metric-ref-lookup))]
;; Once the stages are processed any ref-lookup missing a join alias must have
;; come from this join's stages, so further references must include the join alias.
(vswap! metric-ref-lookup #(m/map-kv (fn [[lookup-alias lookup-card] v]
[(if-not lookup-alias
[(:alias stage-or-join) lookup-card]
[lookup-alias lookup-card])
(lib.util.match/replace
v
[:field (_ :guard (complement :join-alias)) _]
(update &match 1 assoc :join-alias (:alias stage-or-join)))])
%))
result)
(update stage-or-join :stages #(adjust-metric-stages query %))
stage-or-join)))
new-stages (adjust-metric-stages query (:stages query) metric-ref-lookup)]
new-stages (adjust-metric-stages query (:stages query))]
(u/prog1
(replace-metric-aggregation-refs
(assoc query :stages new-stages)
@metric-ref-lookup)
(when-let [match (lib.util.match/match-one <>
[:metric {} _] &match)]
(assoc query :stages new-stages)
(when-let [metric (lib.util.match/match-one <>
[:metric _ _] &match)]
(log/warn "Failed to replace metric"
(pr-str {:match match
:lookup @metric-ref-lookup}))))))
(pr-str {:metric metric}))))))
92 changes: 43 additions & 49 deletions test/metabase/query_processor/middleware/metrics_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,50 @@
metadata-provider
(lib.tu/mock-metadata-provider
{:cards [metric]}))])))

(def adjust
(comp metrics/adjust fetch-source-query/resolve-source-cards))
(comp #'metrics/adjust #'fetch-source-query/resolve-source-cards))

(deftest ^:parallel adjust-basic-source-table-test
(let [[source-metric mp] (mock-metric)
query (-> (lib/query mp (meta/table-metadata :products))
(lib/aggregate (lib.metadata/metric mp (:id source-metric))))]
(is (=? {:stages [{:source-table (meta/id :products)
:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]}
(adjust query)))))

(deftest ^:parallel adjust-basic-source-metric-test
(let [[source-metric mp] (mock-metric)
query (lib/query mp source-metric)]
(is (=?
{:stages [{:source-table (meta/id :products)}
{:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]}
(adjust query)))))

(deftest ^:parallel adjust-aggregation-metric-ref-test
(let [[source-metric mp] (mock-metric)
query (-> (lib/query mp source-metric)
query (-> (lib/query mp (meta/table-metadata :products))
(lib/aggregate (lib/+ (lib.options/ensure-uuid [:metric {} (:id source-metric)]) 1)))]
(is (=?
{:stages [{:source-table (meta/id :products)}
{:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]
[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]}]}
{:stages [{:source-table (meta/id :products)
:aggregation [[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]}]}
(adjust query)))))

(deftest ^:parallel adjust-aggregation-metric-ordering-test
(let [[source-metric mp] (mock-metric)
query (-> (lib/query mp source-metric)
query (-> (lib/query mp (meta/table-metadata :products))
(lib/aggregate (lib/+ (lib.options/ensure-uuid [:metric {} (:id source-metric)]) 1)))]
(doseq [agg-ref (map lib.options/uuid (lib/aggregations query))
:let [ordered (lib/order-by query (lib.options/ensure-uuid [:aggregation {} agg-ref]))
adjusted (adjust ordered)]]
(is (=? {:stages
[{:source-table (meta/id :products)}
{:aggregation
[[:avg {} [:field {} (meta/id :products :rating)]]
[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]
[{:source-table (meta/id :products)
:aggregation
[[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]
:order-by
[[:asc {} [:aggregation {} agg-ref]]]}]}
adjusted)))))

(deftest ^:parallel adjust-basic-test
(let [[source-metric mp] (mock-metric)
query (-> (lib/query mp source-metric))]
(is (=?
{:stages [{:source-table (meta/id :products) :aggregation complement}
{:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]}
(adjust query)))))

(deftest ^:parallel adjust-join-test
(let [[source-metric mp] (mock-metric)
query (-> (lib/query mp source-metric)
Expand Down Expand Up @@ -113,22 +120,6 @@
{:filters [[:= {} [:field {} (meta/id :products :category)] "Widget"]]}]}
(adjust query)))))

(deftest ^:parallel adjust-multi-metric-test
(let [[first-metric mp] (mock-metric (lib/filter (basic-metric-query) (lib/> (meta/field-metadata :products :price) 1)))
[second-metric mp] (mock-metric mp (-> (lib/query mp first-metric)
(lib/filter (lib/< (meta/field-metadata :products :price) 100))))
query (-> (lib/query mp second-metric)
(lib/filter (lib/= (meta/field-metadata :products :category) "Widget")))]
(is (=?
{:stages [{:source-table (meta/id :products)
:filters [[:> {} [:field {} (meta/id :products :price)] 1]]
:aggregation complement}
{:filters [[:< {} [:field {} (meta/id :products :price)] 100]]
:aggregation complement}
{:filters [[:= {} [:field {} (meta/id :products :category)] "Widget"]]
:aggregation some?}]}
(adjust query)))))

(deftest ^:parallel adjust-mixed-multi-source-test
(let [[first-metric mp] (mock-metric lib.tu/metadata-provider-with-mock-cards
(-> (lib/query lib.tu/metadata-provider-with-mock-cards (:products lib.tu/mock-cards))
Expand All @@ -148,21 +139,7 @@
:aggregation some?}]}
(adjust query)))))

(deftest ^:parallel adjust-joined-metric-test
(let [[source-metric mp] (mock-metric)
query (as-> (lib/query mp (meta/table-metadata :orders)) $q
(lib/join $q (lib/join-clause (lib.metadata/card mp (:id source-metric))
[(lib/= (meta/field-metadata :orders :product-id)
(meta/field-metadata :products :id))]))
(lib/aggregate $q (lib.options/ensure-uuid [:metric {:join-alias "Mock metric - Product"} (:id source-metric)])))]
(is (=?
;; joins get an extra, empty stage from 'fetch-source-query'
{:stages [{:joins [{:stages [{:source-table (meta/id :products)} {}]}]
:aggregation [[:avg {:name "mock_metric___product"}
[:field {} (meta/id :products :rating)]]]}]}
(adjust query)))))

(deftest ^:parallel e2e-results-test
(deftest ^:parallel e2e-source-metric-results-test
(let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id))
source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products)))
(lib/filter (lib/< (lib.metadata/field mp (mt/id :products :price)) 3))
Expand All @@ -176,6 +153,23 @@
(mt/rows (qp/process-query source-query))
(mt/rows (qp/process-query query))))))))

(deftest ^:parallel e2e-source-table-results-test
(let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id))
source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products)))
(lib/filter (lib/< (lib.metadata/field mp (mt/id :products :price)) 30))
(lib/aggregate (lib/avg (lib.metadata/field mp (mt/id :products :rating)))))]
(mt/with-temp [:model/Card source-metric {:dataset_query (lib.convert/->legacy-MBQL source-query)
:database_id (mt/id)
:name "new_metric"
:type :metric}]
(let [query (-> (lib/query mp (lib.metadata/table mp (mt/id :products)))
(lib/filter (lib/< (lib.metadata/field mp (mt/id :products :rating)) 3))
(lib/aggregate (lib.metadata/metric mp (:id source-metric))))]
(is (=
(mt/rows (qp/process-query (-> source-query
(lib/filter (lib/< (lib.metadata/field mp (mt/id :products :rating)) 3)))))
(mt/rows (qp/process-query query))))))))

(deftest ^:parallel e2e-source-card-test
(let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id))
source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products)))
Expand Down

0 comments on commit 725bd48

Please sign in to comment.