Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(db): Add TenantID field to KafkaEvent struct #4598

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use crate::{
},
};

#[derive(Clone, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct TenantID(pub String);

#[derive(Clone)]
Expand Down Expand Up @@ -1109,7 +1109,7 @@ impl PaymentAttemptInterface for KafkaStore {

if let Err(er) = self
.kafka_producer
.log_payment_attempt(&attempt, None)
.log_payment_attempt(&attempt, None, TenantID("default".to_string()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.log_payment_attempt(&attempt, None, TenantID("default".to_string()))
.log_payment_attempt(&attempt, None, self.tenant_id)

Can we use the tenant_id stored in KafkaStore struct as such

.await
{
logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er)
Expand Down
110 changes: 69 additions & 41 deletions crates/router/src/services/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::types::storage::Dispute;

// Using message queue result here to avoid confusion with Kafka result provided by library
pub type MQResult<T> = CustomResult<T, KafkaError>;
use crate::db::kafka_store::TenantID;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the line @lsampras


pub trait KafkaMessage
where
Expand All @@ -54,19 +55,22 @@ struct KafkaEvent<'a, T: KafkaMessage> {
#[serde(flatten)]
event: &'a T,
sign_flag: i32,
tenant_id: TenantID,
}

impl<'a, T: KafkaMessage> KafkaEvent<'a, T> {
fn new(event: &'a T) -> Self {
fn new(event: &'a T, tenant_id: TenantID) -> Self {
Self {
event,
sign_flag: 1,
tenant_id,
}
}
fn old(event: &'a T) -> Self {
fn old(event: &'a T, tenant_id: TenantID) -> Self {
Self {
event,
sign_flag: -1,
tenant_id,
}
}
}
Expand Down Expand Up @@ -266,28 +270,32 @@ impl KafkaProducer {
&self,
attempt: &PaymentAttempt,
old_attempt: Option<PaymentAttempt>,
tenant_id: TenantID,
) -> MQResult<()> {
if let Some(negative_event) = old_attempt {
self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage(
&negative_event,
)))
self.log_event(&KafkaEvent::old(
&KafkaPaymentAttempt::from_storage(&negative_event),
tenant_id,
))
.attach_printable_lazy(|| {
format!("Failed to add negative attempt event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(&KafkaPaymentAttempt::from_storage(
attempt,
)))
self.log_event(&KafkaEvent::new(
&KafkaPaymentAttempt::from_storage(attempt),
tenant_id,
))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))
}

pub async fn log_payment_attempt_delete(
&self,
delete_old_attempt: &PaymentAttempt,
) -> MQResult<()> {
self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage(
delete_old_attempt,
)))
self.log_event(&KafkaEvent::old(
&KafkaPaymentAttempt::from_storage(delete_old_attempt),
TenantID("default".to_string()),
))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we follow the same pattern as we did for log_payment_attempt function for every other usage as well?

there shouldn't be any new initializations of TenantID in this PR, since it is already defined in KafkaStore which is what we'll be using instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I this commit solves this issue,
let me know if further changes needed. @lsampras

.attach_printable_lazy(|| {
format!("Failed to add negative attempt event {delete_old_attempt:?}")
})
Expand All @@ -299,46 +307,56 @@ impl KafkaProducer {
old_intent: Option<PaymentIntent>,
) -> MQResult<()> {
if let Some(negative_event) = old_intent {
self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage(
&negative_event,
)))
self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(&negative_event),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative intent event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(&KafkaPaymentIntent::from_storage(intent)))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))
self.log_event(&KafkaEvent::new(
&KafkaPaymentIntent::from_storage(intent),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))
}

pub async fn log_payment_intent_delete(
&self,
delete_old_intent: &PaymentIntent,
) -> MQResult<()> {
self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage(
delete_old_intent,
)))
self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(delete_old_intent),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative intent event {delete_old_intent:?}")
})
}

pub async fn log_refund(&self, refund: &Refund, old_refund: Option<Refund>) -> MQResult<()> {
if let Some(negative_event) = old_refund {
self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage(
&negative_event,
)))
self.log_event(&KafkaEvent::old(
&KafkaRefund::from_storage(&negative_event),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative refund event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(&KafkaRefund::from_storage(refund)))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))
self.log_event(&KafkaEvent::new(
&KafkaRefund::from_storage(refund),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))
}

pub async fn log_refund_delete(&self, delete_old_refund: &Refund) -> MQResult<()> {
self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage(
delete_old_refund,
)))
self.log_event(&KafkaEvent::old(
&KafkaRefund::from_storage(delete_old_refund),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative refund event {delete_old_refund:?}")
})
Expand All @@ -350,15 +368,19 @@ impl KafkaProducer {
old_dispute: Option<Dispute>,
) -> MQResult<()> {
if let Some(negative_event) = old_dispute {
self.log_event(&KafkaEvent::old(&KafkaDispute::from_storage(
&negative_event,
)))
self.log_event(&KafkaEvent::old(
&KafkaDispute::from_storage(&negative_event),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative dispute event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(&KafkaDispute::from_storage(dispute)))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
self.log_event(&KafkaEvent::new(
&KafkaDispute::from_storage(dispute),
TenantID("default".to_string()),
))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
}

#[cfg(feature = "payouts")]
Expand All @@ -368,21 +390,27 @@ impl KafkaProducer {
old_payout: Option<KafkaPayout<'_>>,
) -> MQResult<()> {
if let Some(negative_event) = old_payout {
self.log_event(&KafkaEvent::old(&negative_event))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {negative_event:?}")
})?;
self.log_event(&KafkaEvent::old(
&negative_event,
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(payout))
self.log_event(&KafkaEvent::new(payout, TenantID("default".to_string())))
.attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}"))
}

#[cfg(feature = "payouts")]
pub async fn log_payout_delete(&self, delete_old_payout: &KafkaPayout<'_>) -> MQResult<()> {
self.log_event(&KafkaEvent::old(delete_old_payout))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {delete_old_payout:?}")
})
self.log_event(&KafkaEvent::old(
delete_old_payout,
TenantID("default".to_string()),
))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {delete_old_payout:?}")
})
}

pub fn get_topic(&self, event: EventType) -> &str {
Expand Down