diff --git a/Cargo.lock b/Cargo.lock index 3c3797bc9f..48735d1789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,9 +71,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.20" +version = "0.6.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" dependencies = [ "anstyle", "anstyle-parse", @@ -86,9 +86,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.11" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" [[package]] name = "anstyle-parse" @@ -244,9 +244,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.32.0" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee74396bee4da70c2e27cf94762714c911725efe69d9e2672f998512a67a4ce4" +checksum = "a2b715a6010afb9e457ca2b7c9d2b9c344baa8baed7b38dc476034c171b32575" dependencies = [ "bindgen", "cc", @@ -384,15 +384,15 @@ checksum = "7575182f7272186991736b70173b0ea045398f984bf5ebbb3804736ce1330c9d" [[package]] name = "bytemuck" -version = "1.23.2" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" [[package]] name = "bytemuck_derive" -version = "1.10.1" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f154e572231cb6ba2bd1176980827e3d5dc04cc183a75dea38109fbdd672d29" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" dependencies = [ "proc-macro2", 
"quote", @@ -438,9 +438,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.39" +version = "1.2.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1354349954c6fc9cb0deab020f27f783cf0b604e8bb754dc4658ecf0d29c35f" +checksum = "e1d05d92f4b1fd76aad469d46cdd858ca761576082cd37df81416691e50199fb" dependencies = [ "find-msvc-tools", "jobserver", @@ -595,9 +595,9 @@ dependencies = [ [[package]] name = "const_format" -version = "0.2.34" +version = "0.2.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126f97965c8ad46d6d9163268ff28432e8f6a1196a55578867832e3049df63dd" +checksum = "7faa7469a93a566e9ccc1c73fe783b4a65c274c5ace346038dca9c39fe0030ad" dependencies = [ "const_format_proc_macros", ] @@ -1006,9 +1006,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" +checksum = "0399f9d26e5191ce32c498bebd31e7a3ceabc2745f0ac54af3f335126c3f24b3" [[package]] name = "findshlibs" @@ -1046,6 +1046,7 @@ dependencies = [ "plain_hasher", "pprof", "rand 0.9.2", + "rayon", "rlp", "sha3", "tempfile", @@ -1997,11 +1998,10 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" [[package]] name = "lock_api" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" dependencies = [ - "autocfg", "scopeguard", ] @@ -2334,9 +2334,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" dependencies = [ "lock_api", "parking_lot_core", @@ -2344,15 +2344,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.11" +version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -2639,9 +2639,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.40" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" dependencies = [ "proc-macro2", ] @@ -2777,9 +2777,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.17" +version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ "bitflags 2.9.4", ] @@ -2954,9 +2954,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.6" +version = "0.103.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" +checksum = "e10b3f4191e8a80e6b43eebabfac91e5dcecebb27a71f04e820c47ec41d314bf" dependencies = [ "aws-lc-rs", "ring", @@ -3017,9 +3017,9 @@ checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" [[package]] name = "security-framework" -version = "3.5.0" +version = "3.5.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc198e42d9b7510827939c9a15f5062a0c913f3371d765977e586d2fe6c16f4a" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ "bitflags 2.9.4", "core-foundation", @@ -3046,9 +3046,9 @@ checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" [[package]] name = "serde" -version = "1.0.227" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80ece43fc6fbed4eb5392ab50c07334d3e577cbf40997ee896fe7af40bba4245" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", "serde_derive", @@ -3056,18 +3056,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.227" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a576275b607a2c86ea29e410193df32bc680303c82f31e275bbfcafe8b33be5" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.227" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51e694923b8824cf0e9b382adf0f60d4e05f348f357b38833a3fa5ed7c2ede04" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3371,18 +3371,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.16" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.16" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", @@ -3700,9 +3700,9 @@ dependencies = [ [[package]] name = "triomphe" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef8f7726da4807b58ea5c96fdc122f80702030edc33b35aff9190a51148ccc85" +checksum = "dd69c5aa8f924c7519d6372789a74eac5b94fb0f8fcf0d4a97eb0bfc3e785f39" dependencies = [ "serde", "stable_deref_trait", @@ -3751,9 +3751,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "uint" @@ -3775,9 +3775,9 @@ checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" [[package]] name = "unicode-width" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" [[package]] name = "unicode-xid" @@ -4342,9 +4342,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zerotrie" diff --git a/ffi/README.md b/ffi/README.md index da24e79c14..6280b6cffa 100644 --- a/ffi/README.md +++ b/ffi/README.md @@ -72,6 +72,7 @@ A `Config` should be provided when creating the database. 
A default config is pr FreeListCacheEntries: 40000, Revisions: 100, ReadCacheStrategy: OnlyCacheWrites, + Parallel: false, } ``` @@ -83,6 +84,12 @@ If set to `true`, an empty database will be created, overriding any existing fil *Default*: `false` +#### `Parallel` - `bool` + +If set to `true`, proposals are created in parallel. This can improve performance by supporting both parallel reads and parallel hashing during proposal creation. + +*Default*: `false` + #### `Revisions` - `uint` Indicates the number of committed roots accessible before the diff layer is compressed. Must be explicitly set if the config is specified to at least 2. diff --git a/ffi/firewood.go b/ffi/firewood.go index 66b7240d0f..ab1f4aee34 100644 --- a/ffi/firewood.go +++ b/ffi/firewood.go @@ -59,6 +59,7 @@ type Config struct { FreeListCacheEntries uint Revisions uint ReadCacheStrategy CacheStrategy + Parallel bool } // DefaultConfig returns a sensible default Config. @@ -68,6 +69,7 @@ func DefaultConfig() *Config { FreeListCacheEntries: 40_000, Revisions: 100, ReadCacheStrategy: OnlyCacheWrites, + Parallel: false, } } @@ -113,6 +115,7 @@ func New(filePath string, conf *Config) (*Database, error) { revisions: C.size_t(conf.Revisions), strategy: C.uint8_t(conf.ReadCacheStrategy), truncate: C.bool(conf.Truncate), + parallel: C.bool(conf.Parallel), } return getDatabaseFromHandleResult(C.fwd_open_db(args)) diff --git a/ffi/firewood.h b/ffi/firewood.h index cb13778d19..a948d52510 100644 --- a/ffi/firewood.h +++ b/ffi/firewood.h @@ -729,6 +729,10 @@ typedef struct DatabaseHandleArgs { * Whether to truncate the database file if it exists. 
*/ bool truncate; + /** + * Whether to use parallel insert and hashing for propose + */ + bool parallel; } DatabaseHandleArgs; /** diff --git a/ffi/firewood_test.go b/ffi/firewood_test.go index 61816bfb9d..eb748ed2fd 100644 --- a/ffi/firewood_test.go +++ b/ffi/firewood_test.go @@ -127,11 +127,17 @@ func TestMain(m *testing.M) { } func newTestDatabase(t *testing.T) *Database { + conf := DefaultConfig() + conf.Truncate = true // in tests, we use filepath.Join, which creates an empty file + return newTestDatabaseConfig(conf, t) +} + +func newTestDatabaseConfig(conf *Config, t *testing.T) *Database { t.Helper() r := require.New(t) dbFile := filepath.Join(t.TempDir(), "test.db") - db, closeDB, err := newDatabase(dbFile) + db, closeDB, err := newDatabaseConfig(dbFile, conf) r.NoError(err) t.Cleanup(func() { r.NoError(closeDB()) @@ -142,7 +148,10 @@ func newTestDatabase(t *testing.T) *Database { func newDatabase(dbFile string) (*Database, func() error, error) { conf := DefaultConfig() conf.Truncate = true // in tests, we use filepath.Join, which creates an empty file + return newDatabaseConfig(dbFile, conf) +} +func newDatabaseConfig(dbFile string, conf *Config) (*Database, func() error, error) { f, err := New(dbFile, conf) if err != nil { return nil, nil, fmt.Errorf("failed to create new database at filepath %q: %w", dbFile, err) @@ -377,6 +386,43 @@ func TestInvariants(t *testing.T) { r.Empty(got) } +func TestProposalUsingParallel(t *testing.T) { + r := require.New(t) + conf := DefaultConfig() + conf.Truncate = true + conf.Parallel = true // Turn parallel option on + db := newTestDatabaseConfig(conf, t) + + // Create a proposal with 10 keys + const numKeys = 10 + keys := make([][]byte, numKeys) + vals := make([][]byte, numKeys) + for i := 0; i < numKeys; i++ { + keys[i] = keyForTest(i) + vals[i] = valForTest(i) + } + proposal, err := db.Propose(keys, vals) + r.NoError(err) + + // Check that the keys are in the proposal + for i := 0; i < numKeys; i++ { + got, err := 
proposal.Get(keyForTest(i)) + r.NoError(err) + r.Equal(valForTest(i), got, "Get(%d)", i) + } + + // Commit the proposal + err = proposal.Commit() + r.NoError(err) + + // Check the keys are in the database + for i := 0; i < numKeys; i++ { + got, err := db.Get(keyForTest(i)) + r.NoError(err) + r.Equal(valForTest(i), got, "Get(%d)", i) + } +} + func TestConflictingProposals(t *testing.T) { r := require.New(t) db := newTestDatabase(t) diff --git a/ffi/src/handle.rs b/ffi/src/handle.rs index f03fa26217..5d83dd24dd 100644 --- a/ffi/src/handle.rs +++ b/ffi/src/handle.rs @@ -50,6 +50,9 @@ pub struct DatabaseHandleArgs<'a> { /// Whether to truncate the database file if it exists. pub truncate: bool, + + /// Whether to use parallel insert and hashing for propose + pub parallel: bool, } impl DatabaseHandleArgs<'_> { @@ -102,6 +105,7 @@ impl DatabaseHandle { let cfg = DbConfig::builder() .truncate(args.truncate) .manager(args.as_rev_manager_config()?) + .parallel(args.parallel) .build(); let path = args diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index 852ee2c30b..f8f44ae169 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -35,6 +35,7 @@ metrics.workspace = true thiserror.workspace = true # Regular dependencies typed-builder = "0.22.0" +rayon = "1.11.0" [features] default = [] diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 6aa6fda503..74a1318a57 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -27,6 +27,8 @@ use std::sync::Arc; use thiserror::Error; use typed_builder::TypedBuilder; +use crate::merkle::parallel::ParallelMerkle; + #[derive(Error, Debug)] #[non_exhaustive] /// Represents the different types of errors that can occur in the database. @@ -106,6 +108,9 @@ pub struct DbConfig { /// Revision manager configuration. #[builder(default = RevisionManagerConfig::builder().build())] pub manager: RevisionManagerConfig, + /// Whether to use parallel insert and hashing for propose. 
+ #[builder(default = false)] + pub parallel: bool, } #[derive(Debug)] @@ -142,31 +147,42 @@ impl api::Db for Db { batch: impl IntoIterator, ) -> Result, api::Error> { let parent = self.manager.current_revision(); - let proposal = NodeStore::new(&parent)?; - let mut merkle = Merkle::from(proposal); - let span = fastrace::Span::enter_with_local_parent("merkleops"); - for op in batch.into_iter().map_into_batch() { - match op { - BatchOp::Put { key, value } => { - merkle.insert(key.as_ref(), value.as_ref().into())?; - } - BatchOp::Delete { key } => { - merkle.remove(key.as_ref())?; - } - BatchOp::DeleteRange { prefix } => { - merkle.remove_prefix(prefix.as_ref())?; + + // If threadpool exists, then use the parallel propose implementation + let immutable = if let Some(threadpool) = self.manager.threadpool() { + let mut parallel_merkle = ParallelMerkle::default(); + let span = fastrace::Span::enter_with_local_parent("parallel_merkle"); + let immutable = parallel_merkle.create_proposal(&parent, batch, threadpool)?; + drop(span); + immutable + } else { + let proposal = NodeStore::new(&parent)?; + let mut merkle = Merkle::from(proposal); + let span = fastrace::Span::enter_with_local_parent("merkleops"); + for op in batch.into_iter().map_into_batch() { + match op { + BatchOp::Put { key, value } => { + merkle.insert(key.as_ref(), value.as_ref().into())?; + } + BatchOp::Delete { key } => { + merkle.remove(key.as_ref())?; + } + BatchOp::DeleteRange { prefix } => { + merkle.remove_prefix(prefix.as_ref())?; + } } } - } - drop(span); - let span = fastrace::Span::enter_with_local_parent("freeze"); + drop(span); + let span = fastrace::Span::enter_with_local_parent("freeze"); - let nodestore = merkle.into_inner(); - let immutable: Arc, FileBacked>> = - Arc::new(nodestore.try_into()?); + let nodestore = merkle.into_inner(); + let immutable: Arc, FileBacked>> = + Arc::new(nodestore.try_into()?); - drop(span); + drop(span); + immutable + }; self.manager.add_proposal(immutable.clone()); 
self.metrics.proposals.increment(1); @@ -189,6 +205,7 @@ impl Db { .create(cfg.create_if_missing) .truncate(cfg.truncate) .manager(cfg.manager) + .parallel(cfg.parallel) .build(); let manager = RevisionManager::new(db_path.as_ref().to_path_buf(), config_manager)?; let db = Self { metrics, manager }; @@ -555,6 +572,123 @@ mod test { assert_eq!(&*value, b"proposal_value"); } + #[test] + fn test_propose_parallel() { + const N: usize = 100; + let db = testdb_config(DbConfig::builder().parallel(true).build()); + + // Test an empty proposal + let keys: Vec<[u8; 0]> = Vec::new(); + let vals: Vec> = Vec::new(); + + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + proposal.commit().unwrap(); + + // Create a proposal consisting of a single entry and an empty key. + let keys: Vec<[u8; 0]> = vec![[0; 0]]; + + // Note that if the value is [], then it is interpreted as a DeleteRange. + // Instead, set value to [0] + let vals: Vec> = vec![Box::new([0; 1])]; + + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + + let kviter = keys.iter().zip(vals.iter()); + for (k, v) in kviter { + assert_eq!(&proposal.val(k).unwrap().unwrap(), v); + } + proposal.commit().unwrap(); + + // Check that the key is still there after the commit + let parent = db.manager.current_revision(); + let kviter = keys.iter().zip(vals.iter()); + for (k, v) in kviter { + assert_eq!(&parent.val(k).unwrap().unwrap(), v); + } + + // Create a proposal that deletes the previous entry + let vals: Vec> = vec![Box::new([0; 0])]; + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + + let kviter = keys.iter().zip(vals.iter()); + for (k, _v) in kviter { + assert_eq!(proposal.val(k).unwrap(), None); + } + proposal.commit().unwrap(); + + // Create a proposal that inserts 0 to 999 + let (keys, vals): (Vec<_>, Vec<_>) = (0..1000) + .map(|i| { + ( + 
format!("key{i}").into_bytes(), + Box::from(format!("value{i}").as_bytes()), + ) + }) + .unzip(); + + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + let kviter = keys.iter().zip(vals.iter()); + for (k, v) in kviter { + assert_eq!(&proposal.val(k).unwrap().unwrap(), v); + } + proposal.commit().unwrap(); + + // Create a proposal that deletes all of the odd-numbered entries + let (keys, vals): (Vec<_>, Vec<_>) = (0..1000) + .filter_map(|i| { + if i % 2 != 0 { + Some::<(Vec, Box<[u8]>)>((format!("key{i}").into_bytes(), Box::new([]))) + } else { + None + } + }) + .unzip(); + + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + let kviter = keys.iter().zip(vals.iter()); + for (k, _v) in kviter { + assert_eq!(proposal.val(k).unwrap(), None); + } + proposal.commit().unwrap(); + + // Create a proposal that deletes using empty prefix + let keys: Vec<[u8; 0]> = vec![[0; 0]]; + let vals: Vec> = vec![Box::new([0; 0])]; + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + proposal.commit().unwrap(); + + // Create N random 32-byte keys paired with the values value0..value{N-1} + let rng = firewood_storage::SeededRng::from_env_or_random(); + let (keys, vals): (Vec<_>, Vec<_>) = (0..N) + .map(|i| { + ( + rng.random::<[u8; 32]>(), + Box::from(format!("value{i}").as_bytes()), + ) + }) + .unzip(); + + // Looping twice to test that we are reusing the thread pool. 
+ for _ in 0..2 { + let kviter = keys.iter().zip(vals.iter()).map_into_batch(); + let proposal = db.propose(kviter).unwrap(); + + // iterate over the keys and values again, checking that the values are in the correct proposal + let kviter = keys.iter().zip(vals.iter()); + + for (k, v) in kviter { + assert_eq!(&proposal.val(k).unwrap().unwrap(), v); + } + proposal.commit().unwrap(); + } + } + /// Test that proposing on a proposal works as expected /// /// Test creates two batches and proposes them, and verifies that the values are in the correct proposal. @@ -784,11 +918,14 @@ mod test { } fn testdb() -> TestDb { + testdb_config(DbConfig::builder().build()) + } + + fn testdb_config(dbconfig: DbConfig) -> TestDb { let tmpdir = tempfile::tempdir().unwrap(); let dbpath: PathBuf = [tmpdir.path().to_path_buf(), PathBuf::from("testdb")] .iter() .collect(); - let dbconfig = DbConfig::builder().build(); let db = Db::new(dbpath, dbconfig).unwrap(); TestDb { db, tmpdir } } diff --git a/firewood/src/manager.rs b/firewood/src/manager.rs index 5d2dc9bd61..a7b283ed76 100644 --- a/firewood/src/manager.rs +++ b/firewood/src/manager.rs @@ -13,10 +13,11 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZero; use std::path::PathBuf; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, OnceLock, RwLock}; use firewood_storage::logger::{trace, warn}; use metrics::gauge; +use rayon::ThreadPool; use typed_builder::TypedBuilder; use crate::merkle::Merkle; @@ -59,6 +60,9 @@ pub struct ConfigManager { /// Revision manager configuration. #[builder(default = RevisionManagerConfig::builder().build())] pub manager: RevisionManagerConfig, + /// Whether to turn on parallel insert and hashing for propose. 
+ #[builder(default = false)] + pub parallel: bool, } type CommittedRevision = Arc>; @@ -75,6 +79,7 @@ pub(crate) struct RevisionManager { proposals: Mutex>, // committing_proposals: VecDeque>, by_hash: RwLock>, + threadpool: Option>, } #[derive(Debug, thiserror::Error)] @@ -115,6 +120,11 @@ impl RevisionManager { by_hash: RwLock::new(Default::default()), proposals: Mutex::new(Default::default()), // committing_proposals: Default::default(), + threadpool: if config.parallel { + Some(OnceLock::new()) + } else { + None + }, }; if let Some(hash) = nodestore.root_hash().or_default_root_hash() { @@ -309,6 +319,10 @@ impl RevisionManager { .expect("there is always one revision") .clone() } + + pub const fn threadpool(&self) -> Option<&OnceLock> { + self.threadpool.as_ref() + } } #[cfg(test)] diff --git a/firewood/src/merkle.rs b/firewood/src/merkle/mod.rs similarity index 97% rename from firewood/src/merkle.rs rename to firewood/src/merkle/mod.rs index b198cdbc03..848feb5ff6 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle/mod.rs @@ -4,6 +4,9 @@ #[cfg(test)] pub(crate) mod tests; +/// Parallel merkle +pub mod parallel; + use crate::iter::{MerkleKeyValueIter, PathIterator, TryExtend}; use crate::proof::{Proof, ProofCollection, ProofError, ProofNode}; use crate::range_proof::RangeProof; @@ -587,10 +590,17 @@ impl Merkle> { /// Map `key` to `value` in the trie. /// Each element of key is 2 nibbles. pub fn insert(&mut self, key: &[u8], value: Value) -> Result<(), FileIoError> { - let key = Path::from_nibbles_iterator(NibblesIterator::new(key)); + self.insert_from_iter(NibblesIterator::new(key), value) + } + /// Map `key` to `value` in the trie when `key` is a `NibblesIterator` + pub fn insert_from_iter( + &mut self, + key: NibblesIterator<'_>, + value: Value, + ) -> Result<(), FileIoError> { + let key = Path::from_nibbles_iterator(key); let root = self.nodestore.root_mut(); - let Some(root_node) = std::mem::take(root) else { // The trie is empty. 
Create a new leaf node with `value` and set // it as the root. @@ -751,8 +761,18 @@ impl Merkle> { /// Otherwise returns `None`. /// Each element of `key` is 2 nibbles. pub fn remove(&mut self, key: &[u8]) -> Result, FileIoError> { - let key = Path::from_nibbles_iterator(NibblesIterator::new(key)); + self.remove_from_iter(NibblesIterator::new(key)) + } + /// Removes the value associated with the given `key` where `key` is a `NibblesIterator` + /// Returns the value that was removed, if any. + /// Otherwise returns `None`. + /// Each element of `key` is 2 nibbles. + pub fn remove_from_iter( + &mut self, + key: NibblesIterator<'_>, + ) -> Result, FileIoError> { + let key = Path::from_nibbles_iterator(key); let root = self.nodestore.root_mut(); let Some(root_node) = std::mem::take(root) else { // The trie is empty. There is nothing to remove. @@ -985,8 +1005,16 @@ impl Merkle> { /// Removes any key-value pairs with keys that have the given `prefix`. /// Returns the number of key-value pairs removed. pub fn remove_prefix(&mut self, prefix: &[u8]) -> Result { - let prefix = Path::from_nibbles_iterator(NibblesIterator::new(prefix)); + self.remove_prefix_from_iter(NibblesIterator::new(prefix)) + } + /// Removes any key-value pairs with keys that have the given `prefix` where `prefix` is a `NibblesIterator` + /// Returns the number of key-value pairs removed. + pub fn remove_prefix_from_iter( + &mut self, + prefix: NibblesIterator<'_>, + ) -> Result { + let prefix = Path::from_nibbles_iterator(prefix); let root = self.nodestore.root_mut(); let Some(root_node) = std::mem::take(root) else { // The trie is empty. There is nothing to remove. diff --git a/firewood/src/merkle/parallel.rs b/firewood/src/merkle/parallel.rs new file mode 100644 index 0000000000..9a27459320 --- /dev/null +++ b/firewood/src/merkle/parallel.rs @@ -0,0 +1,485 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE.md for licensing terms. 
+ +use crate::db::BatchOp; +use crate::merkle::{Key, Merkle, Value}; +use crate::v2::api::KeyValuePairIter; +use firewood_storage::{ + BranchNode, Child, FileBacked, FileIoError, ImmutableProposal, LeafNode, MaybePersistedNode, + MutableProposal, NibblesIterator, Node, NodeReader, NodeStore, Parentable, Path, +}; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::iter::once; +use std::ops::Deref; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, OnceLock, mpsc}; + +#[derive(Debug)] +struct WorkerSender(mpsc::Sender); + +impl std::ops::Deref for WorkerSender { + type Target = mpsc::Sender; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A request to the worker. +#[derive(Debug)] +enum Request { + Insert { key: Key, value: Value }, + Delete { key: Key }, + DeleteRange { prefix: Key }, + Done, +} + +/// Response returned from a worker to the main thread. Includes the new root of the subtrie +/// at the given nibble and the deleted nodes. +#[derive(Debug)] +struct Response { + nibble: u8, + root: Option, + deleted_nodes: Vec, +} + +/// `ParallelMerkle` safely performs parallel modifications to a Merkle trie. It does this +/// by creating a worker for each subtrie from the root, and allowing the workers to +/// perform inserts and removes to their subtries. +#[derive(Debug)] +pub struct ParallelMerkle { + workers: [Option; BranchNode::MAX_CHILDREN], +} + +impl Default for ParallelMerkle { + fn default() -> Self { + Self::new() + } +} + +impl ParallelMerkle { + /// Default constructor + #[must_use] + pub fn new() -> Self { + ParallelMerkle { + workers: [(); BranchNode::MAX_CHILDREN].map(|()| None), + } + } + + /// Normalize the root to allow clean separation of the trie into an array of subtries that + /// can be operated on independently by the worker threads. + fn normalize_root(&self, proposal: &mut NodeStore) { + // There are 3 different cases to handle depending on the value of the root node. + // + // 1. 
If root is None, create a branch node with an empty partial path and a None for + // value. Create Nones for all of its children. + // 2. If the existing root has a partial path, then create a new root with an empty + // partial path and a None for a value. Push down the previous root as a child. Note + // that this modified Merkle trie is not currently valid and may need to be updated + // during the post-processing step. + // 3. If the existing root does not have a partial path, then there is nothing we need + // to do if it is a branch. If it is a leaf, then convert it into a branch. + // + // Cases 2 and 3 are handled by `normalize_for_insert`. The result after normalization + // is that there is a branch node at the root with an empty partial path. + let root_node = proposal.root_mut().take(); + if let Some(node) = root_node { + *proposal.root_mut() = Some(node.normalize_for_insert()); + } else { + // Empty trie. Create a branch node with an empty partial path and a None for a value. + let branch = BranchNode { + partial_path: Path::new(), + value: None, + children: BranchNode::empty_children(), + }; + *proposal.root_mut() = Some(branch.into()); + } + } + + /// After performing parallel modifications, it may be necessary to perform post processing + /// to return the Merkle trie to the correct canonical form. + fn postprocess_trie( + &self, + nodestore: &mut NodeStore, + mut branch: Box, + ) -> Result, FileIoError> { + // Check if the Merkle trie has an extra root node. If it does, apply transform to + // return trie to a valid state by following the steps below: + // + // If the root node has: + // 0 children and no value, the trie is empty. Just delete the root. + // 0 children and a value (from an empty key), the root should be a leaf + // 1 child and no value, the child should be the root (need to update partial path) + // In all other cases, the root is already correct. 
+ + let mut children_iter = branch + .children + .iter_mut() + .enumerate() + .filter_map(|(index, child)| child.as_mut().map(|child| (index, child))); + + let first_child = children_iter.next(); + match first_child { + None => { + if let Some(value) = branch.value.take() { + // There is a value for the empty key. Create a leaf with the value and return. + Ok(Some(Node::Leaf(LeafNode { + value, + partial_path: Path::new(), // Partial path should be empty + }))) + } else { + Ok(None) // No value. Just delete the root + } + } + Some((child_index, child)) => { + // Check if the root has a value or if there is more than one child. If yes, then + // just return the root unmodified + if branch.value.is_some() || children_iter.next().is_some() { + return Ok(Some((*branch).into())); + } + + // Return the child as the new root. Update its partial path to include the index value. + let mut child = match child { + Child::Node(child_node) => std::mem::take(child_node), + Child::AddressWithHash(addr, _) => nodestore.read_for_update((*addr).into())?, + Child::MaybePersisted(maybe_persisted, _) => { + nodestore.read_for_update(maybe_persisted.clone())? + } + }; + + // The child's partial path is the concatenation of its (now removed) parent, which + // should always be empty because of our prepare step, its (former) child index, and + // its partial path. Because the parent's partial path should always be empty, we + // can omit it and start with the `child_index`. + let partial_path = Path::from_nibbles_iterator( + once(child_index as u8).chain(child.partial_path().iter().copied()), + ); + child.update_partial_path(partial_path); + Ok(Some(child)) + } + } + } + + /// Creates a worker for performing operations on a subtrie, with the subtrie being determined + /// by the value of the `first_nibble`. 
+ fn create_worker( + pool: &ThreadPool, + proposal: &NodeStore, + root_branch: &mut Box, + first_nibble: u8, + worker_sender: Sender>, + ) -> Result { + // Create a channel for the coordinator (main thread) to send messages to this worker. + let (child_sender, child_receiver) = mpsc::channel(); + + // The root's child becomes the root node of the worker + let child_root = root_branch + .take_child(first_nibble) + .map(|child| match child { + Child::Node(node) => Ok(node), + Child::AddressWithHash(address, _) => { + Ok(proposal.read_node(address)?.deref().clone()) + } + Child::MaybePersisted(maybe_persisted, _) => { + Ok(maybe_persisted.as_shared_node(proposal)?.deref().clone()) + } + }) + .transpose()?; + + // Build a nodestore from the child node + let worker_nodestore = NodeStore::from_root(proposal, child_root); + + // Spawn a worker from the threadpool for this nibble. The worker will send messages to the coordinator + // using `worker_sender`. + pool.spawn(move || { + let mut merkle = Merkle::from(worker_nodestore); + + // Wait for a message on the receiver child channel. Break out of loop if there is an error. 
+ while let Ok(request) = child_receiver.recv() { + match request { + // insert a key-value pair into the subtrie + Request::Insert { key, value } => { + let mut nibbles_iter = NibblesIterator::new(&key); + nibbles_iter.next(); // Skip the first nibble + if let Err(err) = + merkle.insert_from_iter(nibbles_iter, value.as_ref().into()) + { + worker_sender + .send(Err(err)) + .expect("send from worker error"); + } + } + Request::Delete { key } => { + let mut nibbles_iter = NibblesIterator::new(&key); + nibbles_iter.next(); // Skip the first nibble + if let Err(err) = merkle.remove_from_iter(nibbles_iter) { + worker_sender + .send(Err(err)) + .expect("send from worker error"); + } + } + Request::DeleteRange { prefix } => { + let mut nibbles_iter = NibblesIterator::new(&prefix); + nibbles_iter.next(); // Skip the first nibble + if let Err(err) = merkle.remove_prefix_from_iter(nibbles_iter) { + worker_sender + .send(Err(err)) + .expect("send from worker error"); + } + } + // Sent from the coordinator to the workers to signal that the batch is done. + Request::Done => { + // Hash this subtrie and return a worker response where the root is a + // Child::MaybePersisted. + let mut nodestore = merkle.into_inner(); + let response = nodestore + .root_mut() + .take() + .map(|root| { + #[cfg(not(feature = "ethhash"))] + let (root_node, root_hash) = + NodeStore::::hash_helper( + root, + Path::from(&[first_nibble]), + )?; + #[cfg(feature = "ethhash")] + let (root_node, root_hash) = + nodestore.hash_helper(root, Path::from(&[first_nibble]))?; + Ok(Child::MaybePersisted(root_node, root_hash)) + }) + .transpose() + .map(|hashed_root| Response { + nibble: first_nibble, + root: hashed_root, + deleted_nodes: nodestore.take_deleted_nodes(), + }); + + worker_sender + .send(response) + .expect("send from worker error"); + break; // Allow the worker to return to the thread pool. + } + } + } + }); + Ok(WorkerSender(child_sender)) + } + + // Send a done message to all of the workers. 
Collect the responses, each representing the root of a
+    // subtrie, and merge them into the root node of the main trie.
+    //
+    // The response channel is drained until every sender has been dropped, even after an error
+    // has been received. Returning on the first error would drop the receiver while other
+    // workers, which have already been told that the batch is done, are still about to send
+    // their results. The first error received is returned once all workers have finished.
+    fn merge_children(
+        &mut self,
+        response_channel: Receiver<Result<Response, FileIoError>>,
+        proposal: &mut NodeStore<MutableProposal, FileBacked>,
+        root_branch: &mut Box<BranchNode>,
+    ) -> Result<(), FileIoError> {
+        // We have processed all the ops in the batch, so send a Done message to each worker
+        for worker in self.workers.iter().flatten() {
+            worker.send(Request::Done).expect("send to worker error");
+        }
+
+        // `recv` fails only when every sender is gone. The coordinator drops its own sender
+        // before calling this method, so the loop ends when all workers have returned.
+        let mut first_error = None;
+        while let Ok(response) = response_channel.recv() {
+            match response {
+                Ok(response) if first_error.is_none() => {
+                    // Adding deleted nodes (from calling read_for_update) from the child's
+                    // nodestore.
+                    proposal.delete_nodes(response.deleted_nodes.as_slice());
+
+                    // Set the child at index to response.root which is the root of the
+                    // child's subtrie.
+                    *root_branch
+                        .children
+                        .get_mut(response.nibble as usize)
+                        .expect("index error") = response.root;
+                }
+                // The proposal is abandoned after an error, so later results are not merged.
+                Ok(_) => {}
+                Err(err) => {
+                    if first_error.is_none() {
+                        first_error = Some(err);
+                    }
+                }
+            }
+        }
+        first_error.map_or(Ok(()), Err)
+    }
+
+    /// Get a worker from the worker pool based on the `first_nibble` value. Create a worker if
+    /// it doesn't exist already.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `FileIoError` if a new worker has to be created and creating it fails.
+    fn get_worker(
+        &mut self,
+        pool: &ThreadPool,
+        proposal: &NodeStore<MutableProposal, FileBacked>,
+        root_branch: &mut Box<BranchNode>,
+        first_nibble: u8,
+        worker_sender: Sender<Result<Response, FileIoError>>,
+    ) -> Result<&mut WorkerSender, FileIoError> {
+        // Find the worker's state corresponding to the first nibble which are stored in an array.
+        let worker_option = self
+            .workers
+            .get_mut(first_nibble as usize)
+            .expect("index out of bounds");
+
+        // Create a new worker if it doesn't exist. Not using `get_or_insert_with` with
+        // worker_option because it is possible to generate a FileIoError within the closure.
+        match worker_option {
+            Some(worker) => Ok(worker),
+            None => Ok(worker_option.insert(ParallelMerkle::create_worker(
+                pool,
+                proposal,
+                root_branch,
+                first_nibble,
+                worker_sender,
+            )?)),
+        }
+    }
+
+    /// Removes all of the entries in the trie.
For the root entry, the value is removed but the
+    /// root itself will remain. An empty root will only be removed during post processing.
+    ///
+    /// Only subtries that already have a worker are cleared, so the caller must first create a
+    /// worker for every child that is still attached to `root_branch`.
+    fn remove_all_entries(&self, root_branch: &mut Box<BranchNode>) {
+        for worker in self.workers.iter().flatten() {
+            worker
+                .send(Request::DeleteRange {
+                    prefix: Box::default(), // Empty prefix
+                })
+                .expect("TODO: handle error");
+        }
+        // Also set the root value to None but does not delete the root.
+        root_branch.value = None;
+    }
+
+    /// Creates a parallel proposal in 4 steps: Prepare, Split, Merge, and Post-process. In the
+    /// Prepare step, the trie is modified to ensure that the root is a branch node with no
+    /// partial path. In the split step, entries from the batch are sent to workers that
+    /// independently modify their sub-tries. In the merge step, the sub-tries are merged back
+    /// to the main trie. Finally, in the post-processing step, the trie is returned to its
+    /// canonical form.
+    ///
+    /// The worker senders are released on every return path, so a failed call leaves neither
+    /// blocked worker threads nor stale senders behind for the next call.
+    ///
+    /// # Errors
+    ///
+    /// Can return a `FileIoError` when it fetches nodes from storage.
+    ///
+    /// # Panics
+    ///
+    /// Panics on errors when sending or receiving messages from workers. Can also panic if the
+    /// thread pool cannot be created, or logic errors regarding the state of the trie.
+    // NOTE(review): the generic arguments in this signature were reconstructed from usage;
+    // confirm them against the repository.
+    pub fn create_proposal<T: Parentable>(
+        &mut self,
+        parent: &NodeStore<T, FileBacked>,
+        batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
+        threadpool: &OnceLock<ThreadPool>,
+    ) -> Result<Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>>, FileIoError> {
+        // Get (or create) a threadpool
+        let pool = threadpool.get_or_init(|| {
+            ThreadPoolBuilder::new()
+                .num_threads(BranchNode::MAX_CHILDREN)
+                .build()
+                .expect("Error in creating threadpool")
+        });
+
+        // Create a proposal from the parent. No worker exists yet, so failing here needs no
+        // cleanup.
+        let proposal = NodeStore::new(parent)?;
+
+        let outcome = self.apply_batch(pool, proposal, batch);
+
+        // Done with these worker senders, whether or not the batch succeeded. Dropping a
+        // sender lets a worker that is still waiting for requests leave its loop, and setting
+        // the workers to None allows the next create proposal from the same ParallelMerkle to
+        // reuse the thread pool.
+        self.workers = [(); BranchNode::MAX_CHILDREN].map(|()| None);
+
+        outcome
+    }
+
+    /// Runs the Prepare, Split, Merge and Post-process steps of `create_proposal` on
+    /// `proposal`. The caller is responsible for resetting the workers afterwards.
+    fn apply_batch(
+        &mut self,
+        pool: &ThreadPool,
+        mut proposal: NodeStore<MutableProposal, FileBacked>,
+        batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
+    ) -> Result<Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>>, FileIoError> {
+        // Prepare step: normalize the root in preparation for performing parallel modifications
+        // to the trie.
+        self.normalize_root(&mut proposal);
+
+        let mut root_branch = proposal
+            .root_mut()
+            .take()
+            .expect("Should have a root node after prepare step")
+            .into_branch()
+            .expect("Root should be a branch after prepare step");
+
+        // Create a response channel the workers use to send messages back to the coordinator (us)
+        let (response_sender, response_receiver) = mpsc::channel();
+
+        // Split step: for each operation in the batch, send a request to the worker that is
+        // responsible for the sub-trie corresponding to the operation's first nibble.
+        for op in batch.into_iter().map_into_batch() {
+            // Get the first nibble of the key to determine which worker to send the request to.
+            //
+            // Need to handle an empty key. Since the partial_path of the root must be empty, an
+            // empty key should always be for the root node. There are 3 cases to consider.
+            //
+            // Insert: The main thread modifies the value of the root.
+            //
+            // Remove: The main thread removes any value at the root. However, it should not delete
+            //         the root node, which, if necessary later, will be done in post processing.
+            //
+            // Remove Prefix:
+            //         For a remove prefix, we would need to remove everything. We do this by sending
+            //         a remove prefix with an empty prefix to all of the children, then removing the
+            //         value of the root node.
+            let mut key_nibbles = NibblesIterator::new(op.key().as_ref());
+            let Some(first_nibble) = key_nibbles.next() else {
+                match &op {
+                    BatchOp::Put { key: _, value } => {
+                        root_branch.value = Some(value.as_ref().into());
+                    }
+                    BatchOp::Delete { key: _ } => {
+                        root_branch.value = None;
+                    }
+                    BatchOp::DeleteRange { prefix: _ } => {
+                        // Calling remove prefix with an empty prefix is equivalent to a remove
+                        // all. Children that are still attached to the root have no worker
+                        // yet (a worker detaches its child when it is created), so create one
+                        // for each of them; otherwise their subtries would survive.
+                        let attached: Vec<u8> = (0u8..)
+                            .zip(root_branch.children.iter())
+                            .filter(|(_, child)| child.is_some())
+                            .map(|(nibble, _)| nibble)
+                            .collect();
+                        for nibble in attached {
+                            self.get_worker(
+                                pool,
+                                &proposal,
+                                &mut root_branch,
+                                nibble,
+                                response_sender.clone(),
+                            )?;
+                        }
+                        self.remove_all_entries(&mut root_branch);
+                    }
+                }
+                continue; // Done with this empty key operation.
+            };
+
+            // Get the worker that is responsible for this nibble. The worker will be created if it
+            // doesn't already exist.
+            let worker = self.get_worker(
+                pool,
+                &proposal,
+                &mut root_branch,
+                first_nibble,
+                response_sender.clone(),
+            )?;
+
+            // Send the current operation to the worker.
+            // TODO: Currently the key from the BatchOp is copied to a Box<[u8]> before it is sent
+            //       to the worker. It may be possible to send a nibble iterator instead of a
+            //       Box<[u8]> to the worker if we use rayon scoped threads. This change would
+            //       eliminate a memory copy but may require some code refactoring.
+            match &op {
+                BatchOp::Put { key: _, value } => {
+                    worker
+                        .send(Request::Insert {
+                            key: op.key().as_ref().into(),
+                            value: value.as_ref().into(),
+                        })
+                        .expect("send to worker error");
+                }
+                BatchOp::Delete { key: _ } => {
+                    worker
+                        .send(Request::Delete {
+                            key: op.key().as_ref().into(),
+                        })
+                        .expect("send to worker error");
+                }
+                BatchOp::DeleteRange { prefix: _ } => {
+                    worker
+                        .send(Request::DeleteRange {
+                            prefix: op.key().as_ref().into(),
+                        })
+                        .expect("send to worker error");
+                }
+            }
+        }
+
+        // Drop the sender response channel from the parent thread.
+        drop(response_sender);
+
+        // Merge step: send a done message to all of the workers to indicate that the batch is complete.
+        // Collect the results from the workers and merge them as children to the root.
+        self.merge_children(response_receiver, &mut proposal, &mut root_branch)?;
+
+        // Post-process step: return the trie to its canonical form.
+        *proposal.root_mut() = self.postprocess_trie(&mut proposal, root_branch)?;
+
+        let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
+            Arc::new(proposal.try_into().expect("error creating immutable"));
+
+        Ok(immutable)
+    }
+}
diff --git a/storage/src/node/branch.rs b/storage/src/node/branch.rs
index 0813d5168f..ed2cd54947 100644
--- a/storage/src/node/branch.rs
+++ b/storage/src/node/branch.rs
@@ -437,6 +437,15 @@ impl BranchNode {
             .expect("child_index is in bounds")
     }
 
+    /// Takes the child at the given index.
+    /// Panics if `child_index` >= [`BranchNode::MAX_CHILDREN`].
+    pub fn take_child(&mut self, child_index: u8) -> Option<Child> {
+        self.children
+            .get_mut(child_index as usize)
+            .expect("index error")
+            .take()
+    }
+
     /// Update the child at `child_index` to be `new_child_addr`.
     /// If `new_child_addr` is None, the child is removed.
     pub fn update_child(&mut self, child_index: u8, new_child: Option<Child>) {
diff --git a/storage/src/node/mod.rs b/storage/src/node/mod.rs
index 72cb9a95de..e1e74059a1 100644
--- a/storage/src/node/mod.rs
+++ b/storage/src/node/mod.rs
@@ -402,6 +402,45 @@ impl Node {
             }
         }
     }
+
+    /// Normalizes the node (which should be a root for a trie) in preparation for a parallel
+    /// insert. In its normalized form, a node will always be a branch with no partial path.
+    /// There are two cases to handle depending on whether the node has a partial path.
+    ///
+    /// 1. If the node has a partial path, then create a new node with an empty partial path
+    ///    and a None for a value. Push down the previous node as a child and return the
+    ///    branch.
+    /// 2. If the existing node does not have a partial path, then there is nothing we need
+    ///    to do if it is a branch. If it is a leaf, then convert it into a branch.
+    #[must_use]
+    pub fn normalize_for_insert(mut self) -> Self {
+        // If the `partial_path` is non-empty, then create a branch that will be the new
+        // root with the previous root as the child at the index returned from split_first.
+        if let Some((child_index, child_path)) = self
+            .partial_path()
+            .split_first()
+            .map(|(index, path)| (*index, path.into()))
+        {
+            let mut branch = BranchNode {
+                partial_path: Path::new(),
+                value: None,
+                children: BranchNode::empty_children(),
+            };
+            self.update_partial_path(child_path);
+            branch.update_child(child_index, Some(Child::Node(self)));
+            branch.into()
+        } else if let Node::Leaf(mut leaf) = self {
+            // A leaf with an empty partial path: replace it with a childless branch that keeps its value.
+            BranchNode {
+                partial_path: Path::new(),
+                value: Some(std::mem::take(&mut leaf.value)),
+                children: BranchNode::empty_children(),
+            }
+            .into()
+        } else {
+            self // Not a leaf and no partial path: already in normalized form.
+        }
+    }
 }
 
 /// A path iterator item, which has the key nibbles up to this point,
diff --git a/storage/src/nodestore/hash.rs b/storage/src/nodestore/hash.rs
index f9ed6b60ad..2210993e1b 100644
--- a/storage/src/nodestore/hash.rs
+++ b/storage/src/nodestore/hash.rs
@@ -113,13 +113,18 @@ where
         )
     }
 
-    /// Hashes the given `node` and the subtree rooted at it.
-    /// Returns the hashed node and its hash.
-    pub(super) fn hash_helper(
+    /// Hashes the given `node` and the subtree rooted at it. The `root_path` should be empty
+    /// if this is called from the root, or it should include the partial path if this is called
+    /// on a subtrie. Returns the hashed node and its hash.
+    ///
+    /// # Errors
+    ///
+    /// Can return a `FileIoError` if it is unable to read a node that it is hashing.
+    pub fn hash_helper(
         #[cfg(feature = "ethhash")] &self,
         node: Node,
+        mut root_path: Path,
     ) -> Result<(MaybePersistedNode, HashType), FileIoError> {
-        let mut root_path = Path::new();
         #[cfg(not(feature = "ethhash"))]
         let res = Self::hash_helper_inner(node, PathGuard::from_path(&mut root_path))?;
         #[cfg(feature = "ethhash")]
diff --git a/storage/src/nodestore/mod.rs b/storage/src/nodestore/mod.rs
index 0afffb636b..aa4e9fe418 100644
--- a/storage/src/nodestore/mod.rs
+++ b/storage/src/nodestore/mod.rs
@@ -255,6 +255,16 @@ impl NodeStore {
         self.kind.deleted.push(node);
     }
 
+    /// Takes the nodes marked as deleted in this proposal, leaving its deleted list empty.
+    pub fn take_deleted_nodes(&mut self) -> Vec {
+        take(&mut self.kind.deleted)
+    }
+
+    /// Appends clones of `nodes` to the nodes marked as deleted in this proposal.
+    pub fn delete_nodes(&mut self, nodes: &[MaybePersistedNode]) {
+        self.kind.deleted.extend_from_slice(nodes);
+    }
+
     /// Reads a node for update, marking it as deleted in this proposal.
     /// We get an arc from cache (reading it from disk if necessary) then
     /// copy/clone the node and return it.
@@ -274,6 +284,28 @@ impl NodeStore {
     }
 }
 
+impl NodeStore {
+    /// Creates a new [`NodeStore`] rooted at `root`, sharing `parent`'s header, parent and storage.
+    #[must_use]
+    pub fn from_root(parent: &NodeStore, root: Option) -> Self {
+        NodeStore {
+            header: parent.header,
+            kind: MutableProposal {
+                root,
+                deleted: Vec::default(),
+                parent: parent.kind.parent.clone(),
+            },
+            storage: parent.storage.clone(),
+        }
+    }
+
+    /// Consumes the `NodeStore` and returns the root node of its trie, if there is one.
+    #[must_use]
+    pub fn into_root(self) -> Option {
+        self.kind.root
+    }
+}
+
 impl NodeStore {
     /// Creates a new, empty, [`NodeStore`] and clobbers the underlying `storage` with an empty header.
     /// This is used during testing and during the creation of an in-memory merkle for proofs
@@ -539,11 +571,11 @@ impl TryFrom>
             return Ok(nodestore);
         };
 
-        // Hashes the trie and returns the address of the new root.
+ // Hashes the trie with an empty path and returns the address of the new root. #[cfg(feature = "ethhash")] - let (root, root_hash) = nodestore.hash_helper(root)?; + let (root, root_hash) = nodestore.hash_helper(root, Path::new())?; #[cfg(not(feature = "ethhash"))] - let (root, root_hash) = NodeStore::::hash_helper(root)?; + let (root, root_hash) = NodeStore::::hash_helper(root, Path::new())?; let immutable_proposal = Arc::into_inner(nodestore.kind).expect("no other references to the proposal");