diff --git a/Cargo.lock b/Cargo.lock
index b3229e98a..cdb1a9bc0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1153,6 +1153,7 @@ dependencies = [
  "arc-swap",
  "bitfield",
  "bitflags 2.9.4",
+ "bumpalo",
  "bytemuck",
  "bytemuck_derive",
  "bytes",
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index cf0c841a8..5277d4d3d 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -47,6 +47,7 @@
 io-uring = { version = "0.7.10", optional = true }
 log = { version = "0.4.28", optional = true }
 rlp = { version = "0.6.1", optional = true }
 sha3 = { version = "0.10.8", optional = true }
+bumpalo = { version = "3.19.0", features = ["collections", "std"] }
 
 [dev-dependencies]
 # Workspace dependencies
diff --git a/storage/src/node/mod.rs b/storage/src/node/mod.rs
index 72cb9a95d..c6396c96b 100644
--- a/storage/src/node/mod.rs
+++ b/storage/src/node/mod.rs
@@ -151,6 +151,21 @@ impl ExtendableBytes for Vec<u8> {
     }
 }
 
+impl ExtendableBytes for bumpalo::collections::Vec<'_, u8> {
+    fn extend<T: IntoIterator<Item = u8>>(&mut self, other: T) {
+        std::iter::Extend::extend(self, other);
+    }
+    fn reserve(&mut self, reserve: usize) {
+        self.reserve(reserve);
+    }
+    fn push(&mut self, value: u8) {
+        bumpalo::collections::Vec::push(self, value);
+    }
+    fn extend_from_slice(&mut self, other: &[u8]) {
+        bumpalo::collections::Vec::extend_from_slice(self, other);
+    }
+}
+
 impl Node {
     /// Returns the partial path of the node.
     #[must_use]
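
Aside (not part of the patch): the impl above is what lets node serialization
write straight into a bump arena instead of a heap `Vec`. A minimal standalone
sketch of the pattern, using only the public `bumpalo` API (with the
`collections` feature); the payload bytes and the reserved first byte are
illustrative:

use bumpalo::Bump;

fn serialize_into_arena(bump: &Bump) -> &[u8] {
    // Build the serialized form in arena memory; deallocation is deferred
    // to the arena as a whole instead of one heap free per node.
    let mut bytes = bumpalo::collections::Vec::new_in(bump);
    bytes.push(0u8); // placeholder byte, later overwritten with the area size index
    bytes.extend_from_slice(b"node payload");
    bytes.into_bump_slice() // plain &[u8] borrowing the arena, no copy
}

fn main() {
    let bump = Bump::new();
    let slice = serialize_into_arena(&bump);
    assert_eq!(slice.len(), 13);
}

Dropping (or resetting) the `Bump` releases every serialized node at once,
which is what the batching later in this patch relies on.
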
diff --git a/storage/src/nodestore/alloc.rs b/storage/src/nodestore/alloc.rs
index 886b064d5..fc43c2dce 100644
--- a/storage/src/nodestore/alloc.rs
+++ b/storage/src/nodestore/alloc.rs
@@ -358,6 +358,14 @@ impl<S: WritableStorage> NodeAllocator<'_, S> {
         Ok(())
     }
+
+    /// Persist the free lists from this allocator's header to storage.
+    pub fn flush_freelist(&mut self) -> Result<(), FileIoError> {
+        let free_list_bytes = bytemuck::bytes_of(self.header.free_lists());
+        let free_list_offset = NodeStoreHeader::free_lists_offset();
+        self.storage.write(free_list_offset, free_list_bytes)?;
+        Ok(())
+    }
 }
 
 /// Iterator over free lists in the nodestore
diff --git a/storage/src/nodestore/persist.rs b/storage/src/nodestore/persist.rs
index e87845e74..72533d85c 100644
--- a/storage/src/nodestore/persist.rs
+++ b/storage/src/nodestore/persist.rs
@@ -34,17 +34,17 @@ use crate::nodestore::AreaIndex;
 use crate::{Child, firewood_counter};
 use coarsetime::Instant;
 
+use crate::{FileBacked, MaybePersistedNode, NodeReader, WritableStorage};
+
 #[cfg(feature = "io-uring")]
-use crate::logger::trace;
+use crate::ReadableStorage;
 
-use crate::{FileBacked, MaybePersistedNode, NodeReader, WritableStorage};
+#[cfg(feature = "io-uring")]
+use crate::logger::trace;
 
 #[cfg(test)]
 use crate::RootReader;
 
-#[cfg(feature = "io-uring")]
-use crate::ReadableStorage;
-
 use super::alloc::NodeAllocator;
 use super::header::NodeStoreHeader;
 use super::{Committed, NodeStore};
@@ -84,10 +84,22 @@ impl NodeStore<Committed, FileBacked> {
     /// # Errors
     ///
     /// Returns a [`FileIoError`] if the free list cannot be written to storage.
-    #[fastrace::trace(short_name = true)]
     pub fn flush_freelist(&self) -> Result<(), FileIoError> {
-        // Write the free lists to storage
-        let free_list_bytes = bytemuck::bytes_of(self.header.free_lists());
+        self.flush_freelist_from(&self.header)
+    }
+
+    /// Persist the freelist from the given header to storage.
+    ///
+    /// This function is used to ensure that the freelist is advanced after allocating
+    /// nodes for writing. This allows the database to be recovered from an I/O error while
+    /// persisting a revision to disk.
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`FileIoError`] if the free list cannot be written to storage.
+    #[fastrace::trace(name = "firewood.flush_freelist")]
+    pub fn flush_freelist_from(&self, header: &NodeStoreHeader) -> Result<(), FileIoError> {
+        let free_list_bytes = bytemuck::bytes_of(header.free_lists());
         let free_list_offset = NodeStoreHeader::free_lists_offset();
         self.storage.write(free_list_offset, free_list_bytes)?;
         Ok(())
     }
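
Aside (not part of the patch): the reason `flush_freelist_from` takes an
explicit header is ordering — the freelist must be durable before node bytes
land in the areas it hands out, and only afterwards is the new root published.
A compilable sketch of that invariant; `Revision` and its method names are
illustrative stand-ins rather than firewood's actual API:

/// Hypothetical stand-in for the commit path of a copy-on-write node store.
trait Revision {
    type Err;
    fn flush_freelist(&mut self) -> Result<(), Self::Err>;
    fn flush_nodes(&mut self) -> Result<(), Self::Err>;
    fn flush_header(&mut self) -> Result<(), Self::Err>;
}

// A crash between any two steps can leak free areas, but it never leaves the
// previous revision pointing at partially written node data.
fn commit_revision<R: Revision>(store: &mut R) -> Result<(), R::Err> {
    store.flush_freelist()?; // step 1: advance the freelist on disk
    store.flush_nodes()?;    // step 2: write the node areas
    store.flush_header()?;   // step 3: publish the new root last
    Ok(())
}
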
@@ -264,6 +276,8 @@ impl NodeStore<Committed, FileBacked> {
 
         self.storage.write_cached_nodes(cached_nodes)?;
 
+        allocator.flush_freelist()?;
+
         let flush_time = flush_start.elapsed().as_millis();
         firewood_counter!("firewood.flush_nodes", "flushed node amount").increment(flush_time);
 
@@ -298,6 +312,67 @@ impl NodeStore<Committed, FileBacked> {
     }
 }
 
+#[cfg(feature = "io-uring")]
+use crate::LinearAddress;
+#[cfg(feature = "io-uring")]
+use bumpalo::Bump;
+
+#[cfg(feature = "io-uring")]
+struct Batch {
+    bump: bumpalo::Bump,
+}
+
+#[cfg(feature = "io-uring")]
+impl Batch {
+    const INITIAL_BUMP_SIZE: usize = AreaIndex::MAX_AREA_SIZE as usize;
+
+    fn new() -> Self {
+        Self {
+            bump: Bump::with_capacity(Self::INITIAL_BUMP_SIZE),
+        }
+    }
+}
+
+#[cfg(feature = "io-uring")]
+use std::pin::Pin;
+
+#[cfg(feature = "io-uring")]
+#[derive(Clone, Debug)]
+struct PinnedBufferEntry<'a> {
+    pinned_buffer: Pin<&'a [u8]>,
+    address: LinearAddress,
+    node: MaybePersistedNode,
+}
+
+#[cfg(feature = "io-uring")]
+/// Helper function to retry `submit_and_wait` on EINTR
+fn submit_and_wait_with_retry(
+    ring: &mut io_uring::IoUring,
+    wait_nr: u32,
+    storage: &FileBacked,
+    operation_name: &str,
+) -> Result<(), FileIoError> {
+    use std::io::ErrorKind::Interrupted;
+
+    loop {
+        match ring.submit_and_wait(wait_nr as usize) {
+            Ok(_) => return Ok(()),
+            Err(e) => {
+                // Retry if the error is an interrupted system call
+                if e.kind() == Interrupted {
+                    continue;
+                }
+                // For other errors, return the error
+                return Err(storage.file_io_error(
+                    e,
+                    0,
+                    Some(format!("io-uring {operation_name}")),
+                ));
+            }
+        }
+    }
+}
+
 impl NodeStore<Committed, FileBacked> {
     /// Persist all the nodes of a proposal to storage.
     ///
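
Aside (not part of the patch): `submit_and_wait_with_retry` encodes the
standard EINTR convention — the syscall was interrupted by a signal before
doing any work, so reissuing it is safe. The same loop as a generic, runnable
sketch (`retry_on_eintr` is an illustrative name):

use std::io;

fn retry_on_eintr<T>(mut syscall: impl FnMut() -> io::Result<T>) -> io::Result<T> {
    loop {
        match syscall() {
            // Interrupted (EINTR) means nothing happened yet: just retry
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            other => return other,
        }
    }
}

fn main() -> io::Result<()> {
    let mut attempts = 0;
    retry_on_eintr(|| {
        attempts += 1;
        if attempts < 3 {
            Err(io::Error::from(io::ErrorKind::Interrupted)) // simulated EINTR
        } else {
            Ok(())
        }
    })
}
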
@@ -307,121 +382,84 @@ impl NodeStore<Committed, FileBacked> {
     #[fastrace::trace(short_name = true)]
     #[cfg(feature = "io-uring")]
     fn flush_nodes_io_uring(&mut self) -> Result<NodeStoreHeader, FileIoError> {
-        use crate::LinearAddress;
-        use std::io::ErrorKind::Interrupted;
-        use std::pin::Pin;
-
-        #[derive(Clone, Debug)]
-        struct PinnedBufferEntry {
-            pinned_buffer: Pin<Box<[u8]>>,
-            node: Option<(LinearAddress, MaybePersistedNode)>,
-        }
-
-        /// Helper function to retry `submit_and_wait` on EINTR
-        fn submit_and_wait_with_retry(
-            ring: &mut io_uring::IoUring,
-            wait_nr: u32,
-            storage: &FileBacked,
-            operation_name: &str,
-        ) -> Result<(), FileIoError> {
-            loop {
-                match ring.submit_and_wait(wait_nr as usize) {
-                    Ok(_) => return Ok(()),
-                    Err(e) => {
-                        // Retry if the error is an interrupted system call
-                        if e.kind() == Interrupted {
-                            continue;
-                        }
-                        // For other errors, return the error
-                        return Err(storage.file_io_error(
-                            e,
-                            0,
-                            Some(format!("io-uring {operation_name}")),
-                        ));
-                    }
-                }
-            }
-        }
-
-        /// Helper function to handle completion queue entries and check for errors
-        fn handle_completion_queue(
-            storage: &FileBacked,
-            completion_queue: io_uring::cqueue::CompletionQueue<'_>,
-            saved_pinned_buffers: &mut [PinnedBufferEntry],
-        ) -> Result<(), FileIoError> {
-            for entry in completion_queue {
-                let item = entry.user_data() as usize;
-                let pbe = saved_pinned_buffers
-                    .get_mut(item)
-                    .expect("should be an index into the array");
-
-                if entry.result()
-                    != pbe
-                        .pinned_buffer
-                        .len()
-                        .try_into()
-                        .expect("buffer should be small enough")
-                {
-                    let error = if entry.result() >= 0 {
-                        std::io::Error::other("Partial write")
-                    } else {
-                        std::io::Error::from_raw_os_error(0 - entry.result())
-                    };
-                    let (addr, _) = pbe.node.as_ref().expect("node should be Some");
-                    return Err(storage.file_io_error(
-                        error,
-                        addr.get(),
-                        Some("write failure".to_string()),
-                    ));
-                }
-                // I/O completed successfully
-                pbe.node = None;
-            }
-            Ok(())
-        }
-
-        const RINGSIZE: usize = FileBacked::RINGSIZE as usize;
-
         let flush_start = Instant::now();
         let mut header = self.header;
         let mut node_allocator = NodeAllocator::new(self.storage.as_ref(), &mut header);
 
-        // Collect addresses and nodes for caching
-        let mut cached_nodes = Vec::new();
-
-        let mut ring = self.storage.ring.lock().expect("poisoned lock");
-        let mut saved_pinned_buffers = vec![
-            PinnedBufferEntry {
-                pinned_buffer: Pin::new(Box::new([0; 0])),
-                node: None,
-            };
-            RINGSIZE
-        ];
+        let mut batch = Batch::new();
+        let mut allocated_objects = Vec::new();
 
         // Process each unpersisted node directly from the iterator
         for node in UnPersistedNodeIterator::new(self) {
             let shared_node = node.as_shared_node(self).expect("in memory, so no IO");
-            let mut serialized = Vec::with_capacity(100); // TODO: better size? we can guess branches are larger
-            shared_node.as_bytes(AreaIndex::MIN, &mut serialized);
-            let (persisted_address, area_size_index) =
-                node_allocator.allocate_node(serialized.as_slice())?;
-            *serialized.get_mut(0).expect("byte was reserved") = area_size_index.get();
-            let mut serialized = serialized.into_boxed_slice();
+            // Serialize the node into the bump allocator
+            let (slice, persisted_address, idx_size) = {
+                let mut bytes = bumpalo::collections::Vec::new_in(&batch.bump);
+                shared_node.as_bytes(AreaIndex::MIN, &mut bytes);
+                let (persisted_address, area_size_index) =
+                    node_allocator.allocate_node(bytes.as_slice())?;
+                *bytes.get_mut(0).expect("byte was reserved") = area_size_index.get();
+                bytes.shrink_to_fit();
+                let slice = bytes.into_bump_slice();
+                (slice, persisted_address, area_size_index.size() as usize)
+            };
+
+            allocated_objects.push((slice, persisted_address, node));
+
+            // We pause if we can't allocate another node of the same size as the last one.
+            // This isn't a guarantee that we won't exceed INITIAL_BUMP_SIZE,
+            // but it's a good enough approximation.
+            let might_overflow =
+                batch.bump.allocated_bytes() > Batch::INITIAL_BUMP_SIZE.saturating_sub(idx_size);
+            if might_overflow {
+                // The freelist must be persisted before writing any node data
+                node_allocator.flush_freelist()?;
+                self.ring_writes(allocated_objects)?;
+                allocated_objects = Vec::new();
+                batch.bump.reset();
+            }
+        }
+        if !allocated_objects.is_empty() {
+            self.flush_freelist_from(&header)?;
+            self.ring_writes(allocated_objects)?;
+        }
+        let flush_time = flush_start.elapsed().as_millis();
+        firewood_counter!("firewood.flush_nodes", "flushed node amount").increment(flush_time);
+        Ok(header)
+    }
 
+    #[cfg(feature = "io-uring")]
+    fn ring_writes(
+        &self,
+        allocated_objects: Vec<(&[u8], LinearAddress, MaybePersistedNode)>,
+    ) -> Result<(), FileIoError> {
+        let mut ring = self.storage.ring.lock().expect("poisoned lock");
+
+        let mut saved_pinned_buffers =
+            vec![Option::<PinnedBufferEntry<'_>>::None; FileBacked::RINGSIZE as usize];
+
+        // Collect addresses and nodes for caching
+        let mut cached_nodes = Vec::new();
+
+        for (serialized, persisted_address, node) in allocated_objects {
             loop {
                 // Find the first available write buffer, enumerate to get the position for marking it completed
                 if let Some((pos, pbe)) = saved_pinned_buffers
                     .iter_mut()
                     .enumerate()
-                    .find(|(_, pbe)| pbe.node.is_none())
+                    .find(|(_, pbe)| pbe.is_none())
                 {
-                    pbe.pinned_buffer = std::pin::Pin::new(std::mem::take(&mut serialized));
-                    pbe.node = Some((persisted_address, node.clone()));
+                    let pinned_buffer = std::pin::Pin::new(serialized);
+                    *pbe = Some(PinnedBufferEntry {
+                        pinned_buffer,
+                        address: persisted_address,
+                        node: node.clone(),
+                    });
                     let submission_queue_entry = self
                         .storage
-                        .make_op(&pbe.pinned_buffer)
+                        .make_op(&pinned_buffer)
                         .offset(persisted_address.get())
                         .build()
                         .user_data(pos as u64);
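
Aside (not part of the patch): `user_data` is the only value io_uring echoes
back on completion, so the submission loop above stores the slot index there;
each slot pins one buffer until its write completes. A standalone model of just
that bookkeeping (`Slot`, `acquire`, and `release` are illustrative names):

#[derive(Clone, Debug)]
struct Slot<'a> {
    buf: &'a [u8], // pinned serialized bytes
    addr: u64,     // on-disk destination, kept for error reporting
}

fn acquire<'a>(slots: &mut [Option<Slot<'a>>], buf: &'a [u8], addr: u64) -> Option<u64> {
    let pos = slots.iter().position(Option::is_none)?;
    slots[pos] = Some(Slot { buf, addr });
    Some(pos as u64) // submitted as the SQE's user_data
}

fn release<'a>(slots: &mut [Option<Slot<'a>>], user_data: u64) -> Option<Slot<'a>> {
    // On completion, the CQE's user_data points back at the slot to free.
    slots.get_mut(user_data as usize)?.take()
}

fn main() {
    let data = [1u8, 2, 3];
    let mut slots: Vec<Option<Slot<'_>>> = vec![None; 4];
    let token = acquire(&mut slots, &data, 4096).expect("a slot is free");
    assert!(release(&mut slots, token).is_some());
    assert!(slots.iter().all(Option::is_none));
}
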
@@ -453,16 +491,13 @@ impl NodeStore<Committed, FileBacked> {
                         &self.storage,
                         completion_queue,
                         &mut saved_pinned_buffers,
+                        &mut cached_nodes,
                     )?;
                 }
-
-            // Allocate the node to store the address, then collect for caching and persistence
-            node.allocate_at(persisted_address);
-            cached_nodes.push(node);
         }
 
         let pending = saved_pinned_buffers
             .iter()
-            .filter(|pbe| pbe.node.is_some())
+            .filter(|pbe| pbe.is_some())
             .count();
         submit_and_wait_with_retry(
             &mut ring,
@@ -471,22 +506,69 @@ impl NodeStore<Committed, FileBacked> {
             "final submit_and_wait",
         )?;
 
-        handle_completion_queue(&self.storage, ring.completion(), &mut saved_pinned_buffers)?;
+        handle_completion_queue(
+            &self.storage,
+            ring.completion(),
+            &mut saved_pinned_buffers,
+            &mut cached_nodes,
+        )?;
 
         debug_assert!(
-            !saved_pinned_buffers.iter().any(|pbe| pbe.node.is_some()),
+            !saved_pinned_buffers
+                .iter()
+                .any(std::option::Option::is_some),
             "Found entry with node still set: {:?}",
-            saved_pinned_buffers.iter().find(|pbe| pbe.node.is_some())
+            saved_pinned_buffers.iter().find(|pbe| pbe.is_some())
         );
 
         self.storage.write_cached_nodes(cached_nodes)?;
 
         debug_assert!(ring.completion().is_empty());
 
-        let flush_time = flush_start.elapsed().as_millis();
-        firewood_counter!("firewood.flush_nodes", "amount flushed nodes").increment(flush_time);
+        // All references to batch.bump are now dropped, so the caller can reset it
+        Ok(())
+    }
+}
 
-        Ok(header)
+#[cfg(feature = "io-uring")]
+/// Helper function to handle completion queue entries and check for errors
+fn handle_completion_queue(
+    storage: &FileBacked,
+    completion_queue: io_uring::cqueue::CompletionQueue<'_>,
+    saved_pinned_buffers: &mut [Option<PinnedBufferEntry<'_>>],
+    cached_nodes: &mut Vec<MaybePersistedNode>,
+) -> Result<(), FileIoError> {
+    for entry in completion_queue {
+        // user_data contains the index of the entry in the saved_pinned_buffers array
+        let item = entry.user_data() as usize;
+        let pbe = saved_pinned_buffers
+            .get_mut(item)
+            .expect("completed item user_data should point to an entry");
+
+        let pbe_ref = pbe.as_ref().expect("completed items are always in use");
+
+        let expected_len: i32 = pbe_ref
+            .pinned_buffer
+            .len()
+            .try_into()
+            .expect("buffer length will fit into an i32");
+        if entry.result() != expected_len {
+            let error = if entry.result() >= 0 {
+                std::io::Error::other("Partial write")
+            } else {
+                std::io::Error::from_raw_os_error(0 - entry.result())
+            };
+            return Err(storage.file_io_error(
+                error,
+                pbe_ref.address.get(),
+                Some("write failure".to_string()),
+            ));
+        }
+        // I/O completed successfully - mark node as persisted and cache it
+        let entry = pbe.take().expect("already checked it's Some");
+        entry.node.allocate_at(entry.address);
+        cached_nodes.push(entry.node);
     }
+    Ok(())
 }
 
 #[cfg(test)]
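
Aside (not part of the patch): a CQE result is a negative errno on failure and
a byte count on success; `handle_completion_queue` treats anything short of the
buffer length as a hard error rather than resubmitting the remainder. The check
in isolation, runnable (`check_completion` is an illustrative name):

use std::io;

fn check_completion(result: i32, expected_len: usize) -> io::Result<()> {
    if result < 0 {
        // The kernel reports failures as -errno in the completion entry
        return Err(io::Error::from_raw_os_error(-result));
    }
    if result as usize != expected_len {
        // A short write would leave the node area torn, so surface it as an error
        return Err(io::Error::other("partial write"));
    }
    Ok(())
}

fn main() {
    assert!(check_completion(8, 8).is_ok());
    assert!(check_completion(4, 8).is_err()); // partial write
    assert!(check_completion(-5, 8).is_err()); // -EIO
}
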