diff --git a/CHANGELOG.md b/CHANGELOG.md index 053d3258a87..293b1aa9e04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. +#### Breaking +- [2383](https://github.com/FuelLabs/fuel-core/pull/2383): Asset balance queries now return U128 instead of U64. + ## [Version 0.40.0] ### Added diff --git a/bin/e2e-test-client/src/test_context.rs b/bin/e2e-test-client/src/test_context.rs index 1cb6bcb8b07..d5e98c121d4 100644 --- a/bin/e2e-test-client/src/test_context.rs +++ b/bin/e2e-test-client/src/test_context.rs @@ -99,7 +99,7 @@ impl Wallet { } /// returns the balance associated with a wallet - pub async fn balance(&self, asset_id: Option) -> anyhow::Result { + pub async fn balance(&self, asset_id: Option) -> anyhow::Result { self.client .balance(&self.address, Some(&asset_id.unwrap_or_default())) .await diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index b9048362caa..42aea952a82 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -4,7 +4,7 @@ scalar AssetId type Balance { owner: Address! - amount: U64! + amount: U128! assetId: AssetId! } @@ -1259,6 +1259,8 @@ enum TxParametersVersion { scalar TxPointer +scalar U128 + scalar U16 scalar U32 diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 65789f6c9c3..ed032d5888e 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1054,7 +1054,7 @@ impl FuelClient { &self, owner: &Address, asset_id: Option<&AssetId>, - ) -> io::Result { + ) -> io::Result { let owner: schema::Address = (*owner).into(); let asset_id: schema::AssetId = match asset_id { Some(asset_id) => (*asset_id).into(), diff --git a/crates/client/src/client/schema/balance.rs b/crates/client/src/client/schema/balance.rs index 89c5dbca32d..c20da989f62 100644 --- a/crates/client/src/client/schema/balance.rs +++ b/crates/client/src/client/schema/balance.rs @@ -4,7 +4,7 @@ use crate::client::{ Address, AssetId, PageInfo, - U64, + U128, }, PageDirection, PaginationRequest, @@ -99,7 +99,7 @@ pub struct BalanceEdge { #[cynic(schema_path = "./assets/schema.sdl")] pub struct Balance { pub owner: Address, - pub amount: U64, + pub amount: U128, pub asset_id: AssetId, } diff --git a/crates/client/src/client/schema/primitives.rs b/crates/client/src/client/schema/primitives.rs index 1559c835844..4c2852f852a 100644 --- a/crates/client/src/client/schema/primitives.rs +++ b/crates/client/src/client/schema/primitives.rs @@ -272,6 +272,7 @@ macro_rules! number_scalar { }; } +number_scalar!(U128, u128); number_scalar!(U64, u64); number_scalar!(U32, u32); number_scalar!(U16, u16); diff --git a/crates/client/src/client/types/balance.rs b/crates/client/src/client/types/balance.rs index 334fc5dec46..3220d9c036c 100644 --- a/crates/client/src/client/types/balance.rs +++ b/crates/client/src/client/types/balance.rs @@ -10,7 +10,7 @@ use crate::client::{ #[derive(Clone, Copy, Debug, PartialEq)] pub struct Balance { pub owner: Address, - pub amount: u64, + pub amount: u128, pub asset_id: AssetId, } diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index 3d7abda01bc..cc93928b5ed 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -986,6 +986,7 @@ mod tests { let on_chain = self.database.on_chain().clone(); let off_chain = self.database.off_chain().clone(); ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain) + .expect("should create service database") } } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 5061692ed92..6d4d9752f7c 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -508,7 +508,6 @@ where Ok(()) } - #[cfg(feature = "rocksdb")] pub fn convert_to_rocksdb_direction(direction: IterDirection) -> rocksdb::Direction { match direction { diff --git a/crates/fuel-core/src/database/database_description.rs b/crates/fuel-core/src/database/database_description.rs index 14d240c54f5..9fb1fc73d5b 100644 --- a/crates/fuel-core/src/database/database_description.rs +++ b/crates/fuel-core/src/database/database_description.rs @@ -4,6 +4,8 @@ use fuel_core_types::{ blockchain::primitives::DaBlockHeight, fuel_types::BlockHeight, }; +use std::collections::HashSet; +use strum::IntoEnumIterator; pub mod gas_price; pub mod off_chain; @@ -67,10 +69,39 @@ pub trait DatabaseDescription: 'static + Copy + Debug + Send + Sync { fn prefix(column: &Self::Column) -> Option; } +#[derive( + Copy, + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + Eq, + PartialEq, + Hash, + strum::EnumIter, +)] +pub enum IndexationKind { + Balances, +} + +impl IndexationKind { + pub fn all() -> impl Iterator { + Self::iter() + } +} + /// The metadata of the database contains information about the version and its height. -#[derive(Copy, Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum DatabaseMetadata { - V1 { version: u32, height: Height }, + V1 { + version: u32, + height: Height, + }, + V2 { + version: u32, + height: Height, + indexation_availability: HashSet, + }, } impl DatabaseMetadata { @@ -78,6 +109,7 @@ impl DatabaseMetadata { pub fn version(&self) -> u32 { match self { Self::V1 { version, .. } => *version, + Self::V2 { version, .. } => *version, } } @@ -85,6 +117,18 @@ impl DatabaseMetadata { pub fn height(&self) -> &Height { match self { Self::V1 { height, .. } => height, + Self::V2 { height, .. } => height, + } + } + + /// Returns true if the given indexation kind is available. + pub fn indexation_available(&self, kind: IndexationKind) -> bool { + match self { + Self::V1 { .. } => false, + Self::V2 { + indexation_availability, + .. + } => indexation_availability.contains(&kind), } } } diff --git a/crates/fuel-core/src/database/metadata.rs b/crates/fuel-core/src/database/metadata.rs index 72cf2bbedb7..900a484a16c 100644 --- a/crates/fuel-core/src/database/metadata.rs +++ b/crates/fuel-core/src/database/metadata.rs @@ -17,6 +17,8 @@ use fuel_core_storage::{ StorageInspect, }; +use super::database_description::IndexationKind; + /// The table that stores all metadata about the database. pub struct MetadataTable(core::marker::PhantomData); @@ -74,4 +76,12 @@ where Ok(metadata) } + + pub fn indexation_available(&self, kind: IndexationKind) -> StorageResult { + let Some(metadata) = self.storage::>().get(&())? + else { + return Ok(false) + }; + Ok(metadata.indexation_available(kind)) + } } diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index 412d13c18cf..05965d5d71c 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -638,6 +638,7 @@ mod tests { .next() .unwrap() .unwrap(); + assert_eq!(asset_id, AssetId::zeroed()); assert_eq!(amount, expected_fee_amount_1 + expected_fee_amount_2); } diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 772bbc815ea..0dc3fe6b8db 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -79,7 +79,7 @@ impl Default for Costs { } pub const DEFAULT_QUERY_COSTS: Costs = Costs { - balance_query: 40001, + balance_query: 40001, /* Cost will depend on whether balances index is available or not, but let's keep the default high to be on the safe side */ coins_to_spend: 40001, get_peers: 40001, estimate_predicates: 40001, diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 28a714f8b0e..b7e7ba35997 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -90,6 +90,7 @@ use tower_http::{ pub type Service = fuel_core_services::ServiceRunner; pub use super::database::ReadDatabase; +use super::ports::worker; pub type BlockProducer = Box; // In the future GraphQL should not be aware of `TxPool`. It should @@ -229,7 +230,7 @@ pub fn new_service( ) -> anyhow::Result where OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OffChain: AtomicView + worker::OffChainDatabase + 'static, OnChain::LatestView: OnChainDatabase, OffChain::LatestView: OffChainDatabase, { @@ -241,7 +242,7 @@ where genesis_block_height, on_database, off_database, - ); + )?; let request_timeout = config.config.api_request_timeout; let concurrency_limit = config.config.max_concurrent_queries; let body_limit = config.config.request_body_bytes_limit; diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index bf47c8d92a7..6feaaabc5dc 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -68,6 +68,8 @@ use std::{ sync::Arc, }; +use super::ports::worker; + mod arc_wrapper; /// The on-chain view of the database used by the [`ReadView`] to fetch on-chain data. @@ -86,6 +88,8 @@ pub struct ReadDatabase { on_chain: Box>, /// The off-chain database view provider. off_chain: Box>, + /// The flag that indicates whether the Balances cache table is enabled. + balances_enabled: bool, } impl ReadDatabase { @@ -95,19 +99,22 @@ impl ReadDatabase { genesis_height: BlockHeight, on_chain: OnChain, off_chain: OffChain, - ) -> Self + ) -> Result where OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OffChain: AtomicView + worker::OffChainDatabase + 'static, OnChain::LatestView: OnChainDatabase, OffChain::LatestView: OffChainDatabase, { - Self { + let balances_enabled = off_chain.balances_enabled()?; + + Ok(Self { batch_size, genesis_height, on_chain: Box::new(ArcWrapper::new(on_chain)), off_chain: Box::new(ArcWrapper::new(off_chain)), - } + balances_enabled, + }) } /// Creates a consistent view of the database. @@ -120,6 +127,7 @@ impl ReadDatabase { genesis_height: self.genesis_height, on_chain: self.on_chain.latest_view()?, off_chain: self.off_chain.latest_view()?, + balances_enabled: self.balances_enabled, }) } @@ -135,6 +143,7 @@ pub struct ReadView { pub(crate) genesis_height: BlockHeight, pub(crate) on_chain: OnChainView, pub(crate) off_chain: OffChainView, + pub(crate) balances_enabled: bool, } impl ReadView { diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 077a48d1637..50d14a1f5d5 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -62,7 +62,12 @@ use fuel_core_types::{ }, tai64::Tai64, }; -use std::sync::Arc; +use std::{ + collections::BTreeMap, + sync::Arc, +}; + +use super::storage::balances::TotalBalanceAmount; pub trait OffChainDatabase: Send + Sync { fn block_height(&self, block_id: &BlockId) -> StorageResult; @@ -71,6 +76,19 @@ pub trait OffChainDatabase: Send + Sync { fn tx_status(&self, tx_id: &TxId) -> StorageResult; + fn balance( + &self, + owner: &Address, + asset_id: &AssetId, + base_asset_id: &AssetId, + ) -> StorageResult; + + fn balances( + &self, + owner: &Address, + base_asset_id: &AssetId, + ) -> StorageResult>; + fn owned_coins_ids( &self, owner: &Address, @@ -273,6 +291,10 @@ pub mod worker { }, }, graphql_api::storage::{ + balances::{ + CoinBalances, + MessageBalances, + }, da_compression::*, old::{ OldFuelBlockConsensus, @@ -315,6 +337,9 @@ pub mod worker { /// Creates a write database transaction. fn transaction(&mut self) -> Self::Transaction<'_>; + + /// Checks if Balances cache functionality is available. + fn balances_enabled(&self) -> StorageResult; } pub trait OffChainDatabaseTransaction: @@ -327,6 +352,8 @@ pub mod worker { + StorageMutate + StorageMutate + StorageMutate + + StorageMutate + + StorageMutate + StorageMutate + StorageMutate + StorageMutate diff --git a/crates/fuel-core/src/graphql_api/storage.rs b/crates/fuel-core/src/graphql_api/storage.rs index 8f8cfcd1f19..de1db10a550 100644 --- a/crates/fuel-core/src/graphql_api/storage.rs +++ b/crates/fuel-core/src/graphql_api/storage.rs @@ -36,6 +36,7 @@ use fuel_core_types::{ }; use statistic::StatisticTable; +pub mod balances; pub mod blocks; pub mod coins; pub mod contracts; @@ -113,6 +114,10 @@ pub enum Column { DaCompressionTemporalRegistryScriptCode = 21, /// See [`DaCompressionTemporalRegistryPredicateCode`](da_compression::DaCompressionTemporalRegistryPredicateCode) DaCompressionTemporalRegistryPredicateCode = 22, + /// Coin balances per user and asset. + CoinBalances = 23, + /// Message balances per user. + MessageBalances = 24, } impl Column { diff --git a/crates/fuel-core/src/graphql_api/storage/balances.rs b/crates/fuel-core/src/graphql_api/storage/balances.rs new file mode 100644 index 00000000000..10aec2862e7 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/balances.rs @@ -0,0 +1,77 @@ +use fuel_core_storage::{ + blueprint::plain::Plain, + codec::{ + postcard::Postcard, + raw::Raw, + }, + structured_storage::TableWithBlueprint, + Mappable, +}; +use fuel_core_types::{ + fuel_tx::{ + Address, + AssetId, + }, + fuel_vm::double_key, +}; +use rand::{ + distributions::Standard, + prelude::Distribution, + Rng, +}; + +pub type ItemAmount = u64; +pub type TotalBalanceAmount = u128; + +double_key!(BalancesKey, Address, address, AssetId, asset_id); +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> BalancesKey { + let mut bytes = [0u8; BalancesKey::LEN]; + rng.fill_bytes(bytes.as_mut()); + BalancesKey::from_array(bytes) + } +} + +impl core::fmt::Display for BalancesKey { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "address={} asset_id={}", self.address(), self.asset_id()) + } +} + +/// This table stores the balances of coins per owner and asset id. +pub struct CoinBalances; + +impl Mappable for CoinBalances { + type Key = BalancesKey; + type OwnedKey = Self::Key; + type Value = TotalBalanceAmount; + type OwnedValue = Self::Value; +} + +impl TableWithBlueprint for CoinBalances { + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::CoinBalances + } +} + +/// This table stores the balances of messages per owner. +pub struct MessageBalances; + +impl Mappable for MessageBalances { + type Key = Address; + type OwnedKey = Self::Key; + type Value = TotalBalanceAmount; + type OwnedValue = Self::Value; +} + +impl TableWithBlueprint for MessageBalances { + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::MessageBalances + } +} diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 959733d4919..6c9141b7ffe 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -1,9 +1,16 @@ use super::{ da_compression::da_compress_block, - storage::old::{ - OldFuelBlockConsensus, - OldFuelBlocks, - OldTransactions, + storage::{ + balances::{ + BalancesKey, + ItemAmount, + TotalBalanceAmount, + }, + old::{ + OldFuelBlockConsensus, + OldFuelBlocks, + OldTransactions, + }, }, }; use crate::{ @@ -13,6 +20,10 @@ use crate::{ worker::OffChainDatabaseTransaction, }, storage::{ + balances::{ + CoinBalances, + MessageBalances, + }, blocks::FuelBlockIdsToHeights, coins::{ owner_coin_id_key, @@ -39,8 +50,11 @@ use fuel_core_services::{ }; use fuel_core_storage::{ Error as StorageError, + Mappable, Result as StorageResult, StorageAsMut, + StorageInspect, + StorageMutate, }; use fuel_core_types::{ blockchain::{ @@ -50,7 +64,11 @@ use fuel_core_types::{ }, consensus::Consensus, }, - entities::relayer::transaction::RelayedTransactionStatus, + entities::{ + coins::coin::Coin, + relayer::transaction::RelayedTransactionStatus, + Message, + }, fuel_tx::{ field::{ Inputs, @@ -94,6 +112,11 @@ use std::{ borrow::Cow, ops::Deref, }; +use tracing::{ + debug, + error, + info, +}; #[cfg(test)] mod tests; @@ -125,6 +148,7 @@ pub struct Task { chain_id: ChainId, da_compression_config: DaCompressionConfig, continue_on_error: bool, + balances_enabled: bool, } impl Task @@ -157,6 +181,7 @@ where process_executor_events( result.events.iter().map(Cow::Borrowed), &mut transaction, + self.balances_enabled, )?; match self.da_compression_config { @@ -181,16 +206,139 @@ where } } +trait DatabaseItemWithAmount { + type Storage: Mappable; + + fn key(&self) -> ::Key; + fn amount(&self) -> ItemAmount; +} + +impl DatabaseItemWithAmount for &Coin { + type Storage = CoinBalances; + + fn key(&self) -> ::Key { + BalancesKey::new(&self.owner, &self.asset_id) + } + + fn amount(&self) -> ItemAmount { + self.amount + } +} + +impl DatabaseItemWithAmount for &Message { + type Storage = MessageBalances; + + fn key(&self) -> ::Key { + *self.recipient() + } + + fn amount(&self) -> ItemAmount { + (**self).amount() + } +} + +trait BalanceIndexationUpdater: DatabaseItemWithAmount { + type TotalBalance: From<::OwnedValue> + core::fmt::Display; + + fn update_balances( + &self, + tx: &mut T, + updater: UpdaterFn, + ) -> StorageResult<()> + where + ::Key: Sized + core::fmt::Display, + ::OwnedValue: Default + core::fmt::Display, + UpdaterFn: Fn(Self::TotalBalance, ItemAmount) -> Option, + T: OffChainDatabaseTransaction + StorageMutate, + ::Value: + From<::TotalBalance>, + fuel_core_storage::Error: From<>::Error>, + { + let key = self.key(); + let amount = self.amount(); + let storage = tx.storage::(); + let current_balance = storage.get(&key)?.unwrap_or_default(); + let prev_balance = current_balance.clone(); + match updater(current_balance.as_ref().clone().into(), amount) { + Some(new_balance) => { + debug!( + %key, + %amount, + %prev_balance, + %new_balance, + "changing balance"); + + let storage = tx.storage::(); + Ok(storage.insert(&key, &new_balance.into())?) + } + None => { + error!( + %key, + %amount, + %prev_balance, + "unable to change balance due to overflow"); + Err(anyhow::anyhow!("unable to change balance due to overflow").into()) + } + } + } +} + +impl BalanceIndexationUpdater for &Coin { + type TotalBalance = TotalBalanceAmount; +} +impl BalanceIndexationUpdater for &Message { + type TotalBalance = TotalBalanceAmount; +} + +fn process_balances_update( + event: &Event, + block_st_transaction: &mut T, + balances_enabled: bool, +) -> StorageResult<()> +where + T: OffChainDatabaseTransaction, +{ + if !balances_enabled { + return Ok(()); + } + match event { + Event::MessageImported(message) => message + .update_balances(block_st_transaction, |balance, amount| { + balance.checked_add(amount as TotalBalanceAmount) + }), + Event::MessageConsumed(message) => message + .update_balances(block_st_transaction, |balance, amount| { + balance.checked_sub(amount as TotalBalanceAmount) + }), + Event::CoinCreated(coin) => coin + .update_balances(block_st_transaction, |balance, amount| { + balance.checked_add(amount as TotalBalanceAmount) + }), + Event::CoinConsumed(coin) => coin + .update_balances(block_st_transaction, |balance, amount| { + balance.checked_sub(amount as TotalBalanceAmount) + }), + Event::ForcedTransactionFailed { .. } => Ok(()), + } +} + /// Process the executor events and update the indexes for the messages and coins. pub fn process_executor_events<'a, Iter, T>( events: Iter, block_st_transaction: &mut T, + balances_enabled: bool, ) -> anyhow::Result<()> where Iter: Iterator>, T: OffChainDatabaseTransaction, { for event in events { + if let Err(err) = + process_balances_update(event.deref(), block_st_transaction, balances_enabled) + { + // TODO[RC]: Balances overflow to be correctly handled. See: https://github.com/FuelLabs/fuel-core/issues/2428 + tracing::error!(%err, "Processing balances") + } match event.deref() { Event::MessageImported(message) => { block_st_transaction @@ -473,6 +621,9 @@ where graphql_metrics().total_txs_count.set(total_tx_count as i64); } + let balances_enabled = self.off_chain_database.balances_enabled()?; + info!("Balances cache available: {}", balances_enabled); + let InitializeTask { chain_id, da_compression_config, @@ -491,6 +642,7 @@ where chain_id, da_compression_config, continue_on_error, + balances_enabled, }; let mut target_chain_height = on_chain_database.latest_height()?; diff --git a/crates/fuel-core/src/graphql_api/worker_service/tests.rs b/crates/fuel-core/src/graphql_api/worker_service/tests.rs index 8b9ad758975..123501baabb 100644 --- a/crates/fuel-core/src/graphql_api/worker_service/tests.rs +++ b/crates/fuel-core/src/graphql_api/worker_service/tests.rs @@ -83,5 +83,6 @@ fn worker_task_with_block_importer_and_db( chain_id, da_compression_config: DaCompressionConfig::Disabled, continue_on_error: false, + balances_enabled: true, } } diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 161fd64b87e..4c3a5cc9e84 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -1,4 +1,13 @@ -use crate::fuel_core_graphql_api::database::ReadView; +use std::{ + cmp::Ordering, + collections::HashMap, + future, +}; + +use crate::{ + fuel_core_graphql_api::database::ReadView, + graphql_api::storage::balances::TotalBalanceAmount, +}; use asset_query::{ AssetQuery, AssetSpendTarget, @@ -17,14 +26,15 @@ use fuel_core_types::{ services::graphql_api::AddressBalance, }; use futures::{ + stream, FutureExt, Stream, StreamExt, TryStreamExt, }; -use std::{ - cmp::Ordering, - collections::HashMap, +use tracing::{ + debug, + error, }; pub mod asset_query; @@ -36,22 +46,32 @@ impl ReadView { asset_id: AssetId, base_asset_id: AssetId, ) -> StorageResult { - let amount = AssetQuery::new( - &owner, - &AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX), - &base_asset_id, - None, - self, - ) - .coins() - .map(|res| res.map(|coins| coins.amount())) - .try_fold(0u64, |balance, amount| { - async move { - // Increase the balance - Ok(balance.saturating_add(amount)) - } - }) - .await?; + let amount = if self.balances_enabled { + debug!(%owner, %asset_id, "Querying balance with balances cache"); + self.off_chain.balance(&owner, &asset_id, &base_asset_id)? + } else { + debug!(%owner, %asset_id, "Querying balance without balances cache"); + AssetQuery::new( + &owner, + &AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX), + &base_asset_id, + None, + self, + ) + .coins() + .map(|res| res.map(|coins| coins.amount())) + .try_fold(0u128, |balance, amount| async move { + Ok(balance.checked_add(amount as u128).unwrap_or_else(|| { + // TODO[RC]: Balances overflow to be correctly handled. See: https://github.com/FuelLabs/fuel-core/issues/2428 + error!( + %asset_id, + prev_balance=%balance, + "unable to change balance due to overflow"); + u128::MAX + })) + }) + .await? as TotalBalanceAmount + }; Ok(AddressBalance { owner, @@ -66,6 +86,28 @@ impl ReadView { direction: IterDirection, base_asset_id: &'a AssetId, ) -> impl Stream> + 'a { + if self.balances_enabled { + futures::future::Either::Left(self.balances_with_cache( + owner, + base_asset_id, + direction, + )) + } else { + futures::future::Either::Right(self.balances_without_cache( + owner, + base_asset_id, + direction, + )) + } + } + + fn balances_without_cache<'a>( + &'a self, + owner: &'a Address, + base_asset_id: &'a AssetId, + direction: IterDirection, + ) -> impl Stream> + 'a { + debug!(%owner, "Querying balances without balances cache"); let query = AssetsQuery::new(owner, None, None, self, base_asset_id); let stream = query.coins(); @@ -73,10 +115,20 @@ impl ReadView { .try_fold( HashMap::new(), move |mut amounts_per_asset, coin| async move { - let amount: &mut u64 = amounts_per_asset + let amount: &mut TotalBalanceAmount = amounts_per_asset .entry(*coin.asset_id(base_asset_id)) .or_default(); - *amount = amount.saturating_add(coin.amount()); + let new_amount = amount + .checked_add(coin.amount() as TotalBalanceAmount) + .unwrap_or_else(|| { + // TODO[RC]: Balances overflow to be correctly handled. See: https://github.com/FuelLabs/fuel-core/issues/2428 + error!( + asset_id=%coin.asset_id(base_asset_id), + prev_balance=%amount, + "unable to change balance due to overflow"); + u128::MAX + }); + *amount = new_amount; Ok(amounts_per_asset) }, ) @@ -109,4 +161,32 @@ impl ReadView { .try_flatten() .yield_each(self.batch_size) } + + fn balances_with_cache<'a>( + &'a self, + owner: &'a Address, + base_asset_id: &AssetId, + direction: IterDirection, + ) -> impl Stream> + 'a { + debug!(%owner, "Querying balances using balances cache"); + match self.off_chain.balances(owner, base_asset_id) { + Ok(balances) => { + let iter = if direction == IterDirection::Reverse { + itertools::Either::Left(balances.into_iter().rev()) + } else { + itertools::Either::Right(balances.into_iter()) + }; + stream::iter(iter.map(|(asset_id, amount)| AddressBalance { + owner: *owner, + amount, + asset_id, + })) + .map(Ok) + .into_stream() + .yield_each(self.batch_size) + .left_stream() + } + Err(err) => stream::once(future::ready(Err(err))).right_stream(), + } + } } diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs index 140bb81256f..b6b95228e83 100644 --- a/crates/fuel-core/src/schema/balance.rs +++ b/crates/fuel-core/src/schema/balance.rs @@ -7,7 +7,7 @@ use crate::{ scalars::{ Address, AssetId, - U64, + U128, }, ReadViewProvider, }, @@ -33,7 +33,7 @@ impl Balance { self.0.owner.into() } - async fn amount(&self) -> U64 { + async fn amount(&self) -> U128 { self.0.amount.into() } diff --git a/crates/fuel-core/src/schema/scalars.rs b/crates/fuel-core/src/schema/scalars.rs index e75b3271f98..d3ddb7df20b 100644 --- a/crates/fuel-core/src/schema/scalars.rs +++ b/crates/fuel-core/src/schema/scalars.rs @@ -79,6 +79,7 @@ macro_rules! number_scalar { }; } +number_scalar!(U128, u128, "U128"); number_scalar!(U64, u64, "U64"); number_scalar!(U32, u32, "U32"); number_scalar!(U16, u16, "U16"); diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 6b42dd3960c..9474d08601f 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -4,7 +4,16 @@ use crate::{ CombinedDatabase, ShutdownListener, }, - database::Database, + database::{ + database_description::{ + off_chain::OffChain, + DatabaseDescription, + DatabaseMetadata, + IndexationKind, + }, + metadata::MetadataTable, + Database, + }, service::{ adapters::{ ExecutorAdapter, @@ -43,6 +52,7 @@ use std::{ net::SocketAddr, sync::Arc, }; +use tracing::info; pub use config::{ Config, @@ -132,6 +142,8 @@ impl FuelService { shutdown_listener, )?; + Self::write_metadata_at_genesis(&database)?; + // initialize sub services tracing::info!("Initializing sub services"); database.sync_aux_db_heights(shutdown_listener)?; @@ -209,6 +221,35 @@ impl FuelService { Ok(()) } + // When genesis is missing write to the database that balances cache should be used. + fn write_metadata_at_genesis(database: &CombinedDatabase) -> anyhow::Result<()> { + let on_chain_view = database.on_chain().latest_view()?; + if on_chain_view.get_genesis().is_err() { + let all_indexations = IndexationKind::all().collect(); + info!( + "No genesis, initializing metadata with all supported indexations: {:?}", + all_indexations + ); + let off_chain_view = database.off_chain().latest_view()?; + let mut database_tx = off_chain_view.read_transaction(); + database_tx + .storage_as_mut::>() + .insert( + &(), + &DatabaseMetadata::V2 { + version: ::version(), + height: Default::default(), + indexation_availability: all_indexations, + }, + )?; + database + .off_chain() + .data + .commit_changes(None, database_tx.into_changes())?; + } + Ok(()) + } + fn make_database_compatible_with_config( combined_database: &mut CombinedDatabase, config: &Config, diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index d554c7ddc45..67bb2741369 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -1,6 +1,11 @@ +use std::collections::BTreeMap; + use crate::{ database::{ - database_description::off_chain::OffChain, + database_description::{ + off_chain::OffChain, + IndexationKind, + }, Database, OffChainIterableKeyValueView, }, @@ -16,10 +21,18 @@ use crate::{ transactions::OwnedTransactionIndexCursor, }, }, - graphql_api::storage::old::{ - OldFuelBlockConsensus, - OldFuelBlocks, - OldTransactions, + graphql_api::storage::{ + balances::{ + BalancesKey, + CoinBalances, + MessageBalances, + TotalBalanceAmount, + }, + old::{ + OldFuelBlockConsensus, + OldFuelBlocks, + OldTransactions, + }, }, }; use fuel_core_storage::{ @@ -51,6 +64,7 @@ use fuel_core_types::{ entities::relayer::transaction::RelayedTransactionStatus, fuel_tx::{ Address, + AssetId, Bytes32, ContractId, Salt, @@ -65,6 +79,7 @@ use fuel_core_types::{ }, services::txpool::TransactionStatus, }; +use tracing::debug; impl OffChainDatabase for OffChainIterableKeyValueView { fn block_height(&self, id: &BlockId) -> StorageResult { @@ -187,6 +202,72 @@ impl OffChainDatabase for OffChainIterableKeyValueView { fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { self.message_is_spent(nonce) } + + fn balance( + &self, + owner: &Address, + asset_id: &AssetId, + base_asset_id: &AssetId, + ) -> StorageResult { + let coins = self + .storage_as_ref::() + .get(&BalancesKey::new(owner, asset_id))? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount; + + if base_asset_id == asset_id { + let messages = self + .storage_as_ref::() + .get(owner)? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount; + + let total = coins.checked_add(messages).ok_or(anyhow::anyhow!( + "Total balance overflow: coins: {coins}, messages: {messages}" + ))?; + + debug!(%coins, %messages, total, "total balance"); + Ok(total) + } else { + debug!(%coins, "total balance"); + Ok(coins) + } + } + + fn balances( + &self, + owner: &Address, + base_asset_id: &AssetId, + ) -> StorageResult> { + let mut balances = BTreeMap::new(); + for balance_key in self.iter_all_by_prefix_keys::(Some(owner)) { + let key = balance_key?; + let asset_id = key.asset_id(); + + let messages = if base_asset_id == asset_id { + self.storage_as_ref::() + .get(owner)? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount + } else { + 0 + }; + + let coins = self + .storage_as_ref::() + .get(&key)? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount; + + let total = coins.checked_add(messages).ok_or(anyhow::anyhow!( + "Total balance overflow: coins: {coins}, messages: {messages}" + ))?; + debug!(%owner, %asset_id, %total, "balance entry"); + balances.insert(*asset_id, total); + } + + Ok(balances) + } } impl worker::OffChainDatabase for Database { @@ -199,4 +280,8 @@ impl worker::OffChainDatabase for Database { fn transaction(&mut self) -> Self::Transaction<'_> { self.into_transaction() } + + fn balances_enabled(&self) -> StorageResult { + self.indexation_available(IndexationKind::Balances) + } } diff --git a/crates/fuel-core/src/service/genesis/importer/off_chain.rs b/crates/fuel-core/src/service/genesis/importer/off_chain.rs index eef13bf9ee5..3b7607c738c 100644 --- a/crates/fuel-core/src/service/genesis/importer/off_chain.rs +++ b/crates/fuel-core/src/service/genesis/importer/off_chain.rs @@ -107,10 +107,13 @@ impl ImportTable for Handler { group: Vec>, tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { + // We always want to enable balances indexation if we're starting at genesis. + const BALANCES_INDEXATION_ENABLED: bool = true; + let events = group .into_iter() .map(|TableEntry { value, .. }| Cow::Owned(Event::MessageImported(value))); - worker_service::process_executor_events(events, tx)?; + worker_service::process_executor_events(events, tx, BALANCES_INDEXATION_ENABLED)?; Ok(()) } } @@ -125,10 +128,13 @@ impl ImportTable for Handler { group: Vec>, tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { + // We always want to enable balances indexation if we're starting at genesis. + const BALANCES_INDEXATION_ENABLED: bool = true; + let events = group.into_iter().map(|TableEntry { value, key }| { Cow::Owned(Event::CoinCreated(value.uncompress(key))) }); - worker_service::process_executor_events(events, tx)?; + worker_service::process_executor_events(events, tx, BALANCES_INDEXATION_ENABLED)?; Ok(()) } } diff --git a/crates/types/src/services/graphql_api.rs b/crates/types/src/services/graphql_api.rs index b38d73e0e03..efcfdc99ecd 100644 --- a/crates/types/src/services/graphql_api.rs +++ b/crates/types/src/services/graphql_api.rs @@ -7,17 +7,17 @@ use crate::fuel_types::{ }; /// The cumulative balance(`amount`) of the `Owner` of `asset_id`. -pub struct Balance { +pub struct Balance { /// Owner of the asset. pub owner: Owner, /// The cumulative amount of the asset. - pub amount: u64, + pub amount: Amount, /// The identifier of the asset. pub asset_id: AssetId, } /// The alias for the `Balance` of the address. -pub type AddressBalance = Balance
; +pub type AddressBalance = Balance; /// The alias for the `Balance` of the contract. -pub type ContractBalance = Balance; +pub type ContractBalance = Balance; diff --git a/tests/tests/blob.rs b/tests/tests/blob.rs index 813dfea1418..74a1a7d13aa 100644 --- a/tests/tests/blob.rs +++ b/tests/tests/blob.rs @@ -183,7 +183,7 @@ async fn blob__cannot_post_already_existing_blob() { } #[tokio::test] -async fn blob__accessing_nonexitent_blob_panics_vm() { +async fn blob__accessing_nonexistent_blob_panics_vm() { // Given let ctx = TestContext::new().await; let blob_id = BlobId::new([0; 32]); // Nonexistent diff --git a/tests/tests/chain.rs b/tests/tests/chain.rs index c5c62b8f600..43470f7c0d3 100644 --- a/tests/tests/chain.rs +++ b/tests/tests/chain.rs @@ -170,11 +170,11 @@ async fn network_operates_with_non_zero_base_asset_id() { .expect("transaction should insert"); // Then - let expected_fee = 1; + let expected_fee = 1_u128; assert!(matches!(result, TransactionStatus::Success { .. })); let balance = client .balance(&owner, Some(&new_base_asset_id)) .await .expect("Should fetch the balance"); - assert_eq!(balance, amount - expected_fee); + assert_eq!(balance, amount as u128 - expected_fee); } diff --git a/tests/tests/fee_collection_contract.rs b/tests/tests/fee_collection_contract.rs index 54426b5c293..e449e6fa17b 100644 --- a/tests/tests/fee_collection_contract.rs +++ b/tests/tests/fee_collection_contract.rs @@ -227,7 +227,7 @@ async fn happy_path() { // Make sure that the full balance was been withdrawn assert_eq!( ctx.client.balance(&ctx.address, None).await.unwrap(), - contract_balance_before_collect + contract_balance_before_collect as u128 ); }