Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connector_token): Move config redis #4540

Merged
merged 9 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/diesel_models/src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,13 @@ impl From<ConfigUpdate> for ConfigUpdateInternal {
}
}
}

impl From<ConfigNew> for Config {
fn from(config_new: ConfigNew) -> Self {
Self {
id: 0i32,
key: config_new.key,
config: config_new.config,
}
}
}
2 changes: 2 additions & 0 deletions crates/router/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ pub const POLL_ID_TTL: i64 = 900;
// Default Poll Config
pub const DEFAULT_POLL_DELAY_IN_SECS: i8 = 2;
pub const DEFAULT_POLL_FREQUENCY: i8 = 5;

pub const CONNECTOR_CREDS_TOKEN_TTL: i64 = 900;
90 changes: 65 additions & 25 deletions crates/router/src/core/payments/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use common_utils::{
use diesel_models::enums;
// TODO : Evaluate all the helper functions ()
use error_stack::{report, ResultExt};
use futures::future::Either;
use hyperswitch_domain_models::{
mandates::MandateData,
payments::{payment_attempt::PaymentAttempt, PaymentIntent},
Expand Down Expand Up @@ -3077,27 +3078,31 @@ pub async fn insert_merchant_connector_creds_to_config(
merchant_connector_details: admin::MerchantConnectorDetailsWrap,
) -> RouterResult<()> {
if let Some(encoded_data) = merchant_connector_details.encoded_data {
match db
.insert_config(storage::ConfigNew {
key: format!(
"mcd_{merchant_id}_{}",
merchant_connector_details.creds_identifier
),
config: encoded_data.peek().to_owned(),
})
let redis = &db
.get_redis_conn()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get redis connection")?;

let key = format!(
"mcd_{merchant_id}_{}",
merchant_connector_details.creds_identifier
);

redis
.serialize_and_set_key_with_expiry(
key.as_str(),
&encoded_data.peek(),
crate::consts::CONNECTOR_CREDS_TOKEN_TTL,
)
.await
{
Ok(_) => Ok(()),
Err(err) => {
if err.current_context().is_db_unique_violation() {
Ok(())
} else {
Err(err
.map_or_else(
|e| {
Err(e
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to insert connector_creds to config"))
}
}
}
},
|_| Ok(()),
)
} else {
Ok(())
}
Expand Down Expand Up @@ -3170,14 +3175,49 @@ pub async fn get_merchant_connector_account(
let db = &*state.store;
match creds_identifier {
Some(creds_identifier) => {
let mca_config = db
.find_config_by_key(format!("mcd_{merchant_id}_{creds_identifier}").as_str())
let key = format!("mcd_{merchant_id}_{creds_identifier}");
let redis_fetch = || async {
db.get_redis_conn()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get redis connection")
.async_and_then(|redis| async move {
redis
.get_and_deserialize_key(key.as_str(), "String")
.await
.change_context(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: key.clone(),
},
)
.attach_printable(key + ": Not found in Redis")
})
.await
};

let db_fetch = || async {
db.find_config_by_key(format!("mcd_{merchant_id}_{creds_identifier}").as_str())
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: format!("mcd_{merchant_id}_{creds_identifier}"),
},
)
};

let mca_config: String = redis_fetch()
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: format!("mcd_{merchant_id}_{creds_identifier}"),
.map_or_else(
|_| {
Either::Left(async {
match db_fetch().await {
Ok(config_entry) => Ok(config_entry.config),
Err(e) => Err(e),
}
})
},
)?;
|result| Either::Right(async { Ok(result) }),
)
.await?;

let private_key = state
.conf
Expand All @@ -3187,7 +3227,7 @@ pub async fn get_merchant_connector_account(
.peek()
.as_bytes();

let decrypted_mca = services::decrypt_jwe(mca_config.config.as_str(), services::KeyIdCheck::SkipKeyIdCheck, private_key, jwe::RSA_OAEP_256)
let decrypted_mca = services::decrypt_jwe(mca_config.as_str(), services::KeyIdCheck::SkipKeyIdCheck, private_key, jwe::RSA_OAEP_256)
.await
.change_context(errors::ApiErrorResponse::UnprocessableEntity{
message: "decoding merchant_connector_details failed due to invalid data format!".into()})
Expand Down
27 changes: 17 additions & 10 deletions crates/router/src/db/configs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use common_utils::ext_traits::AsyncExt;
use diesel_models::configs::ConfigUpdateInternal;
use error_stack::{report, ResultExt};
use router_env::{instrument, tracing};
Expand Down Expand Up @@ -65,10 +64,21 @@ impl ConfigInterface for Store {
config: storage::ConfigNew,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
config
let inserted = config
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
.map_err(|error| report!(errors::StorageError::from(error)))?;

self.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(
consts::PUB_SUB_CHANNEL,
CacheKind::Config((&inserted.key).into()),
)
.await
.map_err(Into::<errors::StorageError>::into)?;

Ok(inserted)
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -126,7 +136,7 @@ impl ConfigInterface for Store {
async fn find_config_by_key_unwrap_or(
&self,
key: &str,
// If the config is not found it will be created with the default value.
// If the config is not found it will be cached with the default value.
default_config: Option<String>,
) -> CustomResult<storage::Config, errors::StorageError> {
let find_else_unwrap_or = || async {
Expand All @@ -139,17 +149,14 @@ impl ConfigInterface for Store {
Err(err) => {
if err.current_context().is_db_not_found() {
default_config
.ok_or(err)
.async_and_then(|c| async {
.map(|c| {
storage::ConfigNew {
key: key.to_string(),
config: c,
}
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
.into()
})
.await
.ok_or(err)
} else {
Err(err)
}
Expand Down