-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(db): Add TenantID field to KafkaEvent struct #4598
Changes from 2 commits
c8a5569
6cc8a60
0d12d38
4aa7bbf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ use crate::types::storage::Dispute; | |
|
||
// Using message queue result here to avoid confusion with Kafka result provided by library | ||
pub type MQResult<T> = CustomResult<T, KafkaError>; | ||
use crate::db::kafka_store::TenantID; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the line @lsampras |
||
|
||
pub trait KafkaMessage | ||
where | ||
|
@@ -54,19 +55,22 @@ struct KafkaEvent<'a, T: KafkaMessage> { | |
#[serde(flatten)] | ||
event: &'a T, | ||
sign_flag: i32, | ||
tenant_id: TenantID, | ||
} | ||
|
||
impl<'a, T: KafkaMessage> KafkaEvent<'a, T> { | ||
fn new(event: &'a T) -> Self { | ||
fn new(event: &'a T, tenant_id: TenantID) -> Self { | ||
Self { | ||
event, | ||
sign_flag: 1, | ||
tenant_id, | ||
} | ||
} | ||
fn old(event: &'a T) -> Self { | ||
fn old(event: &'a T, tenant_id: TenantID) -> Self { | ||
Self { | ||
event, | ||
sign_flag: -1, | ||
tenant_id, | ||
} | ||
} | ||
} | ||
|
@@ -266,28 +270,32 @@ impl KafkaProducer { | |
&self, | ||
attempt: &PaymentAttempt, | ||
old_attempt: Option<PaymentAttempt>, | ||
tenant_id: TenantID, | ||
) -> MQResult<()> { | ||
if let Some(negative_event) = old_attempt { | ||
self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage( | ||
&negative_event, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaPaymentAttempt::from_storage(&negative_event), | ||
tenant_id, | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative attempt event {negative_event:?}") | ||
})?; | ||
}; | ||
self.log_event(&KafkaEvent::new(&KafkaPaymentAttempt::from_storage( | ||
attempt, | ||
))) | ||
self.log_event(&KafkaEvent::new( | ||
&KafkaPaymentAttempt::from_storage(attempt), | ||
tenant_id, | ||
)) | ||
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}")) | ||
} | ||
|
||
pub async fn log_payment_attempt_delete( | ||
&self, | ||
delete_old_attempt: &PaymentAttempt, | ||
) -> MQResult<()> { | ||
self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage( | ||
delete_old_attempt, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaPaymentAttempt::from_storage(delete_old_attempt), | ||
TenantID("default".to_string()), | ||
)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we follow the same pattern as we did for there shouldn't be any new initializations of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I this commit solves this issue, |
||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative attempt event {delete_old_attempt:?}") | ||
}) | ||
|
@@ -299,46 +307,56 @@ impl KafkaProducer { | |
old_intent: Option<PaymentIntent>, | ||
) -> MQResult<()> { | ||
if let Some(negative_event) = old_intent { | ||
self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage( | ||
&negative_event, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaPaymentIntent::from_storage(&negative_event), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative intent event {negative_event:?}") | ||
})?; | ||
}; | ||
self.log_event(&KafkaEvent::new(&KafkaPaymentIntent::from_storage(intent))) | ||
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}")) | ||
self.log_event(&KafkaEvent::new( | ||
&KafkaPaymentIntent::from_storage(intent), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}")) | ||
} | ||
|
||
pub async fn log_payment_intent_delete( | ||
&self, | ||
delete_old_intent: &PaymentIntent, | ||
) -> MQResult<()> { | ||
self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage( | ||
delete_old_intent, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaPaymentIntent::from_storage(delete_old_intent), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative intent event {delete_old_intent:?}") | ||
}) | ||
} | ||
|
||
pub async fn log_refund(&self, refund: &Refund, old_refund: Option<Refund>) -> MQResult<()> { | ||
if let Some(negative_event) = old_refund { | ||
self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage( | ||
&negative_event, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaRefund::from_storage(&negative_event), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative refund event {negative_event:?}") | ||
})?; | ||
}; | ||
self.log_event(&KafkaEvent::new(&KafkaRefund::from_storage(refund))) | ||
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}")) | ||
self.log_event(&KafkaEvent::new( | ||
&KafkaRefund::from_storage(refund), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}")) | ||
} | ||
|
||
pub async fn log_refund_delete(&self, delete_old_refund: &Refund) -> MQResult<()> { | ||
self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage( | ||
delete_old_refund, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaRefund::from_storage(delete_old_refund), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative refund event {delete_old_refund:?}") | ||
}) | ||
|
@@ -350,15 +368,19 @@ impl KafkaProducer { | |
old_dispute: Option<Dispute>, | ||
) -> MQResult<()> { | ||
if let Some(negative_event) = old_dispute { | ||
self.log_event(&KafkaEvent::old(&KafkaDispute::from_storage( | ||
&negative_event, | ||
))) | ||
self.log_event(&KafkaEvent::old( | ||
&KafkaDispute::from_storage(&negative_event), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative dispute event {negative_event:?}") | ||
})?; | ||
}; | ||
self.log_event(&KafkaEvent::new(&KafkaDispute::from_storage(dispute))) | ||
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}")) | ||
self.log_event(&KafkaEvent::new( | ||
&KafkaDispute::from_storage(dispute), | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}")) | ||
} | ||
|
||
#[cfg(feature = "payouts")] | ||
|
@@ -368,21 +390,27 @@ impl KafkaProducer { | |
old_payout: Option<KafkaPayout<'_>>, | ||
) -> MQResult<()> { | ||
if let Some(negative_event) = old_payout { | ||
self.log_event(&KafkaEvent::old(&negative_event)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative payout event {negative_event:?}") | ||
})?; | ||
self.log_event(&KafkaEvent::old( | ||
&negative_event, | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative payout event {negative_event:?}") | ||
})?; | ||
}; | ||
self.log_event(&KafkaEvent::new(payout)) | ||
self.log_event(&KafkaEvent::new(payout, TenantID("default".to_string()))) | ||
.attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}")) | ||
} | ||
|
||
#[cfg(feature = "payouts")] | ||
pub async fn log_payout_delete(&self, delete_old_payout: &KafkaPayout<'_>) -> MQResult<()> { | ||
self.log_event(&KafkaEvent::old(delete_old_payout)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative payout event {delete_old_payout:?}") | ||
}) | ||
self.log_event(&KafkaEvent::old( | ||
delete_old_payout, | ||
TenantID("default".to_string()), | ||
)) | ||
.attach_printable_lazy(|| { | ||
format!("Failed to add negative payout event {delete_old_payout:?}") | ||
}) | ||
} | ||
|
||
pub fn get_topic(&self, event: EventType) -> &str { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the tenant_id stored in
KafkaStore
struct as such