diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 8ac60d275..3703b07cc 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -11,8 +11,8 @@ use crate::merkle::{Merkle, Value}; use crate::root_store::{NoOpStore, RootStore}; pub use crate::v2::api::BatchOp; use crate::v2::api::{ - self, ArcDynDbView, FrozenProof, FrozenRangeProof, HashKey, KeyType, KeyValuePair, - KeyValuePairIter, OptionalHashKeyExt, + self, ArcDynDbView, FrozenProof, FrozenRangeProof, HashKey, KeyType, KeyValuePairIter, + OptionalHashKeyExt, }; use crate::manager::{ConfigManager, RevisionManager, RevisionManagerConfig}; @@ -95,6 +95,14 @@ where } } +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub enum UseParallel { + Never, + BatchSize(usize), + Always, +} + /// Database configuration. #[derive(Clone, TypedBuilder, Debug)] #[non_exhaustive] @@ -109,6 +117,11 @@ pub struct DbConfig { /// Revision manager configuration. #[builder(default = RevisionManagerConfig::builder().build())] pub manager: RevisionManagerConfig, + // Whether to perform parallel proposal creation. If set to BatchSize, then firewood + // performs parallel proposal creation if the batch is >= to the BatchSize value. + // TODO: Experimentally determine the right value for BatchSize. + #[builder(default = UseParallel::BatchSize(8))] + pub use_parallel: UseParallel, } #[derive(Debug)] @@ -116,6 +129,7 @@ pub struct DbConfig { pub struct Db { metrics: Arc, manager: RevisionManager, + use_parallel: UseParallel, } impl api::Db for Db { @@ -139,45 +153,11 @@ impl api::Db for Db { Ok(self.manager.all_hashes()) } - #[fastrace::trace(short_name = true)] fn propose( &self, batch: impl IntoIterator, ) -> Result, api::Error> { - let parent = self.manager.current_revision(); - let proposal = NodeStore::new(&parent)?; - let mut merkle = Merkle::from(proposal); - let span = fastrace::Span::enter_with_local_parent("merkleops"); - for op in batch.into_iter().map_into_batch() { - match op { - BatchOp::Put { key, value } => { - merkle.insert(key.as_ref(), value.as_ref().into())?; - } - BatchOp::Delete { key } => { - merkle.remove(key.as_ref())?; - } - BatchOp::DeleteRange { prefix } => { - merkle.remove_prefix(prefix.as_ref())?; - } - } - } - - drop(span); - let span = fastrace::Span::enter_with_local_parent("freeze"); - - let nodestore = merkle.into_inner(); - let immutable: Arc, FileBacked>> = - Arc::new(nodestore.try_into()?); - - drop(span); - self.manager.add_proposal(immutable.clone()); - - self.metrics.proposals.increment(1); - - Ok(Self::Proposal { - nodestore: immutable, - db: self, - }) + self.propose_with_parent(batch, &self.manager.current_revision()) } } @@ -204,7 +184,11 @@ impl Db { let manager = RevisionManager::new(db_path.as_ref().to_path_buf(), config_manager, root_store)?; - let db = Self { metrics, manager }; + let db = Self { + metrics, + manager, + use_parallel: cfg.use_parallel, + }; Ok(db) } @@ -231,20 +215,57 @@ impl Db { latest_rev_nodestore.check(opt) } - /// Propose a new batch that is processed in parallel. + /// Create a proposal with a specified parent. A proposal is created in parallel if `use_parallel` + /// is `Always` or if `use_parallel` is `BatchSize` and the batch is >= to the `BatchSize` value. /// /// # Panics /// /// Panics if the revision manager cannot create a thread pool. - pub fn propose_parallel( + #[fastrace::trace(name = "propose")] + fn propose_with_parent( &self, batch: impl IntoIterator, + parent: &NodeStore, ) -> Result, api::Error> { - let parent = self.manager.current_revision(); - let mut parallel_merkle = ParallelMerkle::default(); - let immutable = - parallel_merkle.create_proposal(&parent, batch, self.manager.threadpool())?; + // If use_parallel is BatchSize, then perform parallel proposal creation if the batch + // size is >= BatchSize. + let batch = batch.into_iter(); + let use_parallel = match self.use_parallel { + UseParallel::Never => false, + UseParallel::Always => true, + UseParallel::BatchSize(required_size) => batch.size_hint().0 >= required_size, + }; + let immutable = if use_parallel { + let mut parallel_merkle = ParallelMerkle::default(); + let _span = fastrace::Span::enter_with_local_parent("parallel_merkle"); + parallel_merkle.create_proposal(parent, batch, self.manager.threadpool())? + } else { + let proposal = NodeStore::new(parent)?; + let mut merkle = Merkle::from(proposal); + let span = fastrace::Span::enter_with_local_parent("merkleops"); + for op in batch.into_iter().map_into_batch() { + match op { + BatchOp::Put { key, value } => { + merkle.insert(key.as_ref(), value.as_ref().into())?; + } + BatchOp::Delete { key } => { + merkle.remove(key.as_ref())?; + } + BatchOp::DeleteRange { prefix } => { + merkle.remove_prefix(prefix.as_ref())?; + } + } + } + + drop(span); + let _span = fastrace::Span::enter_with_local_parent("freeze"); + let nodestore = merkle.into_inner(); + Arc::new(nodestore.try_into()?) + }; self.manager.add_proposal(immutable.clone()); + + self.metrics.proposals.increment(1); + Ok(Proposal { nodestore: immutable, db: self, @@ -313,31 +334,7 @@ impl Proposal<'_> { &self, batch: impl IntoIterator, ) -> Result { - let parent = self.nodestore.clone(); - let proposal = NodeStore::new(&parent)?; - let mut merkle = Merkle::from(proposal); - for op in batch { - match op.into_batch() { - BatchOp::Put { key, value } => { - merkle.insert(key.as_ref(), value.as_ref().into())?; - } - BatchOp::Delete { key } => { - merkle.remove(key.as_ref())?; - } - BatchOp::DeleteRange { prefix } => { - merkle.remove_prefix(prefix.as_ref())?; - } - } - } - let nodestore = merkle.into_inner(); - let immutable: Arc, FileBacked>> = - Arc::new(nodestore.try_into()?); - self.db.manager.add_proposal(immutable.clone()); - - Ok(Self { - nodestore: immutable, - db: self.db, - }) + self.db.propose_with_parent(batch, &self.nodestore) } } @@ -355,7 +352,7 @@ mod test { CheckOpt, CheckerError, HashedNodeReader, IntoHashType, NodeStore, TrieHash, }; - use crate::db::{Db, Proposal}; + use crate::db::{Db, Proposal, UseParallel}; use crate::root_store::{MockStore, RootStore}; use crate::v2::api::{Db as _, DbView, KeyValuePairIter, Proposal as _}; @@ -606,14 +603,23 @@ mod test { let keys: Vec<[u8; 1]> = vec![[kv; 1]]; let vals: Vec> = vec![Box::new([kv; 1])]; let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); proposal.commit().unwrap(); } // Create, insert, close, open, insert - let db = TestDb::new(); + let db = TestDb::new_with_config( + DbConfig::builder() + .use_parallel(UseParallel::Always) + .build(), + ); insert_commit(&db, 1); - let db = db.reopen(); + let db = db.reopen_with_config( + DbConfig::builder() + .truncate(false) + .use_parallel(UseParallel::Always) + .build(), + ); insert_commit(&db, 2); // Check that the keys are still there after the commits let committed = db.revision(db.root_hash().unwrap().unwrap()).unwrap(); @@ -626,9 +632,17 @@ mod test { drop(db); // Open-db1, insert, open-db2, insert - let db1 = TestDb::new(); + let db1 = TestDb::new_with_config( + DbConfig::builder() + .use_parallel(UseParallel::Always) + .build(), + ); insert_commit(&db1, 1); - let db2 = TestDb::new(); + let db2 = TestDb::new_with_config( + DbConfig::builder() + .use_parallel(UseParallel::Always) + .build(), + ); insert_commit(&db2, 2); let committed1 = db1.revision(db1.root_hash().unwrap().unwrap()).unwrap(); let committed2 = db2.revision(db2.root_hash().unwrap().unwrap()).unwrap(); @@ -644,14 +658,18 @@ mod test { #[test] fn test_propose_parallel() { const N: usize = 100; - let db = TestDb::new(); + let db = TestDb::new_with_config( + DbConfig::builder() + .use_parallel(UseParallel::Always) + .build(), + ); // Test an empty proposal let keys: Vec<[u8; 0]> = Vec::new(); let vals: Vec> = Vec::new(); let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); proposal.commit().unwrap(); // Create a proposal consisting of a single entry and an empty key. @@ -662,7 +680,7 @@ mod test { let vals: Vec> = vec![Box::new([0; 1])]; let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); let kviter = keys.iter().zip(vals.iter()); for (k, v) in kviter { @@ -680,7 +698,7 @@ mod test { // Create a proposal that deletes the previous entry let vals: Vec> = vec![Box::new([0; 0])]; let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); let kviter = keys.iter().zip(vals.iter()); for (k, _v) in kviter { @@ -699,7 +717,7 @@ mod test { .unzip(); let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); let kviter = keys.iter().zip(vals.iter()); for (k, v) in kviter { assert_eq!(&proposal.val(k).unwrap().unwrap(), v); @@ -718,7 +736,7 @@ mod test { .unzip(); let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); let kviter = keys.iter().zip(vals.iter()); for (k, _v) in kviter { assert_eq!(proposal.val(k).unwrap(), None); @@ -729,7 +747,7 @@ mod test { let keys: Vec<[u8; 0]> = vec![[0; 0]]; let vals: Vec> = vec![Box::new([0; 0])]; let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); proposal.commit().unwrap(); // Create N keys and values like (key0, value0)..(keyN, valueN) @@ -746,7 +764,7 @@ mod test { // Looping twice to test that we are reusing the thread pool. for _ in 0..2 { let kviter = keys.iter().zip(vals.iter()).map_into_batch(); - let proposal = db.propose_parallel(kviter).unwrap(); + let proposal = db.propose(kviter).unwrap(); // iterate over the keys and values again, checking that the values are in the correct proposal let kviter = keys.iter().zip(vals.iter()); @@ -1090,11 +1108,14 @@ mod test { impl TestDb { fn new() -> Self { + TestDb::new_with_config(DbConfig::builder().build()) + } + + fn new_with_config(dbconfig: DbConfig) -> Self { let tmpdir = tempfile::tempdir().unwrap(); let dbpath: PathBuf = [tmpdir.path().to_path_buf(), PathBuf::from("testdb")] .iter() .collect(); - let dbconfig = DbConfig::builder().build(); let db = Db::new(dbpath, dbconfig).unwrap(); TestDb { db, tmpdir } } @@ -1110,12 +1131,15 @@ mod test { } fn reopen(self) -> Self { + self.reopen_with_config(DbConfig::builder().truncate(false).build()) + } + + fn reopen_with_config(self, dbconfig: DbConfig) -> Self { let path = self.path(); let TestDb { db, tmpdir } = self; let root_store = db.into_root_store(); - let dbconfig = DbConfig::builder().truncate(false).build(); let db = Db::with_root_store(path, dbconfig, root_store).unwrap(); TestDb { db, tmpdir } }