diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index a0a75dbb0d4..4bbe361980d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -19,7 +19,6 @@ use crate::{ parse_data_column_key, }; use itertools::{Itertools, process_results}; -use lru::LruCache; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use serde::{Deserialize, Serialize}; @@ -30,10 +29,9 @@ use state_processing::{ block_replayer::PreSlotHook, }; use std::cmp::{Ordering, min}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::io::{Read, Write}; use std::marker::PhantomData; -use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -69,8 +67,6 @@ pub struct HotColdDB, Cold: ItemStore> { /// /// The hot database also contains all blocks. pub hot_db: Hot, - /// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded. - block_cache: Option>>, /// Cache of beacon states. /// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. @@ -86,76 +82,6 @@ pub struct HotColdDB, Cold: ItemStore> { _phantom: PhantomData, } -#[derive(Debug)] -struct BlockCache { - block_cache: LruCache>, - blob_cache: LruCache>, - data_column_cache: LruCache>>>, - data_column_custody_info_cache: Option, -} - -impl BlockCache { - pub fn new(size: NonZeroUsize) -> Self { - Self { - block_cache: LruCache::new(size), - blob_cache: LruCache::new(size), - data_column_cache: LruCache::new(size), - data_column_custody_info_cache: None, - } - } - pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock) { - self.block_cache.put(block_root, block); - } - pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList) { - self.blob_cache.put(block_root, blobs); - } - pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc>) { - self.data_column_cache - .get_or_insert_mut(block_root, Default::default) - .insert(data_column.index, data_column); - } - pub fn put_data_column_custody_info( - &mut self, - data_column_custody_info: Option, - ) { - self.data_column_custody_info_cache = data_column_custody_info; - } - pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock> { - self.block_cache.get(block_root) - } - pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList> { - self.blob_cache.get(block_root) - } - // Note: data columns are all individually cached, hence there's no guarantee that - // `data_column_cache.get(block_root)` will return all custody columns. - pub fn get_data_column( - &mut self, - block_root: &Hash256, - column_index: &ColumnIndex, - ) -> Option>> { - self.data_column_cache - .get(block_root) - .and_then(|map| map.get(column_index).cloned()) - } - pub fn get_data_column_custody_info(&self) -> Option { - self.data_column_custody_info_cache.clone() - } - pub fn delete_block(&mut self, block_root: &Hash256) { - let _ = self.block_cache.pop(block_root); - } - pub fn delete_blobs(&mut self, block_root: &Hash256) { - let _ = self.blob_cache.pop(block_root); - } - pub fn delete_data_columns(&mut self, block_root: &Hash256) { - let _ = self.data_column_cache.pop(block_root); - } - pub fn delete(&mut self, block_root: &Hash256) { - self.delete_block(block_root); - self.delete_blobs(block_root); - self.delete_data_columns(block_root); - } -} - #[derive(Debug, PartialEq)] pub enum HotColdDBError { UnsupportedSchemaVersion { @@ -233,9 +159,6 @@ impl HotColdDB, MemoryStore> { cold_db: MemoryStore::open(), blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), - block_cache: NonZeroUsize::new(config.block_cache_size) - .map(BlockCache::new) - .map(Mutex::new), state_cache: Mutex::new(StateCache::new( config.state_cache_size, config.state_cache_headroom, @@ -287,9 +210,6 @@ impl HotColdDB, BeaconNodeBackend> { blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?, cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, - block_cache: NonZeroUsize::new(config.block_cache_size) - .map(BlockCache::new) - .map(Mutex::new), state_cache: Mutex::new(StateCache::new( config.state_cache_size, config.state_cache_headroom, @@ -496,17 +416,6 @@ impl, Cold: ItemStore> HotColdDB pub fn register_metrics(&self) { let hsc_metrics = self.historic_state_cache.lock().metrics(); - if let Some(block_cache) = &self.block_cache { - let cache = block_cache.lock(); - metrics::set_gauge( - &metrics::STORE_BEACON_BLOCK_CACHE_SIZE, - cache.block_cache.len() as i64, - ); - metrics::set_gauge( - &metrics::STORE_BEACON_BLOB_CACHE_SIZE, - cache.blob_cache.len() as i64, - ); - } let state_cache = self.state_cache.lock(); metrics::set_gauge( &metrics::STORE_BEACON_STATE_CACHE_SIZE, @@ -553,7 +462,7 @@ impl, Cold: ItemStore> HotColdDB ); } - /// Store a block and update the LRU cache. + /// Store a block on disk. pub fn put_block( &self, block_root: &Hash256, @@ -561,12 +470,8 @@ impl, Cold: ItemStore> HotColdDB ) -> Result<(), Error> { // Store on disk. let mut ops = Vec::with_capacity(2); - let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?; + self.block_as_kv_store_ops(block_root, block, &mut ops)?; self.hot_db.do_atomically(ops)?; - // Update cache. - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().put_block(*block_root, block)); Ok(()) } @@ -617,14 +522,6 @@ impl, Cold: ItemStore> HotColdDB ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); - // Check the cache. - if let Some(cache) = &self.block_cache - && let Some(block) = cache.lock().get_block(block_root) - { - metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT); - return Ok(Some(DatabaseBlock::Full(block.clone()))); - } - // Load the blinded block. let Some(blinded_block) = self.get_blinded_block(block_root)? else { return Ok(None); @@ -642,12 +539,6 @@ impl, Cold: ItemStore> HotColdDB { // Re-constructing the full block should always succeed here. let full_block = self.make_full_block(block_root, blinded_block)?; - - // Add to cache. - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().put_block(*block_root, full_block.clone())); - DatabaseBlock::Full(full_block) } else if !self.config.prune_payloads { // If payload pruning is disabled there's a chance we may have the payload of @@ -917,9 +808,6 @@ impl, Cold: ItemStore> HotColdDB /// Delete a block from the store and the block cache. pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().delete(block_root)); self.hot_db .key_delete(DBColumn::BeaconBlock, block_root.as_slice())?; self.hot_db @@ -934,9 +822,6 @@ impl, Cold: ItemStore> HotColdDB block_root.as_slice(), &blobs.as_ssz_bytes(), )?; - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().put_blobs(*block_root, blobs)); Ok(()) } @@ -977,12 +862,6 @@ impl, Cold: ItemStore> HotColdDB self.blobs_db .put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?; - self.block_cache.as_ref().inspect(|cache| { - cache - .lock() - .put_data_column_custody_info(Some(data_column_custody_info)) - }); - Ok(()) } @@ -997,9 +876,6 @@ impl, Cold: ItemStore> HotColdDB &get_data_column_key(block_root, &data_column.index), &data_column.as_ssz_bytes(), )?; - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().put_data_column(*block_root, data_column)); } Ok(()) } @@ -1377,111 +1253,29 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.delete_if(column, f) } + // FIXME(sproul): rename pub fn do_atomically_with_block_and_blobs_cache( &self, batch: Vec>, ) -> Result<(), Error> { - let mut blobs_to_delete = Vec::new(); - let mut data_columns_to_delete = Vec::new(); + // FIXME(sproul): handle KVStoreOps for blob-metadata let (blobs_ops, hot_db_ops): (Vec>, Vec>) = batch.into_iter().partition(|store_op| match store_op { - StoreOp::PutBlobs(_, _) | StoreOp::PutDataColumns(_, _) => true, - StoreOp::DeleteBlobs(block_root) => { - match self.get_blobs(block_root) { - Ok(BlobSidecarListFromRoot::Blobs(blob_sidecar_list)) => { - blobs_to_delete.push((*block_root, blob_sidecar_list)); - } - Ok(BlobSidecarListFromRoot::NoBlobs | BlobSidecarListFromRoot::NoRoot) => {} - Err(e) => { - error!( - %block_root, - error = ?e, - "Error getting blobs" - ); - } - } - true - } - StoreOp::DeleteDataColumns(block_root, indices) => { - match indices - .iter() - .map(|index| self.get_data_column(block_root, index)) - .collect::, _>>() - { - Ok(data_column_sidecar_list_opt) => { - let data_column_sidecar_list = data_column_sidecar_list_opt - .into_iter() - .flatten() - .collect::>(); - // Must push the same number of items as StoreOp::DeleteDataColumns items to - // prevent a `HotColdDBError::Rollback` error below in case of rollback - data_columns_to_delete.push((*block_root, data_column_sidecar_list)); - } - Err(e) => { - error!( - %block_root, - error = ?e, - "Error getting data columns" - ); - } - } - true - } - StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false, + StoreOp::PutBlobs(..) + | StoreOp::PutDataColumns(..) + | StoreOp::DeleteBlobs(..) + | StoreOp::DeleteDataColumns(..) => true, + StoreOp::PutBlock(..) | StoreOp::DeleteBlock(..) => false, _ => false, }); - // Update database whilst holding a lock on cache, to ensure that the cache updates - // atomically with the database. - let guard = self.block_cache.as_ref().map(|cache| cache.lock()); - - let blob_cache_ops = blobs_ops.clone(); - // Try to execute blobs store ops. - self.blobs_db - .do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; - - let hot_db_cache_ops = hot_db_ops.clone(); - // Try to execute hot db store ops. - let tx_res = match self.convert_to_kv_batch(hot_db_ops) { - Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops), - Err(e) => Err(e), - }; - // Rollback on failure - if let Err(e) = tx_res { - error!( - error = ?e, - action = "reverting blob DB changes", - "Database write failed" - ); - let mut blob_cache_ops = blob_cache_ops; - for op in blob_cache_ops.iter_mut() { - let reverse_op = match op { - StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), - StoreOp::PutDataColumns(block_root, data_columns) => { - let indices = data_columns.iter().map(|c| c.index).collect(); - StoreOp::DeleteDataColumns(*block_root, indices) - } - StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { - Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs), - None => return Err(HotColdDBError::Rollback.into()), - }, - StoreOp::DeleteDataColumns(_, _) => match data_columns_to_delete.pop() { - Some((block_root, data_columns)) => { - StoreOp::PutDataColumns(block_root, data_columns) - } - None => return Err(HotColdDBError::Rollback.into()), - }, - _ => return Err(HotColdDBError::Rollback.into()), - }; - *op = reverse_op; - } - self.blobs_db - .do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?; - return Err(e); - } - - // Delete from the state cache. - for op in &hot_db_cache_ops { + // Start by deleting related states from the state cache. It's possible that a concurrent + // thread could reinsert one of these states before we delete it from disk, which could lead + // to other threads observing the state after it should have been deleted. This is OK + // however, as we maintain the one-directed invariant: + // + // - block on disk & newer than split --> state on disk + for op in &hot_db_ops { match op { StoreOp::DeleteBlock(block_root) => { self.state_cache.lock().delete_block_states(block_root); @@ -1493,54 +1287,17 @@ impl, Cold: ItemStore> HotColdDB } } - // If the block cache is enabled, also delete from the block cache. - if let Some(mut guard) = guard { - for op in hot_db_cache_ops { - match op { - StoreOp::PutBlock(block_root, block) => { - guard.put_block(block_root, (*block).clone()); - } - - StoreOp::PutBlobs(_, _) => (), - - StoreOp::PutDataColumns(_, _) => (), - - StoreOp::PutState(_, _) => (), - - StoreOp::PutStateSummary(_, _) => (), - - StoreOp::DeleteBlock(block_root) => { - guard.delete_block(&block_root); - } - - StoreOp::DeleteState(_, _) => (), - - StoreOp::DeleteBlobs(_) => (), - - StoreOp::DeleteDataColumns(_, _) => (), - - StoreOp::DeleteExecutionPayload(_) => (), - - StoreOp::DeleteSyncCommitteeBranch(_) => (), - - StoreOp::KeyValueOp(_) => (), - } - } - - for op in blob_cache_ops { - match op { - StoreOp::PutBlobs(block_root, blobs) => { - guard.put_blobs(block_root, blobs); - } - - StoreOp::DeleteBlobs(block_root) => { - guard.delete_blobs(&block_root); - } + // Execute operations on the blob DB first. In the case of a sudden shutdown or a failed + // write to the hot DB, we allow data to be stored in the blob database that is not + // referenced by any block in the hot database. This maintains the invariant: + // + // - block in hot DB --> corresponding blobs/columns in blob DB + self.blobs_db + .do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; - _ => (), - } - } - } + // Write to the hot database. + self.hot_db + .do_atomically(self.convert_to_kv_batch(hot_db_ops.clone())?)?; Ok(()) } @@ -2469,23 +2226,12 @@ impl, Cold: ItemStore> HotColdDB /// Fetch custody info from the cache. /// If custody info doesn't exist in the cache, /// try to fetch from the DB and prime the cache. + // FIXME(sproul): re-add cache for this? pub fn get_data_column_custody_info(&self) -> Result, Error> { - if let Some(cache) = &self.block_cache - && let Some(data_column_custody_info) = cache.lock().get_data_column_custody_info() - { - return Ok(Some(data_column_custody_info)); - } let data_column_custody_info = self .blobs_db .get::(&DATA_COLUMN_CUSTODY_INFO_KEY)?; - // Update the cache - self.block_cache.as_ref().inspect(|cache| { - cache - .lock() - .put_data_column_custody_info(data_column_custody_info.clone()) - }); - Ok(data_column_custody_info) } @@ -2506,16 +2252,6 @@ impl, Cold: ItemStore> HotColdDB /// Fetch blobs for a given block from the store. pub fn get_blobs(&self, block_root: &Hash256) -> Result, Error> { - // Check the cache. - if let Some(blobs) = self - .block_cache - .as_ref() - .and_then(|cache| cache.lock().get_blobs(block_root).cloned()) - { - metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT); - return Ok(blobs.into()); - } - match self .blobs_db .get_bytes(DBColumn::BeaconBlob, block_root.as_slice())? @@ -2531,10 +2267,6 @@ impl, Cold: ItemStore> HotColdDB .map(|blob| self.spec.max_blobs_per_block(blob.epoch())) { let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?; - self.block_cache - .as_ref() - .inspect(|cache| cache.lock().put_blobs(*block_root, blobs.clone())); - Ok(BlobSidecarListFromRoot::Blobs(blobs)) } else { // This always implies that there were no blobs for this block_root @@ -2573,27 +2305,12 @@ impl, Cold: ItemStore> HotColdDB block_root: &Hash256, column_index: &ColumnIndex, ) -> Result>>, Error> { - // Check the cache. - if let Some(data_column) = self - .block_cache - .as_ref() - .and_then(|cache| cache.lock().get_data_column(block_root, column_index)) - { - metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT); - return Ok(Some(data_column)); - } - match self.blobs_db.get_bytes( DBColumn::BeaconDataColumn, &get_data_column_key(block_root, column_index), )? { Some(ref data_column_bytes) => { let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?); - self.block_cache.as_ref().inspect(|cache| { - cache - .lock() - .put_data_column(*block_root, data_column.clone()) - }); Ok(Some(data_column)) } None => Ok(None), @@ -3358,14 +3075,6 @@ impl, Cold: ItemStore> HotColdDB } } - // Remove deleted blobs from the cache. - if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) { - for block_root in removed_block_roots { - block_cache.delete_blobs(&block_root); - block_cache.delete_data_columns(&block_root); - } - } - // Remove from disk. if !blobs_db_ops.is_empty() { debug!( diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 93c9840586e..f43a8ff5968 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -235,34 +235,10 @@ pub static BEACON_BLOCK_GET_COUNT: LazyLock> = LazyLock::new( "Total number of beacon blocks requested from the store (cache or DB)", ) }); -pub static BEACON_BLOCK_CACHE_HIT_COUNT: LazyLock> = LazyLock::new(|| { - try_create_int_counter( - "store_beacon_block_cache_hit_total", - "Number of hits to the store's block cache", - ) -}); /* * Caches */ -pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock> = LazyLock::new(|| { - try_create_int_counter( - "store_beacon_blobs_cache_hit_total", - "Number of hits to the store's blob cache", - ) -}); -pub static STORE_BEACON_BLOCK_CACHE_SIZE: LazyLock> = LazyLock::new(|| { - try_create_int_gauge( - "store_beacon_block_cache_size", - "Current count of items in beacon store block cache", - ) -}); -pub static STORE_BEACON_BLOB_CACHE_SIZE: LazyLock> = LazyLock::new(|| { - try_create_int_gauge( - "store_beacon_blob_cache_size", - "Current count of items in beacon store blob cache", - ) -}); pub static STORE_BEACON_STATE_CACHE_SIZE: LazyLock> = LazyLock::new(|| { try_create_int_gauge( "store_beacon_state_cache_size",