
Commit cbb1c5a

chore: Added helper to reduce code duplication between Db.propose and Proposal.create_proposal (#1343)
Addresses #1342. Added a `propose_with_parent` helper so that Db.propose and Proposal.create_proposal share a single code path. `propose_with_parent` also chooses between parallel and serial proposal creation based on the batch size, which we estimate with `size_hint()` on the batch iterator. The batch-size threshold for parallel creation is currently 8; there is a TODO to tune this constant once we have experimental results.

Signed-off-by: bernard-avalabs <[email protected]>
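As a usage sketch (not part of this diff): the snippet below shows how a caller might set the threshold through the new `DbConfig` field and drive the unified `propose` path. It mirrors the test helpers in this change; the external crate paths (`firewood::db`, `firewood::v2::api`) and the on-disk path are assumptions made for illustration.

use firewood::db::{Db, DbConfig, UseParallel};
use firewood::v2::api::{Db as _, KeyValuePairIter, Proposal as _};

fn demo() {
    // The default is UseParallel::BatchSize(8): a batch whose size_hint() lower bound is
    // >= 8 is built with the parallel merkle path; smaller batches use the serial path.
    // UseParallel::Always / UseParallel::Never force one strategy regardless of size.
    let cfg = DbConfig::builder()
        .use_parallel(UseParallel::BatchSize(8))
        .build();
    let db = Db::new("testdb", cfg).unwrap(); // placeholder path

    // Same pattern as the tests: zip keys and values, convert to BatchOps, propose, commit.
    let keys: Vec<[u8; 4]> = vec![*b"key0"];
    let vals: Vec<Box<[u8]>> = vec![Box::new(*b"val0")];
    let kviter = keys.iter().zip(vals.iter()).map_into_batch();
    let proposal = db.propose(kviter).unwrap();
    proposal.commit().unwrap();
}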
1 parent 8799a24 commit cbb1c5a

File tree: 1 file changed, +109 -85 lines


firewood/src/db.rs

Lines changed: 109 additions & 85 deletions
@@ -11,8 +11,8 @@ use crate::merkle::{Merkle, Value};
 use crate::root_store::{NoOpStore, RootStore};
 pub use crate::v2::api::BatchOp;
 use crate::v2::api::{
-    self, ArcDynDbView, FrozenProof, FrozenRangeProof, HashKey, KeyType, KeyValuePair,
-    KeyValuePairIter, OptionalHashKeyExt,
+    self, ArcDynDbView, FrozenProof, FrozenRangeProof, HashKey, KeyType, KeyValuePairIter,
+    OptionalHashKeyExt,
 };

 use crate::manager::{ConfigManager, RevisionManager, RevisionManagerConfig};
@@ -95,6 +95,14 @@ where
     }
 }

+#[allow(dead_code)]
+#[derive(Clone, Debug)]
+pub enum UseParallel {
+    Never,
+    BatchSize(usize),
+    Always,
+}
+
 /// Database configuration.
 #[derive(Clone, TypedBuilder, Debug)]
 #[non_exhaustive]
@@ -109,13 +117,19 @@ pub struct DbConfig {
     /// Revision manager configuration.
     #[builder(default = RevisionManagerConfig::builder().build())]
     pub manager: RevisionManagerConfig,
+    // Whether to perform parallel proposal creation. If set to BatchSize, then firewood
+    // performs parallel proposal creation if the batch is >= to the BatchSize value.
+    // TODO: Experimentally determine the right value for BatchSize.
+    #[builder(default = UseParallel::BatchSize(8))]
+    pub use_parallel: UseParallel,
 }

 #[derive(Debug)]
 /// A database instance.
 pub struct Db {
     metrics: Arc<DbMetrics>,
     manager: RevisionManager,
+    use_parallel: UseParallel,
 }

 impl api::Db for Db {
@@ -139,45 +153,11 @@ impl api::Db for Db {
         Ok(self.manager.all_hashes())
     }

-    #[fastrace::trace(short_name = true)]
     fn propose(
         &self,
         batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
     ) -> Result<Self::Proposal<'_>, api::Error> {
-        let parent = self.manager.current_revision();
-        let proposal = NodeStore::new(&parent)?;
-        let mut merkle = Merkle::from(proposal);
-        let span = fastrace::Span::enter_with_local_parent("merkleops");
-        for op in batch.into_iter().map_into_batch() {
-            match op {
-                BatchOp::Put { key, value } => {
-                    merkle.insert(key.as_ref(), value.as_ref().into())?;
-                }
-                BatchOp::Delete { key } => {
-                    merkle.remove(key.as_ref())?;
-                }
-                BatchOp::DeleteRange { prefix } => {
-                    merkle.remove_prefix(prefix.as_ref())?;
-                }
-            }
-        }
-
-        drop(span);
-        let span = fastrace::Span::enter_with_local_parent("freeze");
-
-        let nodestore = merkle.into_inner();
-        let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
-            Arc::new(nodestore.try_into()?);
-
-        drop(span);
-        self.manager.add_proposal(immutable.clone());
-
-        self.metrics.proposals.increment(1);
-
-        Ok(Self::Proposal {
-            nodestore: immutable,
-            db: self,
-        })
+        self.propose_with_parent(batch, &self.manager.current_revision())
     }
 }

@@ -204,7 +184,11 @@ impl Db {

         let manager =
             RevisionManager::new(db_path.as_ref().to_path_buf(), config_manager, root_store)?;
-        let db = Self { metrics, manager };
+        let db = Self {
+            metrics,
+            manager,
+            use_parallel: cfg.use_parallel,
+        };
         Ok(db)
     }

@@ -231,20 +215,57 @@ impl Db {
         latest_rev_nodestore.check(opt)
     }

-    /// Propose a new batch that is processed in parallel.
+    /// Create a proposal with a specified parent. A proposal is created in parallel if `use_parallel`
+    /// is `Always` or if `use_parallel` is `BatchSize` and the batch is >= to the `BatchSize` value.
     ///
     /// # Panics
     ///
     /// Panics if the revision manager cannot create a thread pool.
-    pub fn propose_parallel(
+    #[fastrace::trace(name = "propose")]
+    fn propose_with_parent<F: Parentable>(
         &self,
         batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
+        parent: &NodeStore<F, FileBacked>,
     ) -> Result<Proposal<'_>, api::Error> {
-        let parent = self.manager.current_revision();
-        let mut parallel_merkle = ParallelMerkle::default();
-        let immutable =
-            parallel_merkle.create_proposal(&parent, batch, self.manager.threadpool())?;
+        // If use_parallel is BatchSize, then perform parallel proposal creation if the batch
+        // size is >= BatchSize.
+        let batch = batch.into_iter();
+        let use_parallel = match self.use_parallel {
+            UseParallel::Never => false,
+            UseParallel::Always => true,
+            UseParallel::BatchSize(required_size) => batch.size_hint().0 >= required_size,
+        };
+        let immutable = if use_parallel {
+            let mut parallel_merkle = ParallelMerkle::default();
+            let _span = fastrace::Span::enter_with_local_parent("parallel_merkle");
+            parallel_merkle.create_proposal(parent, batch, self.manager.threadpool())?
+        } else {
+            let proposal = NodeStore::new(parent)?;
+            let mut merkle = Merkle::from(proposal);
+            let span = fastrace::Span::enter_with_local_parent("merkleops");
+            for op in batch.into_iter().map_into_batch() {
+                match op {
+                    BatchOp::Put { key, value } => {
+                        merkle.insert(key.as_ref(), value.as_ref().into())?;
+                    }
+                    BatchOp::Delete { key } => {
+                        merkle.remove(key.as_ref())?;
+                    }
+                    BatchOp::DeleteRange { prefix } => {
+                        merkle.remove_prefix(prefix.as_ref())?;
+                    }
+                }
+            }
+
+            drop(span);
+            let _span = fastrace::Span::enter_with_local_parent("freeze");
+            let nodestore = merkle.into_inner();
+            Arc::new(nodestore.try_into()?)
+        };
         self.manager.add_proposal(immutable.clone());
+
+        self.metrics.proposals.increment(1);
+
         Ok(Proposal {
             nodestore: immutable,
             db: self,
313334
&self,
314335
batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
315336
) -> Result<Self, api::Error> {
316-
let parent = self.nodestore.clone();
317-
let proposal = NodeStore::new(&parent)?;
318-
let mut merkle = Merkle::from(proposal);
319-
for op in batch {
320-
match op.into_batch() {
321-
BatchOp::Put { key, value } => {
322-
merkle.insert(key.as_ref(), value.as_ref().into())?;
323-
}
324-
BatchOp::Delete { key } => {
325-
merkle.remove(key.as_ref())?;
326-
}
327-
BatchOp::DeleteRange { prefix } => {
328-
merkle.remove_prefix(prefix.as_ref())?;
329-
}
330-
}
331-
}
332-
let nodestore = merkle.into_inner();
333-
let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
334-
Arc::new(nodestore.try_into()?);
335-
self.db.manager.add_proposal(immutable.clone());
336-
337-
Ok(Self {
338-
nodestore: immutable,
339-
db: self.db,
340-
})
337+
self.db.propose_with_parent(batch, &self.nodestore)
341338
}
342339
}
343340

@@ -355,7 +352,7 @@ mod test {
         CheckOpt, CheckerError, HashedNodeReader, IntoHashType, NodeStore, TrieHash,
     };

-    use crate::db::{Db, Proposal};
+    use crate::db::{Db, Proposal, UseParallel};
     use crate::root_store::{MockStore, RootStore};
     use crate::v2::api::{Db as _, DbView, KeyValuePairIter, Proposal as _};

@@ -606,14 +603,23 @@
             let keys: Vec<[u8; 1]> = vec![[kv; 1]];
             let vals: Vec<Box<[u8]>> = vec![Box::new([kv; 1])];
             let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-            let proposal = db.propose_parallel(kviter).unwrap();
+            let proposal = db.propose(kviter).unwrap();
             proposal.commit().unwrap();
         }

         // Create, insert, close, open, insert
-        let db = TestDb::new();
+        let db = TestDb::new_with_config(
+            DbConfig::builder()
+                .use_parallel(UseParallel::Always)
+                .build(),
+        );
         insert_commit(&db, 1);
-        let db = db.reopen();
+        let db = db.reopen_with_config(
+            DbConfig::builder()
+                .truncate(false)
+                .use_parallel(UseParallel::Always)
+                .build(),
+        );
         insert_commit(&db, 2);
         // Check that the keys are still there after the commits
         let committed = db.revision(db.root_hash().unwrap().unwrap()).unwrap();
@@ -626,9 +632,17 @@
         drop(db);

         // Open-db1, insert, open-db2, insert
-        let db1 = TestDb::new();
+        let db1 = TestDb::new_with_config(
+            DbConfig::builder()
+                .use_parallel(UseParallel::Always)
+                .build(),
+        );
         insert_commit(&db1, 1);
-        let db2 = TestDb::new();
+        let db2 = TestDb::new_with_config(
+            DbConfig::builder()
+                .use_parallel(UseParallel::Always)
+                .build(),
+        );
         insert_commit(&db2, 2);
         let committed1 = db1.revision(db1.root_hash().unwrap().unwrap()).unwrap();
         let committed2 = db2.revision(db2.root_hash().unwrap().unwrap()).unwrap();
@@ -644,14 +658,18 @@
     #[test]
     fn test_propose_parallel() {
         const N: usize = 100;
-        let db = TestDb::new();
+        let db = TestDb::new_with_config(
+            DbConfig::builder()
+                .use_parallel(UseParallel::Always)
+                .build(),
+        );

         // Test an empty proposal
         let keys: Vec<[u8; 0]> = Vec::new();
         let vals: Vec<Box<[u8]>> = Vec::new();

         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();
         proposal.commit().unwrap();

         // Create a proposal consisting of a single entry and an empty key.
@@ -662,7 +680,7 @@
         let vals: Vec<Box<[u8]>> = vec![Box::new([0; 1])];

         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();

         let kviter = keys.iter().zip(vals.iter());
         for (k, v) in kviter {
@@ -680,7 +698,7 @@
         // Create a proposal that deletes the previous entry
         let vals: Vec<Box<[u8]>> = vec![Box::new([0; 0])];
         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();

         let kviter = keys.iter().zip(vals.iter());
         for (k, _v) in kviter {
@@ -699,7 +717,7 @@
             .unzip();

         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();
         let kviter = keys.iter().zip(vals.iter());
         for (k, v) in kviter {
             assert_eq!(&proposal.val(k).unwrap().unwrap(), v);
@@ -718,7 +736,7 @@
             .unzip();

         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();
         let kviter = keys.iter().zip(vals.iter());
         for (k, _v) in kviter {
             assert_eq!(proposal.val(k).unwrap(), None);
@@ -729,7 +747,7 @@
         let keys: Vec<[u8; 0]> = vec![[0; 0]];
         let vals: Vec<Box<[u8]>> = vec![Box::new([0; 0])];
         let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-        let proposal = db.propose_parallel(kviter).unwrap();
+        let proposal = db.propose(kviter).unwrap();
         proposal.commit().unwrap();

         // Create N keys and values like (key0, value0)..(keyN, valueN)
@@ -746,7 +764,7 @@
         // Looping twice to test that we are reusing the thread pool.
         for _ in 0..2 {
             let kviter = keys.iter().zip(vals.iter()).map_into_batch();
-            let proposal = db.propose_parallel(kviter).unwrap();
+            let proposal = db.propose(kviter).unwrap();

             // iterate over the keys and values again, checking that the values are in the correct proposal
             let kviter = keys.iter().zip(vals.iter());
@@ -1090,11 +1108,14 @@

     impl TestDb {
         fn new() -> Self {
+            TestDb::new_with_config(DbConfig::builder().build())
+        }
+
+        fn new_with_config(dbconfig: DbConfig) -> Self {
             let tmpdir = tempfile::tempdir().unwrap();
             let dbpath: PathBuf = [tmpdir.path().to_path_buf(), PathBuf::from("testdb")]
                 .iter()
                 .collect();
-            let dbconfig = DbConfig::builder().build();
             let db = Db::new(dbpath, dbconfig).unwrap();
             TestDb { db, tmpdir }
         }
@@ -1110,12 +1131,15 @@
         }

         fn reopen(self) -> Self {
+            self.reopen_with_config(DbConfig::builder().truncate(false).build())
+        }
+
+        fn reopen_with_config(self, dbconfig: DbConfig) -> Self {
             let path = self.path();
             let TestDb { db, tmpdir } = self;

             let root_store = db.into_root_store();

-            let dbconfig = DbConfig::builder().truncate(false).build();
             let db = Db::with_root_store(path, dbconfig, root_store).unwrap();
             TestDb { db, tmpdir }
         }

0 commit comments
