Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(db): Add TenantID field to KafkaEvent struct #4598

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 41 additions & 17 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use common_enums::enums::MerchantStorageScheme;
use common_utils::{errors::CustomResult, pii};
use diesel_models::{
Expand All @@ -26,6 +24,7 @@ use scheduler::{
SchedulerInterface,
};
use serde::Serialize;
use std::sync::Arc;
use storage_impl::redis::kv_store::RedisConnInterface;
use time::PrimitiveDateTime;

Expand Down Expand Up @@ -76,7 +75,7 @@ use crate::{
},
};

#[derive(Clone, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct TenantID(pub String);

#[derive(Clone)]
Expand Down Expand Up @@ -413,7 +412,11 @@ impl DisputeInterface for KafkaStore {
) -> CustomResult<storage::Dispute, errors::StorageError> {
let dispute = self.diesel_store.insert_dispute(dispute_new).await?;

if let Err(er) = self.kafka_producer.log_dispute(&dispute, None).await {
if let Err(er) = self
.kafka_producer
.log_dispute(&dispute, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Dispute {dispute:?}", error_message=?er);
};

Expand Down Expand Up @@ -466,7 +469,7 @@ impl DisputeInterface for KafkaStore {
.await?;
if let Err(er) = self
.kafka_producer
.log_dispute(&dispute_new, Some(this))
.log_dispute(&dispute_new, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Dispute {dispute_new:?}", error_message=?er);
Expand Down Expand Up @@ -1109,7 +1112,7 @@ impl PaymentAttemptInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_attempt(&attempt, None)
.log_payment_attempt(&attempt, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er)
Expand All @@ -1131,7 +1134,7 @@ impl PaymentAttemptInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_attempt(&attempt, Some(this))
.log_payment_attempt(&attempt, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er)
Expand Down Expand Up @@ -1311,7 +1314,7 @@ impl PaymentIntentInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_intent(&intent, Some(this))
.log_payment_intent(&intent, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
Expand All @@ -1331,7 +1334,11 @@ impl PaymentIntentInterface for KafkaStore {
.insert_payment_intent(new, storage_scheme)
.await?;

if let Err(er) = self.kafka_producer.log_payment_intent(&intent, None).await {
if let Err(er) = self
.kafka_producer
.log_payment_intent(&intent, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
};

Expand Down Expand Up @@ -1558,6 +1565,7 @@ impl PayoutAttemptInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(payouts, &updated_payout_attempt),
Some(KafkaPayout::from_storage(payouts, this)),
self.tenant_id.clone(),
)
.await
{
Expand All @@ -1582,6 +1590,7 @@ impl PayoutAttemptInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(payouts, &payout_attempt_new),
None,
self.tenant_id.clone(),
)
.await
{
Expand Down Expand Up @@ -1639,6 +1648,7 @@ impl PayoutsInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(&payout, payout_attempt),
Some(KafkaPayout::from_storage(this, payout_attempt)),
self.tenant_id.clone(),
)
.await
{
Expand Down Expand Up @@ -1903,7 +1913,11 @@ impl RefundInterface for KafkaStore {
.update_refund(this.clone(), refund, storage_scheme)
.await?;

if let Err(er) = self.kafka_producer.log_refund(&refund, Some(this)).await {
if let Err(er) = self
.kafka_producer
.log_refund(&refund, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to insert analytics event for Refund Update {refund?}", error_message=?er);
}
Ok(refund)
Expand Down Expand Up @@ -1931,7 +1945,11 @@ impl RefundInterface for KafkaStore {
) -> CustomResult<storage::Refund, errors::StorageError> {
let refund = self.diesel_store.insert_refund(new, storage_scheme).await?;

if let Err(er) = self.kafka_producer.log_refund(&refund, None).await {
if let Err(er) = self
.kafka_producer
.log_refund(&refund, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to insert analytics event for Refund Create {refund?}", error_message=?er);
}
Ok(refund)
Expand Down Expand Up @@ -2504,7 +2522,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() {
let _ = self
.kafka_producer
.log_payment_intent(payment_intent, None)
.log_payment_intent(payment_intent, None, self.tenant_id.clone())
.await;
}
Ok(payment_intents_list)
Expand All @@ -2525,7 +2543,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_attempt in payment_attempts_list.iter() {
let _ = self
.kafka_producer
.log_payment_attempt(payment_attempt, None)
.log_payment_attempt(payment_attempt, None, self.tenant_id.clone())
.await;
}
Ok(payment_attempts_list)
Expand All @@ -2542,7 +2560,10 @@ impl BatchSampleDataInterface for KafkaStore {
.await?;

for refund in refunds_list.iter() {
let _ = self.kafka_producer.log_refund(refund, None).await;
let _ = self
.kafka_producer
.log_refund(refund, None, self.tenant_id.clone())
.await;
}
Ok(refunds_list)
}
Expand All @@ -2562,7 +2583,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() {
let _ = self
.kafka_producer
.log_payment_intent_delete(payment_intent)
.log_payment_intent_delete(payment_intent, self.tenant_id.clone())
.await;
}
Ok(payment_intents_list)
Expand All @@ -2583,7 +2604,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_attempt in payment_attempts_list.iter() {
let _ = self
.kafka_producer
.log_payment_attempt_delete(payment_attempt)
.log_payment_attempt_delete(payment_attempt, self.tenant_id.clone())
.await;
}

Expand All @@ -2601,7 +2622,10 @@ impl BatchSampleDataInterface for KafkaStore {
.await?;

for refund in refunds_list.iter() {
let _ = self.kafka_producer.log_refund_delete(refund).await;
let _ = self
.kafka_producer
.log_refund_delete(refund, self.tenant_id.clone())
.await;
}

Ok(refunds_list)
Expand Down