diff --git a/crates/hyperswitch_domain_models/src/customer.rs b/crates/hyperswitch_domain_models/src/customer.rs index 65e17ace222..68d3408e00a 100644 --- a/crates/hyperswitch_domain_models/src/customer.rs +++ b/crates/hyperswitch_domain_models/src/customer.rs @@ -1,3 +1,4 @@ +use common_enums::enums::MerchantStorageScheme; #[cfg(all(feature = "v2", feature = "customer_v2"))] use common_enums::DeleteStatus; use common_utils::{ @@ -11,13 +12,15 @@ use common_utils::{ Description, }, }; -use diesel_models::customers::CustomerUpdateInternal; +use diesel_models::{ + customers as storage_types, customers::CustomerUpdateInternal, query::customers as query, +}; use error_stack::ResultExt; use masking::{PeekInterface, Secret, SwitchStrategy}; use rustc_hash::FxHashMap; use time::PrimitiveDateTime; -use crate::type_encryption as types; +use crate::{behaviour, merchant_key_store::MerchantKeyStore, type_encryption as types}; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[derive(Clone, Debug, router_derive::ToEncryption)] @@ -109,7 +112,7 @@ impl Customer { #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[async_trait::async_trait] -impl super::behaviour::Conversion for Customer { +impl behaviour::Conversion for Customer { type DstType = diesel_models::customers::Customer; type NewDstType = diesel_models::customers::CustomerNew; async fn convert(self) -> CustomResult { @@ -213,7 +216,7 @@ impl super::behaviour::Conversion for Customer { #[cfg(all(feature = "v2", feature = "customer_v2"))] #[async_trait::async_trait] -impl super::behaviour::Conversion for Customer { +impl behaviour::Conversion for Customer { type DstType = diesel_models::customers::Customer; type NewDstType = diesel_models::customers::CustomerNew; async fn convert(self) -> CustomResult { @@ -501,3 +504,136 @@ impl From for CustomerUpdateInternal { } } } + +pub struct CustomerListConstraints { + pub limit: u16, + pub offset: Option, +} + +impl From for query::CustomerListConstraints { + fn from(value: CustomerListConstraints) -> Self { + Self { + limit: i64::from(value.limit), + offset: value.offset.map(i64::from), + } + } +} + +#[async_trait::async_trait] +pub trait CustomerInterface +where + Customer: behaviour::Conversion< + DstType = storage_types::Customer, + NewDstType = storage_types::CustomerNew, + >, +{ + type Error; + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn delete_customer_by_customer_id_merchant_id( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + ) -> CustomResult; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, Self::Error>; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, Self::Error>; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, Self::Error>; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: id_type::CustomerId, + merchant_id: id_type::MerchantId, + customer: Customer, + customer_update: CustomerUpdate, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + async fn list_customers_by_merchant_id( + &self, + state: &KeyManagerState, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + constraints: CustomerListConstraints, + ) -> CustomResult, Self::Error>; + + async fn insert_customer( + &self, + customer_data: Customer, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + customer: Customer, + merchant_id: &id_type::MerchantId, + customer_update: CustomerUpdate, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; +} diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index d98f834e7b5..1f71bacbbe8 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -96,7 +96,7 @@ pub trait StorageInterface: + blocklist_lookup::BlocklistLookupInterface + configs::ConfigInterface + capture::CaptureInterface - + customers::CustomerInterface + + customers::CustomerInterface + dashboard_metadata::DashboardMetadataInterface + dispute::DisputeInterface + ephemeral_key::EphemeralKeyInterface diff --git a/crates/router/src/db/customers.rs b/crates/router/src/db/customers.rs index f1845be5373..a70908970ec 100644 --- a/crates/router/src/db/customers.rs +++ b/crates/router/src/db/customers.rs @@ -1,1554 +1 @@ -use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; -use diesel_models::query::customers::CustomerListConstraints as DieselCustomerListConstraints; -use error_stack::ResultExt; -use futures::future::try_join_all; -use hyperswitch_domain_models::customer; -#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] -use router_env::{instrument, tracing}; - -use super::MockDb; -use crate::{ - core::errors::{self, CustomResult}, - types::{ - domain::{ - self, - behaviour::{Conversion, ReverseConversion}, - }, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, -}; - -pub struct CustomerListConstraints { - pub limit: u16, - pub offset: Option, -} - -impl From for DieselCustomerListConstraints { - fn from(value: CustomerListConstraints) -> Self { - Self { - limit: i64::from(value.limit), - offset: value.offset.map(i64::from), - } - } -} - -#[async_trait::async_trait] -pub trait CustomerInterface -where - customer::Customer: - Conversion, -{ - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn delete_customer_by_customer_id_merchant_id( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - ) -> CustomResult; - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_optional_by_merchant_id_merchant_reference_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[allow(clippy::too_many_arguments)] - async fn update_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: id_type::CustomerId, - merchant_id: id_type::MerchantId, - customer: customer::Customer, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_customer_by_merchant_reference_id_merchant_id( - &self, - state: &KeyManagerState, - merchant_reference_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - constraints: CustomerListConstraints, - ) -> CustomResult, errors::StorageError>; - - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[allow(clippy::too_many_arguments)] - async fn update_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - customer: customer::Customer, - merchant_id: &id_type::MerchantId, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; -} - -#[cfg(feature = "kv_store")] -mod storage { - use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; - use diesel_models::kv; - use error_stack::{report, ResultExt}; - use futures::future::try_join_all; - use hyperswitch_domain_models::customer; - use masking::PeekInterface; - use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{ - decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, - }; - - use super::CustomerInterface; - use crate::{ - connection, - core::{ - customers::REDACTED, - errors::{self, CustomResult}, - }, - services::Store, - types::{ - domain::{ - self, - behaviour::{Conversion, ReverseConversion}, - }, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, - utils::db_utils, - }; - - #[async_trait::async_trait] - impl CustomerInterface for Store { - #[instrument(skip_all)] - // check customer not found in kv and fallback to db - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|err| report!(errors::StorageError::from(err))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let maybe_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id, - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - // check for ValueNotFound - async { - Box::pin(kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - )) - .await? - .try_into_hget() - .map(Some) - }, - database_call, - )) - .await - } - }?; - - let maybe_result = maybe_customer - .async_map(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - - maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - }) - } - - #[instrument(skip_all)] - // check customer not found in kv and fallback to db - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|err| report!(errors::StorageError::from(err))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let maybe_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id, - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - // check for ValueNotFound - async { - Box::pin(kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - )) - .await? - .try_into_hget() - .map(Some) - }, - database_call, - )) - .await - } - }?; - - let maybe_result = maybe_customer - .async_map(|customer| async { - customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - - Ok(maybe_result) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_optional_by_merchant_id_merchant_reference_id( - &self, - state: &KeyManagerState, - merchant_reference_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_optional_by_merchant_id_merchant_reference_id( - &conn, - merchant_reference_id, - merchant_id, - ) - .await - .map_err(|err| report!(errors::StorageError::from(err))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let maybe_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdMerchantReferenceId { - merchant_id, - merchant_reference_id: merchant_reference_id.get_string_repr(), - }; - let field = format!("cust_{}", merchant_reference_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - // check for ValueNotFound - async { - kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - ) - .await? - .try_into_hget() - .map(Some) - }, - database_call, - )) - .await - } - }?; - - let maybe_result = maybe_customer - .async_map(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - - maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - }) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn update_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: id_type::CustomerId, - merchant_id: id_type::MerchantId, - customer: customer::Customer, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - let customer = Conversion::convert(customer) - .await - .change_context(errors::StorageError::EncryptionError)?; - let database_call = || async { - storage_types::Customer::update_by_customer_id_merchant_id( - &conn, - customer_id.clone(), - merchant_id.clone(), - customer_update.clone().into(), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Update(key.clone(), &field, customer.updated_by.as_deref()), - )) - .await; - let updated_object = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let updated_customer = - diesel_models::CustomerUpdateInternal::from(customer_update.clone()) - .apply_changeset(customer.clone()); - - let redis_value = serde_json::to_string(&updated_customer) - .change_context(errors::StorageError::KVError)?; - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Update { - updatable: Box::new(kv::Updateable::CustomerUpdate( - kv::CustomerUpdateMems { - orig: customer, - update_data: customer_update.into(), - }, - )), - }, - }; - - Box::pin(kv_wrapper::<(), _, _>( - self, - KvOperation::Hset::( - (&field, redis_value), - redis_entry, - ), - key, - )) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hset() - .change_context(errors::StorageError::KVError)?; - - Ok(updated_customer) - } - }; - - updated_object? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_customer_by_merchant_reference_id_merchant_id( - &self, - state: &KeyManagerState, - merchant_reference_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_by_merchant_reference_id_merchant_id( - &conn, - merchant_reference_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdMerchantReferenceId { - merchant_id, - merchant_reference_id: merchant_reference_id.get_string_repr(), - }; - let field = format!("cust_{}", merchant_reference_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - ) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - }?; - - let result: customer::Customer = customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?; - - match result.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(result), - } - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn find_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id, - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - Box::pin(kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - )) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - }?; - - let result: customer::Customer = customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?; - - match result.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(result), - } - } - - #[instrument(skip_all)] - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - constraints: super::CustomerListConstraints, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - - let customer_list_constraints = - diesel_models::query::customers::CustomerListConstraints::from(constraints); - - let encrypted_customers = storage_types::Customer::list_by_merchant_id( - &conn, - merchant_id, - customer_list_constraints, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let customers = try_join_all(encrypted_customers.into_iter().map( - |encrypted_customer| async { - encrypted_customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }, - )) - .await?; - - Ok(customers) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let id = customer_data.id.clone(); - let mut new_customer = customer_data - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)?; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Insert, - )) - .await; - new_customer.update_storage_scheme(storage_scheme); - let create_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - new_customer - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::GlobalId { - id: id.get_string_repr(), - }; - let field = format!("cust_{}", id.get_string_repr()); - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Insert { - insertable: Box::new(kv::Insertable::Customer(new_customer.clone())), - }, - }; - let storage_customer = new_customer.into(); - - match kv_wrapper::( - self, - KvOperation::HSetNx::( - &field, - &storage_customer, - redis_entry, - ), - key, - ) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hsetnx() - { - Ok(redis_interface::HsetnxReply::KeyNotSet) => { - Err(report!(errors::StorageError::DuplicateValue { - entity: "customer", - key: Some(id.get_string_repr().to_owned()), - })) - } - Ok(redis_interface::HsetnxReply::KeySet) => Ok(storage_customer), - Err(er) => Err(er).change_context(errors::StorageError::KVError), - } - } - }?; - - create_customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let customer_id = customer_data.customer_id.clone(); - let merchant_id = customer_data.merchant_id.clone(); - let mut new_customer = customer_data - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)?; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Insert, - )) - .await; - new_customer.update_storage_scheme(storage_scheme); - let create_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - new_customer - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Insert { - insertable: Box::new(kv::Insertable::Customer(new_customer.clone())), - }, - }; - let storage_customer = new_customer.into(); - - match Box::pin(kv_wrapper::( - self, - KvOperation::HSetNx::( - &field, - &storage_customer, - redis_entry, - ), - key, - )) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hsetnx() - { - Ok(redis_interface::HsetnxReply::KeyNotSet) => { - Err(report!(errors::StorageError::DuplicateValue { - entity: "customer", - key: Some(customer_id.get_string_repr().to_string()), - })) - } - Ok(redis_interface::HsetnxReply::KeySet) => Ok(storage_customer), - Err(er) => Err(er).change_context(errors::StorageError::KVError), - } - } - }?; - - create_customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn delete_customer_by_customer_id_merchant_id( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::delete_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - _merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_by_global_id(&conn, id) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Find, - )) - .await; - let customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::GlobalId { - id: id.get_string_repr(), - }; - let field = format!("cust_{}", id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - ) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - }?; - - let result: customer::Customer = customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?; - - if result.status == common_enums::DeleteStatus::Redacted { - Err(report!(errors::StorageError::CustomerRedacted)) - } else { - Ok(result) - } - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn update_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - customer: customer::Customer, - _merchant_id: &id_type::MerchantId, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - let customer = Conversion::convert(customer) - .await - .change_context(errors::StorageError::EncryptionError)?; - let database_call = || async { - storage_types::Customer::update_by_id( - &conn, - id.clone(), - customer_update.clone().into(), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let key = PartitionKey::GlobalId { - id: id.get_string_repr(), - }; - let field = format!("cust_{}", id.get_string_repr()); - let storage_scheme = Box::pin(decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Update(key.clone(), &field, customer.updated_by.as_deref()), - )) - .await; - let updated_object = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let updated_customer = - diesel_models::CustomerUpdateInternal::from(customer_update.clone()) - .apply_changeset(customer.clone()); - - let redis_value = serde_json::to_string(&updated_customer) - .change_context(errors::StorageError::KVError)?; - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Update { - updatable: Box::new(kv::Updateable::CustomerUpdate( - kv::CustomerUpdateMems { - orig: customer, - update_data: customer_update.into(), - }, - )), - }, - }; - - kv_wrapper::<(), _, _>( - self, - KvOperation::Hset::( - (&field, redis_value), - redis_entry, - ), - key, - ) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hset() - .change_context(errors::StorageError::KVError)?; - - Ok(updated_customer) - } - }; - - updated_object? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - } -} - -#[cfg(not(feature = "kv_store"))] -mod storage { - use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; - use error_stack::{report, ResultExt}; - use futures::future::try_join_all; - use hyperswitch_domain_models::customer; - use masking::PeekInterface; - use router_env::{instrument, tracing}; - - use super::CustomerInterface; - use crate::{ - connection, - core::{ - customers::REDACTED, - errors::{self, CustomResult}, - }, - services::Store, - types::{ - domain::{ - self, - behaviour::{Conversion, ReverseConversion}, - }, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, - }; - - #[async_trait::async_trait] - impl CustomerInterface for Store { - #[instrument(skip_all)] - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let maybe_customer: Option = - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .async_map(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - maybe_customer.map_or(Ok(None), |customer| { - // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: - // `if customer.name.is_some_and(|ref name| name == REDACTED) ...` - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - } - }) - } - - #[instrument(skip_all)] - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let maybe_customer: Option = - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .async_map(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - Ok(maybe_customer) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_optional_by_merchant_id_merchant_reference_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let maybe_customer: Option = - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .async_map(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - maybe_customer.map_or(Ok(None), |customer| { - // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: - // `if customer.name.is_some_and(|ref name| name == REDACTED) ...` - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - } - }) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn update_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: id_type::CustomerId, - merchant_id: id_type::MerchantId, - _customer: customer::Customer, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::update_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id.clone(), - customer_update.into(), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn find_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let customer: customer::Customer = - storage_types::Customer::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await?; - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(customer), - } - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_customer_by_merchant_reference_id_merchant_id( - &self, - state: &KeyManagerState, - merchant_reference_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let customer: customer::Customer = - storage_types::Customer::find_by_merchant_reference_id_merchant_id( - &conn, - merchant_reference_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await?; - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(customer), - } - } - - #[instrument(skip_all)] - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - constraints: super::CustomerListConstraints, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - - let customer_list_constraints = - diesel_models::query::customers::CustomerListConstraints::from(constraints); - - let encrypted_customers = storage_types::Customer::list_by_merchant_id( - &conn, - merchant_id, - customer_list_constraints, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let customers = try_join_all(encrypted_customers.into_iter().map( - |encrypted_customer| async { - encrypted_customer - .convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }, - )) - .await?; - - Ok(customers) - } - - #[instrument(skip_all)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - customer_data - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)? - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn delete_customer_by_customer_id_merchant_id( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::delete_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[allow(clippy::too_many_arguments)] - async fn update_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - customer: customer::Customer, - merchant_id: &id_type::MerchantId, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::update_by_global_id(&conn, id, customer_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_customer_by_global_id( - &self, - state: &KeyManagerState, - id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let customer: customer::Customer = - storage_types::Customer::find_by_global_id(&conn, customer_id, merchant_id) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await?; - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(customer), - } - } - } -} - -#[async_trait::async_trait] -impl CustomerInterface for MockDb { - #[allow(clippy::panic)] - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let customers = self.customers.lock().await; - let customer = customers - .iter() - .find(|customer| { - customer.customer_id == *customer_id && &customer.merchant_id == merchant_id - }) - .cloned(); - customer - .async_map(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose() - } - - #[allow(clippy::panic)] - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let customers = self.customers.lock().await; - let customer = customers - .iter() - .find(|customer| { - customer.customer_id == *customer_id && &customer.merchant_id == merchant_id - }) - .cloned(); - customer - .async_map(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose() - } - - #[allow(clippy::panic)] - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_optional_by_merchant_id_merchant_reference_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - todo!() - } - - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - constraints: CustomerListConstraints, - ) -> CustomResult, errors::StorageError> { - let customers = self.customers.lock().await; - - let customers = try_join_all( - customers - .iter() - .filter(|customer| customer.merchant_id == *merchant_id) - .take(usize::from(constraints.limit)) - .skip(usize::try_from(constraints.offset.unwrap_or(0)).unwrap_or(0)) - .map(|customer| async { - customer - .to_owned() - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }), - ) - .await?; - - Ok(customers) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[instrument(skip_all)] - async fn update_customer_by_customer_id_merchant_id( - &self, - _state: &KeyManagerState, - _customer_id: id_type::CustomerId, - _merchant_id: id_type::MerchantId, - _customer: customer::Customer, - _customer_update: storage_types::CustomerUpdate, - _key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn find_customer_by_customer_id_merchant_id( - &self, - _state: &KeyManagerState, - _customer_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, - _key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_customer_by_merchant_reference_id_merchant_id( - &self, - _state: &KeyManagerState, - _merchant_reference_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, - _key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } - - #[allow(clippy::panic)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let mut customers = self.customers.lock().await; - - let customer = Conversion::convert(customer_data) - .await - .change_context(errors::StorageError::EncryptionError)?; - - customers.push(customer.clone()); - - customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - async fn delete_customer_by_customer_id_merchant_id( - &self, - _customer_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[allow(clippy::too_many_arguments)] - async fn update_customer_by_global_id( - &self, - _state: &KeyManagerState, - _id: &id_type::GlobalCustomerId, - _customer: customer::Customer, - _merchant_id: &id_type::MerchantId, - _customer_update: storage_types::CustomerUpdate, - _key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_customer_by_global_id( - &self, - _state: &KeyManagerState, - _id: &id_type::GlobalCustomerId, - _merchant_id: &id_type::MerchantId, - _key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? - } -} +pub use hyperswitch_domain_models::customer::{CustomerInterface, CustomerListConstraints}; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index a5171c167b8..4d44cfffc5d 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -372,6 +372,7 @@ impl ConfigInterface for KafkaStore { #[async_trait::async_trait] impl CustomerInterface for KafkaStore { + type Error = errors::StorageError; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn delete_customer_by_customer_id_merchant_id( &self, diff --git a/crates/storage_impl/src/customers.rs b/crates/storage_impl/src/customers.rs index 0d89e1c454b..3c30f0730e0 100644 --- a/crates/storage_impl/src/customers.rs +++ b/crates/storage_impl/src/customers.rs @@ -1,5 +1,901 @@ -use diesel_models::customers::Customer; +use common_utils::{id_type, pii}; +use diesel_models::{customers, kv}; +use error_stack::ResultExt; +use futures::future::try_join_all; +use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, + customer as domain, + merchant_key_store::MerchantKeyStore, +}; +use masking::PeekInterface; +use router_env::{instrument, tracing}; -use crate::redis::kv_store::KvStorePartition; +use crate::{ + diesel_error_to_data_error, + errors::StorageError, + kv_router_store, + redis::kv_store::{decide_storage_scheme, KvStorePartition, Op, PartitionKey}, + store::enums::MerchantStorageScheme, + utils::{pg_connection_read, pg_connection_write}, + CustomResult, DatabaseStore, KeyManagerState, MockDb, RouterStore, +}; -impl KvStorePartition for Customer {} +impl KvStorePartition for customers::Customer {} + +#[async_trait::async_trait] +impl domain::CustomerInterface for kv_router_store::KVRouterStore { + type Error = StorageError; + #[instrument(skip_all)] + // check customer not found in kv and fallback to db + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + let maybe_result = self + .find_optional_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_optional_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", customer_id.get_string_repr()), + PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id, + }, + ), + ) + .await?; + + maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(Some(customer)), + }) + } + + #[instrument(skip_all)] + // check customer not found in kv and fallback to db + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + self.find_optional_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_optional_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", customer_id.get_string_repr()), + PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id, + }, + ), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + let maybe_result = self + .find_optional_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_optional_by_merchant_id_merchant_reference_id( + &conn, + merchant_reference_id, + merchant_id, + ), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", merchant_reference_id.get_string_repr()), + PartitionKey::MerchantIdMerchantReferenceId { + merchant_id, + merchant_reference_id: merchant_reference_id.get_string_repr(), + }, + ), + ) + .await?; + + maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(Some(customer)), + }) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn update_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: id_type::CustomerId, + merchant_id: id_type::MerchantId, + customer: domain::Customer, + customer_update: domain::CustomerUpdate, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let customer = Conversion::convert(customer) + .await + .change_context(StorageError::EncryptionError)?; + let updated_customer = diesel_models::CustomerUpdateInternal::from(customer_update.clone()) + .apply_changeset(customer.clone()); + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let field = format!("cust_{}", customer_id.get_string_repr()); + self.update_resource( + state, + key_store, + storage_scheme, + customers::Customer::update_by_customer_id_merchant_id( + &conn, + customer_id.clone(), + merchant_id.clone(), + customer_update.clone().into(), + ), + updated_customer, + kv_router_store::UpdateResourceParams { + updateable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems { + orig: customer.clone(), + update_data: customer_update.clone().into(), + }), + operation: Op::Update(key.clone(), &field, customer.updated_by.as_deref()), + }, + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let result: domain::Customer = self + .find_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_by_merchant_reference_id_merchant_id( + &conn, + merchant_reference_id, + merchant_id, + ), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", merchant_reference_id.get_string_repr()), + PartitionKey::MerchantIdMerchantReferenceId { + merchant_id, + merchant_reference_id: merchant_reference_id.get_string_repr(), + }, + ), + ) + .await?; + + match result.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(result), + } + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn find_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let result: domain::Customer = self + .find_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", customer_id.get_string_repr()), + PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id, + }, + ), + ) + .await?; + + match result.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(result), + } + } + + #[instrument(skip_all)] + async fn list_customers_by_merchant_id( + &self, + state: &KeyManagerState, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + constraints: domain::CustomerListConstraints, + ) -> CustomResult, StorageError> { + self.router_store + .list_customers_by_merchant_id(state, merchant_id, key_store, constraints) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn insert_customer( + &self, + customer_data: domain::Customer, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let id = customer_data.id.clone(); + let key = PartitionKey::GlobalId { + id: id.get_string_repr(), + }; + let identifier = format!("cust_{}", id.get_string_repr()); + let mut new_customer = customer_data + .construct_new() + .await + .change_context(StorageError::EncryptionError)?; + let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>( + self, + storage_scheme, + Op::Insert, + )) + .await; + new_customer.update_storage_scheme(storage_scheme); + self.insert_resource( + state, + key_store, + storage_scheme, + new_customer.clone().insert(&conn), + new_customer.clone().into(), + kv_router_store::InsertResourceParams { + insertable: kv::Insertable::Customer(new_customer.clone()), + reverse_lookups: vec![], + identifier, + key, + resource_type: "customer", + }, + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn insert_customer( + &self, + customer_data: domain::Customer, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &customer_data.merchant_id.clone(), + customer_id: &customer_data.customer_id.clone(), + }; + let identifier = format!("cust_{}", customer_data.customer_id.get_string_repr()); + let mut new_customer = customer_data + .construct_new() + .await + .change_context(StorageError::EncryptionError)?; + let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>( + self, + storage_scheme, + Op::Insert, + )) + .await; + new_customer.update_storage_scheme(storage_scheme); + let customer = new_customer.clone().into(); + self.insert_resource( + state, + key_store, + storage_scheme, + new_customer.clone().insert(&conn), + customer, + kv_router_store::InsertResourceParams { + insertable: kv::Insertable::Customer(new_customer.clone()), + reverse_lookups: vec![], + identifier, + key, + resource_type: "customer", + }, + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn delete_customer_by_customer_id_merchant_id( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + ) -> CustomResult { + self.router_store + .delete_customer_by_customer_id_merchant_id(customer_id, merchant_id) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + _merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let result: domain::Customer = self + .find_resource_by_id( + state, + key_store, + storage_scheme, + customers::Customer::find_by_global_id(&conn, id), + kv_router_store::FindResourceBy::Id( + format!("cust_{}", id.get_string_repr()), + PartitionKey::GlobalId { + id: id.get_string_repr(), + }, + ), + ) + .await?; + + if result.status == common_enums::DeleteStatus::Redacted { + Err(StorageError::CustomerRedacted)? + } else { + Ok(result) + } + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + customer: domain::Customer, + _merchant_id: &id_type::MerchantId, + customer_update: domain::CustomerUpdate, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let customer = Conversion::convert(customer) + .await + .change_context(StorageError::EncryptionError)?; + let database_call = + customers::Customer::update_by_id(&conn, id.clone(), customer_update.clone().into()); + let key = PartitionKey::GlobalId { + id: id.get_string_repr(), + }; + let field = format!("cust_{}", id.get_string_repr()); + self.update_resource( + state, + key_store, + storage_scheme, + database_call, + diesel_models::CustomerUpdateInternal::from(customer_update.clone()) + .apply_changeset(customer.clone()), + kv_router_store::UpdateResourceParams { + updateable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems { + orig: customer.clone(), + update_data: customer_update.into(), + }), + operation: Op::Update(key.clone(), &field, customer.updated_by.as_deref()), + }, + ) + .await + } +} + +#[async_trait::async_trait] +impl domain::CustomerInterface for RouterStore { + type Error = StorageError; + #[instrument(skip_all)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + let maybe_customer: Option = self + .find_optional_resource( + state, + key_store, + customers::Customer::find_optional_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + ) + .await?; + maybe_customer.map_or(Ok(None), |customer| { + // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: + // `if customer.name.is_some_and(|ref name| name == pii::REDACTED) ...` + match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => { + Err(StorageError::CustomerRedacted)? + } + _ => Ok(Some(customer)), + } + }) + } + + #[instrument(skip_all)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + self.find_optional_resource( + state, + key_store, + customers::Customer::find_optional_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + ) + .await + } + + #[instrument(skip_all)] + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + let maybe_customer: Option = self + .find_optional_resource( + state, + key_store, + customers::Customer::find_optional_by_merchant_id_merchant_reference_id( + &conn, + customer_id, + merchant_id, + ), + ) + .await?; + maybe_customer.map_or(Ok(None), |customer| { + // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: + // `if customer.name.is_some_and(|ref name| name == pii::REDACTED) ...` + match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => { + Err(StorageError::CustomerRedacted)? + } + _ => Ok(Some(customer)), + } + }) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn update_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: id_type::CustomerId, + merchant_id: id_type::MerchantId, + _customer: domain::Customer, + customer_update: domain::CustomerUpdate, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + self.call_database( + state, + key_store, + customers::Customer::update_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id.clone(), + customer_update.into(), + ), + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn find_customer_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let customer: domain::Customer = self + .call_database( + state, + key_store, + customers::Customer::find_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ), + ) + .await?; + match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(customer), + } + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let customer: domain::Customer = self + .call_database( + state, + key_store, + customers::Customer::find_by_merchant_reference_id_merchant_id( + &conn, + merchant_reference_id, + merchant_id, + ), + ) + .await?; + match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(customer), + } + } + + #[instrument(skip_all)] + async fn list_customers_by_merchant_id( + &self, + state: &KeyManagerState, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + constraints: domain::CustomerListConstraints, + ) -> CustomResult, StorageError> { + let conn = pg_connection_read(self).await?; + let customer_list_constraints = + diesel_models::query::customers::CustomerListConstraints::from(constraints); + self.find_resources( + state, + key_store, + customers::Customer::list_by_merchant_id(&conn, merchant_id, customer_list_constraints), + ) + .await + } + + #[instrument(skip_all)] + async fn insert_customer( + &self, + customer_data: domain::Customer, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let customer_new = customer_data + .construct_new() + .await + .change_context(StorageError::EncryptionError)?; + self.call_database(state, key_store, customer_new.insert(&conn)) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn delete_customer_by_customer_id_merchant_id( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + customers::Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id) + .await + .map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }) + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + customer: domain::Customer, + merchant_id: &id_type::MerchantId, + customer_update: domain::CustomerUpdate, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + self.call_database( + state, + key_store, + customers::Customer::update_by_id(&conn, id.clone(), customer_update.into()), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + let customer: domain::Customer = self + .call_database( + state, + key_store, + customers::Customer::find_by_global_id(&conn, id), + ) + .await?; + match customer.name { + Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?, + _ => Ok(customer), + } + } +} + +#[async_trait::async_trait] +impl domain::CustomerInterface for MockDb { + type Error = StorageError; + #[allow(clippy::panic)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let customers = self.customers.lock().await; + self.find_resource(state, key_store, customers, |customer| { + customer.customer_id == *customer_id && &customer.merchant_id == merchant_id + }) + .await + } + + #[allow(clippy::panic)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + let customers = self.customers.lock().await; + self.find_resource(state, key_store, customers, |customer| { + customer.customer_id == *customer_id && &customer.merchant_id == merchant_id + }) + .await + } + + #[allow(clippy::panic)] + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + todo!() + } + + async fn list_customers_by_merchant_id( + &self, + state: &KeyManagerState, + merchant_id: &id_type::MerchantId, + key_store: &MerchantKeyStore, + constraints: domain::CustomerListConstraints, + ) -> CustomResult, StorageError> { + let customers = self.customers.lock().await; + + let customers = try_join_all( + customers + .iter() + .filter(|customer| customer.merchant_id == *merchant_id) + .take(usize::from(constraints.limit)) + .skip(usize::try_from(constraints.offset.unwrap_or(0)).unwrap_or(0)) + .map(|customer| async { + customer + .to_owned() + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError) + }), + ) + .await?; + + Ok(customers) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn update_customer_by_customer_id_merchant_id( + &self, + _state: &KeyManagerState, + _customer_id: id_type::CustomerId, + _merchant_id: id_type::MerchantId, + _customer: domain::Customer, + _customer_update: domain::CustomerUpdate, + _key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn find_customer_by_customer_id_merchant_id( + &self, + _state: &KeyManagerState, + _customer_id: &id_type::CustomerId, + _merchant_id: &id_type::MerchantId, + _key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + _state: &KeyManagerState, + _merchant_reference_id: &id_type::CustomerId, + _merchant_id: &id_type::MerchantId, + _key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[allow(clippy::panic)] + async fn insert_customer( + &self, + customer_data: domain::Customer, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let mut customers = self.customers.lock().await; + + let customer = Conversion::convert(customer_data) + .await + .change_context(StorageError::EncryptionError)?; + + customers.push(customer.clone()); + + customer + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + async fn delete_customer_by_customer_id_merchant_id( + &self, + _customer_id: &id_type::CustomerId, + _merchant_id: &id_type::MerchantId, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( + &self, + _state: &KeyManagerState, + _id: &id_type::GlobalCustomerId, + _customer: domain::Customer, + _merchant_id: &id_type::MerchantId, + _customer_update: domain::CustomerUpdate, + _key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_global_id( + &self, + _state: &KeyManagerState, + _id: &id_type::GlobalCustomerId, + _merchant_id: &id_type::MerchantId, + _key_store: &MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } +} diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index f3c03fe2a8e..26e3792f547 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -179,6 +179,37 @@ impl RouterStore { .change_context(StorageError::DecryptionError) } + pub async fn find_optional_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + execute_query_fut: R, + ) -> error_stack::Result, StorageError> + where + D: Debug + Sync + Conversion, + R: futures::Future< + Output = error_stack::Result, diesel_models::errors::DatabaseError>, + > + Send, + M: ReverseConversion, + { + match execute_query_fut.await.map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + })? { + Some(resource) => Ok(Some( + resource + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError)?, + )), + None => Ok(None), + } + } + pub async fn find_resources( &self, state: &KeyManagerState, diff --git a/crates/storage_impl/src/mock_db.rs b/crates/storage_impl/src/mock_db.rs index f086c762951..1471f854de6 100644 --- a/crates/storage_impl/src/mock_db.rs +++ b/crates/storage_impl/src/mock_db.rs @@ -112,33 +112,56 @@ impl MockDb { }) } + /// Returns an option of the resource if it exists pub async fn find_resource( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, resources: MutexGuard<'_, Vec>, filter_fn: impl Fn(&&D) -> bool, - error_message: String, - ) -> CustomResult + ) -> CustomResult, StorageError> where D: Sync + ReverseConversion + Clone, R: Conversion, { let resource = resources.iter().find(filter_fn).cloned(); match resource { - Some(res) => Ok(res - .convert( + Some(res) => Ok(Some( + res.convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await - .change_context(StorageError::DecryptionError)?), + .change_context(StorageError::DecryptionError)?, + )), + None => Ok(None), + } + } + + /// Throws errors when the requested resource is not found + pub async fn get_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + resources: MutexGuard<'_, Vec>, + filter_fn: impl Fn(&&D) -> bool, + error_message: String, + ) -> CustomResult + where + D: Sync + ReverseConversion + Clone, + R: Conversion, + { + match self + .find_resource(state, key_store, resources, filter_fn) + .await? + { + Some(res) => Ok(res), None => Err(StorageError::ValueNotFound(error_message).into()), } } - pub async fn find_resources( + pub async fn get_resources( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, diff --git a/crates/storage_impl/src/payment_method.rs b/crates/storage_impl/src/payment_method.rs index da0fa80a077..89e7d32b855 100644 --- a/crates/storage_impl/src/payment_method.rs +++ b/crates/storage_impl/src/payment_method.rs @@ -769,7 +769,7 @@ impl PaymentMethodInterface for MockDb { _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; - self.find_resource::( + self.get_resource::( state, key_store, payment_methods, @@ -788,7 +788,7 @@ impl PaymentMethodInterface for MockDb { _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; - self.find_resource::( + self.get_resource::( state, key_store, payment_methods, @@ -810,7 +810,7 @@ impl PaymentMethodInterface for MockDb { _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; - self.find_resource::( + self.get_resource::( state, key_store, payment_methods, @@ -885,7 +885,7 @@ impl PaymentMethodInterface for MockDb { _limit: Option, ) -> CustomResult, errors::StorageError> { let payment_methods = self.payment_methods.lock().await; - self.find_resources( + self.get_resources( state, key_store, payment_methods, @@ -922,7 +922,7 @@ impl PaymentMethodInterface for MockDb { _storage_scheme: MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let payment_methods = self.payment_methods.lock().await; - self.find_resources( + self.get_resources( state, key_store, payment_methods, @@ -952,7 +952,7 @@ impl PaymentMethodInterface for MockDb { pm.customer_id == *customer_id && pm.merchant_id == *merchant_id && pm.status == status }; let error_message = "cannot find payment method".to_string(); - self.find_resources(state, key_store, payment_methods, find_pm_by, error_message) + self.get_resources(state, key_store, payment_methods, find_pm_by, error_message) .await } @@ -1050,7 +1050,7 @@ impl PaymentMethodInterface for MockDb { fingerprint_id: &str, ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; - self.find_resource::( + self.get_resource::( state, key_store, payment_methods,