Skip to content

Commit

Permalink
refactor(db): Add TenantID field to KafkaEvent struct (#4598)
Browse files Browse the repository at this point in the history
Co-authored-by: Sampras Lopes <sampras.lopes@juspay.in>
  • Loading branch information
subhajit20 and lsampras committed May 15, 2024
1 parent 0958d94 commit 24214bc
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 53 deletions.
55 changes: 40 additions & 15 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use crate::{
},
};

#[derive(Clone, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct TenantID(pub String);

#[derive(Clone)]
Expand Down Expand Up @@ -413,7 +413,11 @@ impl DisputeInterface for KafkaStore {
) -> CustomResult<storage::Dispute, errors::StorageError> {
let dispute = self.diesel_store.insert_dispute(dispute_new).await?;

if let Err(er) = self.kafka_producer.log_dispute(&dispute, None).await {
if let Err(er) = self
.kafka_producer
.log_dispute(&dispute, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Dispute {dispute:?}", error_message=?er);
};

Expand Down Expand Up @@ -466,7 +470,7 @@ impl DisputeInterface for KafkaStore {
.await?;
if let Err(er) = self
.kafka_producer
.log_dispute(&dispute_new, Some(this))
.log_dispute(&dispute_new, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Dispute {dispute_new:?}", error_message=?er);
Expand Down Expand Up @@ -1109,7 +1113,7 @@ impl PaymentAttemptInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_attempt(&attempt, None)
.log_payment_attempt(&attempt, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er)
Expand All @@ -1131,7 +1135,7 @@ impl PaymentAttemptInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_attempt(&attempt, Some(this))
.log_payment_attempt(&attempt, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er)
Expand Down Expand Up @@ -1311,7 +1315,7 @@ impl PaymentIntentInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_intent(&intent, Some(this))
.log_payment_intent(&intent, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
Expand All @@ -1331,7 +1335,11 @@ impl PaymentIntentInterface for KafkaStore {
.insert_payment_intent(new, storage_scheme)
.await?;

if let Err(er) = self.kafka_producer.log_payment_intent(&intent, None).await {
if let Err(er) = self
.kafka_producer
.log_payment_intent(&intent, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
};

Expand Down Expand Up @@ -1558,6 +1566,7 @@ impl PayoutAttemptInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(payouts, &updated_payout_attempt),
Some(KafkaPayout::from_storage(payouts, this)),
self.tenant_id.clone(),
)
.await
{
Expand All @@ -1582,6 +1591,7 @@ impl PayoutAttemptInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(payouts, &payout_attempt_new),
None,
self.tenant_id.clone(),
)
.await
{
Expand Down Expand Up @@ -1639,6 +1649,7 @@ impl PayoutsInterface for KafkaStore {
.log_payout(
&KafkaPayout::from_storage(&payout, payout_attempt),
Some(KafkaPayout::from_storage(this, payout_attempt)),
self.tenant_id.clone(),
)
.await
{
Expand Down Expand Up @@ -1903,7 +1914,11 @@ impl RefundInterface for KafkaStore {
.update_refund(this.clone(), refund, storage_scheme)
.await?;

if let Err(er) = self.kafka_producer.log_refund(&refund, Some(this)).await {
if let Err(er) = self
.kafka_producer
.log_refund(&refund, Some(this), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to insert analytics event for Refund Update {refund?}", error_message=?er);
}
Ok(refund)
Expand Down Expand Up @@ -1931,7 +1946,11 @@ impl RefundInterface for KafkaStore {
) -> CustomResult<storage::Refund, errors::StorageError> {
let refund = self.diesel_store.insert_refund(new, storage_scheme).await?;

if let Err(er) = self.kafka_producer.log_refund(&refund, None).await {
if let Err(er) = self
.kafka_producer
.log_refund(&refund, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to insert analytics event for Refund Create {refund?}", error_message=?er);
}
Ok(refund)
Expand Down Expand Up @@ -2504,7 +2523,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() {
let _ = self
.kafka_producer
.log_payment_intent(payment_intent, None)
.log_payment_intent(payment_intent, None, self.tenant_id.clone())
.await;
}
Ok(payment_intents_list)
Expand All @@ -2525,7 +2544,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_attempt in payment_attempts_list.iter() {
let _ = self
.kafka_producer
.log_payment_attempt(payment_attempt, None)
.log_payment_attempt(payment_attempt, None, self.tenant_id.clone())
.await;
}
Ok(payment_attempts_list)
Expand All @@ -2542,7 +2561,10 @@ impl BatchSampleDataInterface for KafkaStore {
.await?;

for refund in refunds_list.iter() {
let _ = self.kafka_producer.log_refund(refund, None).await;
let _ = self
.kafka_producer
.log_refund(refund, None, self.tenant_id.clone())
.await;
}
Ok(refunds_list)
}
Expand All @@ -2562,7 +2584,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() {
let _ = self
.kafka_producer
.log_payment_intent_delete(payment_intent)
.log_payment_intent_delete(payment_intent, self.tenant_id.clone())
.await;
}
Ok(payment_intents_list)
Expand All @@ -2583,7 +2605,7 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_attempt in payment_attempts_list.iter() {
let _ = self
.kafka_producer
.log_payment_attempt_delete(payment_attempt)
.log_payment_attempt_delete(payment_attempt, self.tenant_id.clone())
.await;
}

Expand All @@ -2601,7 +2623,10 @@ impl BatchSampleDataInterface for KafkaStore {
.await?;

for refund in refunds_list.iter() {
let _ = self.kafka_producer.log_refund_delete(refund).await;
let _ = self
.kafka_producer
.log_refund_delete(refund, self.tenant_id.clone())
.await;
}

Ok(refunds_list)
Expand Down

0 comments on commit 24214bc

Please sign in to comment.