feat(analytics): org level analytics #4530

Open · wants to merge 17 commits into base: main
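In brief, the PR threads organization-level scoping through the analytics crate: `merchant_id: &str` parameters become `merchant_ids: &[String]`, the single-merchant equality filter becomes an `IN`-range filter, `MerchantId` becomes a payment dimension, and the merchant IDs requested in filters are intersected with the organization's merchants before queries run.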
28 changes: 14 additions & 14 deletions crates/analytics/src/lib.rs
@@ -97,10 +97,10 @@ impl AnalyticsProvider {
         &self,
         metric: &PaymentMetrics,
         dimensions: &[PaymentDimensions],
-        merchant_id: &str,
         filters: &PaymentFilters,
         granularity: &Option<Granularity>,
         time_range: &TimeRange,
+        merchant_ids: &[String],
     ) -> types::MetricsResult<Vec<(PaymentMetricsBucketIdentifier, PaymentMetricRow)>> {
         // Metrics to get the fetch time for each payment metric
         metrics::request::record_operation_time(
@@ -110,44 +110,44 @@ impl AnalyticsProvider {
                 metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         pool,
+                        merchant_ids,
                     )
                     .await
             }
             Self::Clickhouse(pool) => {
                 metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         pool,
+                        merchant_ids,
                     )
                     .await
             }
             Self::CombinedCkh(sqlx_pool, ckh_pool) => {
                 let (ckh_result, sqlx_result) = tokio::join!(metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         ckh_pool,
+                        merchant_ids,
                     ),
                     metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         sqlx_pool,
+                        merchant_ids,
                     ));
                 match (&sqlx_result, &ckh_result) {
                     (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
@@ -163,20 +163,20 @@ impl AnalyticsProvider {
                 let (ckh_result, sqlx_result) = tokio::join!(metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         ckh_pool,
+                        merchant_ids,
                     ),
                     metric
                     .load_metrics(
                         dimensions,
-                        merchant_id,
                         filters,
                         granularity,
                         time_range,
                         sqlx_pool,
+                        merchant_ids,
                     ));
                 match (&sqlx_result, &ckh_result) {
                     (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
@@ -201,7 +201,7 @@ impl AnalyticsProvider {
         &self,
         distribution: &Distribution,
         dimensions: &[PaymentDimensions],
-        merchant_id: &str,
+        merchant_ids: &[String],
         filters: &PaymentFilters,
         granularity: &Option<Granularity>,
         time_range: &TimeRange,
@@ -215,7 +215,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
@@ -228,7 +228,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
@@ -241,7 +241,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
@@ -251,7 +251,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
@@ -272,7 +272,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
@@ -282,7 +282,7 @@ impl AnalyticsProvider {
                     .load_distribution(
                         distribution,
                         dimensions,
-                        merchant_id,
+                        merchant_ids,
                         filters,
                         granularity,
                         time_range,
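For context, the `CombinedCkh`/`CombinedSqlx` arms above run the same query against both backends concurrently and flag any disagreement. A minimal standalone sketch of that dual-read pattern, with hypothetical `fetch_ckh`/`fetch_sqlx` stand-ins for the two pool reads (not the crate's real API):

```rust
// Hypothetical stand-ins for the ClickHouse and SQLx reads.
async fn fetch_ckh() -> Result<Vec<u64>, String> {
    Ok(vec![1, 2, 3])
}

async fn fetch_sqlx() -> Result<Vec<u64>, String> {
    Ok(vec![1, 2, 3])
}

#[tokio::main]
async fn main() {
    // Run both reads concurrently, like the tokio::join! in the diff.
    let (ckh_result, sqlx_result) = tokio::join!(fetch_ckh(), fetch_sqlx());
    // Compare by reference so both results stay usable afterwards.
    if let (Ok(ckh), Ok(sqlx)) = (&ckh_result, &sqlx_result) {
        if ckh != sqlx {
            // The PR records this mismatch rather than failing the request.
            eprintln!("backend mismatch: {ckh:?} vs {sqlx:?}");
        }
    }
    // One backend's result is returned as the source of truth.
    let _ = ckh_result;
}
```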
43 changes: 30 additions & 13 deletions crates/analytics/src/payments/core.rs
@@ -41,17 +41,32 @@ pub enum TaskType {
     ),
 }
 
+fn compare_and_return_matching(org_merchant_ids: &[String], payload: &[String]) -> Vec<String> {
+    let matching_values: Vec<String> = payload
+        .iter()
+        .filter(|i| org_merchant_ids.contains(i))
+        .cloned()
+        .collect();
+
+    if matching_values.is_empty() {
+        org_merchant_ids.to_vec()
+    } else {
+        matching_values
+    }
+}
+
 #[instrument(skip_all)]
 pub async fn get_metrics(
     pool: &AnalyticsProvider,
-    merchant_id: &str,
     req: GetPaymentMetricRequest,
+    merchant_ids: &[String],
 ) -> AnalyticsResult<MetricsResponse<MetricsBucketResponse>> {
+    let org_merchant_ids = compare_and_return_matching(merchant_ids, &req.filters.merchant_id);
     let mut metrics_accumulator: HashMap<
         PaymentMetricsBucketIdentifier,
         PaymentMetricsAccumulator,
     > = HashMap::new();
 
     let mut set = tokio::task::JoinSet::new();
     for metric_type in req.metrics.iter().cloned() {
         let req = req.clone();
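The new `compare_and_return_matching` helper intersects the merchant IDs requested in the filters with the merchants the organization actually owns, and falls back to the full org list when the intersection is empty. A quick self-contained check of that behavior (a hypothetical test, not part of the PR):

```rust
// Same logic as the helper in the diff, duplicated so this runs standalone:
// requested IDs are intersected with the org's merchants; an empty
// intersection falls back to the full org list rather than returning nothing.
fn compare_and_return_matching(org_merchant_ids: &[String], payload: &[String]) -> Vec<String> {
    let matching: Vec<String> = payload
        .iter()
        .filter(|i| org_merchant_ids.contains(i))
        .cloned()
        .collect();
    if matching.is_empty() {
        org_merchant_ids.to_vec()
    } else {
        matching
    }
}

fn main() {
    let org = vec!["m1".to_string(), "m2".to_string()];
    // Overlap: only the requested, authorized merchant is kept.
    assert_eq!(compare_and_return_matching(&org, &["m2".into()]), vec!["m2".to_string()]);
    // No overlap (or no filter at all): fall back to every merchant in the org.
    assert_eq!(compare_and_return_matching(&org, &["other".into()]), org);
    assert_eq!(compare_and_return_matching(&org, &[]), org);
}
```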
@@ -63,17 +78,18 @@ pub async fn get_metrics(
 
         // TODO: lifetime issues with joinset,
         // can be optimized away if joinset lifetime requirements are relaxed
-        let merchant_id_scoped = merchant_id.to_owned();
+
+        let merchant_ids = org_merchant_ids.to_owned();
         set.spawn(
             async move {
                 let data = pool
                     .get_payment_metrics(
                         &metric_type,
                         &req.group_by_names.clone(),
-                        &merchant_id_scoped,
                         &req.filters,
                         &req.time_series.map(|t| t.granularity),
                         &req.time_range,
+                        &merchant_ids.clone(),
                     )
                     .await
                     .change_context(AnalyticsError::UnknownError);
@@ -91,14 +107,14 @@ pub async fn get_metrics(
                 payment_distribution = distribution.distribution_for.as_ref()
             );
 
-            let merchant_id_scoped = merchant_id.to_owned();
+            let merchant_ids = org_merchant_ids.to_owned();
             set.spawn(
                 async move {
                     let data = pool
                         .get_payment_distribution(
                             &distribution,
                             &req.group_by_names.clone(),
-                            &merchant_id_scoped,
+                            &merchant_ids,
                             &req.filters,
                             &req.time_series.map(|t| t.granularity),
                             &req.time_range,
@@ -221,33 +237,33 @@ pub async fn get_metrics(
 pub async fn get_filters(
     pool: &AnalyticsProvider,
     req: GetPaymentFiltersRequest,
-    merchant_id: &String,
+    merchant_ids: &[String],
 ) -> AnalyticsResult<PaymentFiltersResponse> {
     let mut res = PaymentFiltersResponse::default();
 
     for dim in req.group_by_names {
         let values = match pool {
             AnalyticsProvider::Sqlx(pool) => {
-                get_payment_filter_for_dimension(dim, merchant_id, &req.time_range, pool)
+                get_payment_filter_for_dimension(dim, &req.time_range, pool, merchant_ids)
                     .await
             }
             AnalyticsProvider::Clickhouse(pool) => {
-                get_payment_filter_for_dimension(dim, merchant_id, &req.time_range, pool)
+                get_payment_filter_for_dimension(dim, &req.time_range, pool, merchant_ids)
                     .await
             }
             AnalyticsProvider::CombinedCkh(sqlx_poll, ckh_pool) => {
                 let ckh_result = get_payment_filter_for_dimension(
                     dim,
-                    merchant_id,
                     &req.time_range,
                     ckh_pool,
+                    merchant_ids,
                 )
                 .await;
                 let sqlx_result = get_payment_filter_for_dimension(
                     dim,
-                    merchant_id,
                     &req.time_range,
                     sqlx_poll,
+                    merchant_ids,
                 )
                 .await;
                 match (&sqlx_result, &ckh_result) {
@@ -261,16 +277,16 @@ pub async fn get_filters(
             AnalyticsProvider::CombinedSqlx(sqlx_poll, ckh_pool) => {
                 let ckh_result = get_payment_filter_for_dimension(
                     dim,
-                    merchant_id,
                     &req.time_range,
                     ckh_pool,
+                    merchant_ids,
                 )
                 .await;
                 let sqlx_result = get_payment_filter_for_dimension(
                     dim,
-                    merchant_id,
                     &req.time_range,
                     sqlx_poll,
+                    merchant_ids,
                 )
                 .await;
                 match (&sqlx_result, &ckh_result) {
@@ -291,6 +307,7 @@ pub async fn get_filters(
             PaymentDimensions::AuthType => fil.authentication_type.map(|i| i.as_ref().to_string()),
             PaymentDimensions::PaymentMethod => fil.payment_method,
             PaymentDimensions::PaymentMethodType => fil.payment_method_type,
+            PaymentDimensions::MerchantId => fil.merchant_id,
         })
         .collect::<Vec<String>>();
     res.query_data.push(FilterValue {
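For context, `get_metrics` fans out one `JoinSet` task per requested metric and folds results into `metrics_accumulator` as tasks complete; the `to_owned()` copies exist because spawned tasks must be `'static`, as the PR's TODO comment notes. A simplified, self-contained sketch of that fan-out (hypothetical task body, not the crate's real query):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let metrics = vec!["payment_count", "payment_success_rate"];
    let mut set = JoinSet::new();
    for metric in metrics {
        // Owned copy moved into the task: JoinSet tasks must be 'static.
        let metric = metric.to_owned();
        set.spawn(async move { (metric.clone(), format!("rows for {metric}")) });
    }
    // Fold results into an accumulator as tasks finish, in completion order.
    while let Some(joined) = set.join_next().await {
        if let Ok((metric, rows)) = joined {
            println!("{metric}: {rows}");
        }
    }
}
```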
7 changes: 4 additions & 3 deletions crates/analytics/src/payments/distribution.rs
@@ -31,6 +31,7 @@ pub struct PaymentDistributionRow {
     pub start_bucket: Option<PrimitiveDateTime>,
     #[serde(with = "common_utils::custom_serde::iso8601::option")]
     pub end_bucket: Option<PrimitiveDateTime>,
+    pub merchant_id: Option<String>,
 }
 
 pub trait PaymentDistributionAnalytics: LoadRow<PaymentDistributionRow> {}
@@ -45,7 +46,7 @@ where
         &self,
         distribution: &Distribution,
         dimensions: &[PaymentDimensions],
-        merchant_id: &str,
+        merchant_ids: &[String],
         filters: &PaymentFilters,
         granularity: &Option<Granularity>,
         time_range: &TimeRange,
@@ -67,7 +68,7 @@ where
         &self,
         distribution: &Distribution,
         dimensions: &[PaymentDimensions],
-        merchant_id: &str,
+        merchant_ids: &[String],
         filters: &PaymentFilters,
         granularity: &Option<Granularity>,
         time_range: &TimeRange,
@@ -79,7 +80,7 @@ where
             .load_distribution(
                 distribution,
                 dimensions,
-                merchant_id,
+                merchant_ids,
                 filters,
                 granularity,
                 time_range,
@@ -32,7 +32,7 @@ where
         &self,
         distribution: &Distribution,
         dimensions: &[PaymentDimensions],
-        merchant_id: &str,
+        merchant_ids: &[String],
         filters: &PaymentFilters,
         granularity: &Option<Granularity>,
         time_range: &TimeRange,
@@ -70,7 +70,7 @@ where
         filters.set_filter_clause(&mut query_builder).switch()?;
 
         query_builder
-            .add_filter_clause("merchant_id", merchant_id)
+            .add_filter_in_range_clause("merchant_id", merchant_ids)
             .switch()?;
 
         time_range
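The switch from `add_filter_clause` to `add_filter_in_range_clause` is what turns single-merchant scoping into org-level scoping at the query layer. A hypothetical sketch of the difference in the generated predicate (the crate's real `QueryBuilder` is not shown in this diff, so these helpers are illustrative only):

```rust
// Illustrative only: what an equality filter vs. an IN-range filter renders to.
fn filter_clause(column: &str, value: &str) -> String {
    format!("{column} = '{value}'")
}

fn filter_in_range_clause(column: &str, values: &[String]) -> String {
    let quoted: Vec<String> = values.iter().map(|v| format!("'{v}'")).collect();
    format!("{column} IN ({})", quoted.join(", "))
}

fn main() {
    // Before: one merchant, an equality predicate.
    assert_eq!(filter_clause("merchant_id", "m1"), "merchant_id = 'm1'");
    // After: every merchant in the org, an IN predicate.
    assert_eq!(
        filter_in_range_clause("merchant_id", &["m1".into(), "m2".into()]),
        "merchant_id IN ('m1', 'm2')"
    );
}
```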
7 changes: 3 additions & 4 deletions crates/analytics/src/payments/filters.rs
@@ -16,9 +16,9 @@ pub trait PaymentFilterAnalytics: LoadRow<FilterRow> {}
 
 pub async fn get_payment_filter_for_dimension<T>(
     dimension: PaymentDimensions,
-    merchant: &String,
     time_range: &TimeRange,
     pool: &T,
+    merchant_ids: &[String],
 ) -> FiltersResult<Vec<FilterRow>>
 where
     T: AnalyticsDataSource + PaymentFilterAnalytics,
@@ -37,11 +37,9 @@ where
         .switch()?;
 
     query_builder
-        .add_filter_clause("merchant_id", merchant)
+        .add_filter_in_range_clause("merchant_id", merchant_ids)
         .switch()?;
 
-    query_builder.set_distinct();
-
     query_builder
         .execute_query::<FilterRow, _>(pool)
         .await
@@ -57,4 +55,5 @@ pub struct FilterRow {
     pub authentication_type: Option<DBEnumWrapper<AuthenticationType>>,
     pub payment_method: Option<String>,
     pub payment_method_type: Option<String>,
+    pub merchant_id: Option<String>,
 }