Skip to content

chore: reord wal to be stateful #2355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: 04-13-chore_impl_explicit_journal_mocking
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/common/sqlite-vfs-fdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod sqlite;
pub mod utils;
pub mod vfs;
pub mod wal;
pub mod wal_parser;

// Re-export foundationdb for convenience
pub use foundationdb;
Expand Down
25 changes: 16 additions & 9 deletions packages/common/sqlite-vfs-fdb/src/vfs/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use tracing;

use super::super::fdb::metadata::FdbFileMetadata;
use super::general::FdbVfs;
// Import from fdb is now handled through the WalManager
use crate::metrics;
use crate::utils::{LockState, SQLITE_IOERR, SQLITE_OK};
use crate::wal::WalManager;

// Import submodules
pub mod db;
Expand Down Expand Up @@ -80,6 +82,8 @@ pub struct FdbFileExt {
pub shm_region_size: usize,
/// Number of active SHM maps (for reference counting)
pub shm_map_count: u32,
/// WAL manager for WAL operations
pub wal_manager: WalManager,
}

/// The I/O methods for FdbFile
Expand Down Expand Up @@ -283,15 +287,18 @@ pub unsafe extern "C" fn fdb_file_write(

// Dispatch to appropriate write handler based on file type
match file_type {
SqliteFileType::WAL => wal::write_wal_file(
&file_path,
file_id,
offset,
&buf_data,
page_size as usize,
&db,
&keyspace,
),
SqliteFileType::WAL => {
// Get a reference to the file extension to use the WAL manager
let ext = fdb_file.ext.assume_init_ref();
wal::write_wal_file(
&file_path,
file_id,
offset,
&buf_data,
page_size as usize,
ext,
)
},
SqliteFileType::Journal => {
// For journal files, we provide a minimal implementation that just logs operations
// In WAL mode, SQLite only uses the journal file temporarily during initialization
Expand Down
19 changes: 7 additions & 12 deletions packages/common/sqlite-vfs-fdb/src/vfs/file/wal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use libsqlite3_sys::*;
use std::os::raw::{c_int, c_void};
use std::slice;
use std::sync::Arc;
// Arc is now handled through FdbFileExt
use tracing;
use uuid::Uuid;

Expand All @@ -15,16 +15,13 @@ pub fn write_wal_file(
offset: i64,
buf_data: &[u8],
page_size: usize,
db: &Arc<foundationdb::Database>,
keyspace: &crate::fdb::keyspace::FdbKeySpace,
extension: &crate::vfs::file::FdbFileExt,
) -> c_int {
tracing::debug!("Writing to WAL file");

// Create WAL manager
let wal_manager = crate::wal::WalManager::new(db.clone(), keyspace.clone());

// Use the stored WAL manager from extension
// Process the WAL write
match wal_manager.process_wal_write(&file_id, offset, &buf_data, page_size) {
match extension.wal_manager.process_wal_write(&file_id, offset, &buf_data, page_size) {
Ok(bytes_written) => {
metrics::record_write_operation(file_path, bytes_written, 1, 0);
SQLITE_OK
Expand All @@ -44,15 +41,13 @@ pub unsafe fn read_wal_file(
count: c_int,
offset: i64,
extension: &crate::vfs::file::FdbFileExt,
vfs: &crate::vfs::general::FdbVfs,
_vfs: &crate::vfs::general::FdbVfs,
) -> c_int {
tracing::info!("Reading from WAL file: {}", file_path);

// Create WAL manager
let wal_manager = crate::wal::WalManager::new(extension.db.clone(), vfs.keyspace.clone());

// Use the stored WAL manager from extension
// Use WAL manager to read data
match wal_manager.read_wal_data(&extension.metadata.file_id, offset, count as usize) {
match extension.wal_manager.read_wal_data(&extension.metadata.file_id, offset, count as usize) {
Ok(data) => {
// We already zeroed the buffer, so now just copy the actual data
if !data.is_empty() {
Expand Down
3 changes: 3 additions & 0 deletions packages/common/sqlite-vfs-fdb/src/vfs/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::utils::{run_fdb_tx, FdbVfsError, LockState, DEFAULT_PAGE_SIZE, FDB_VF
use crate::utils::{
SQLITE_CANTOPEN, SQLITE_IOERR, SQLITE_OK, SQLITE_OPEN_CREATE, SQLITE_OPEN_READONLY,
};
use crate::wal::WalManager;

/// Main FoundationDB VFS implementation
pub struct FdbVfs {
Expand Down Expand Up @@ -419,6 +420,8 @@ pub unsafe extern "C" fn vfs_open(
shm_region_size: super::file::SHM_REGION_SIZE,
// Initial map count is 0
shm_map_count: 0,
// Initialize WAL manager - will be used for WAL operations
wal_manager: WalManager::new((*vfs_instance).db.clone(), (*vfs_instance).keyspace.clone()),
};
fdb_file.ext = MaybeUninit::new(ext);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::fdb::keyspace::FdbKeySpace;
use crate::utils::{run_fdb_tx, FdbVfsError};
use crate::wal_parser::{WalFrame, WalHeader, WalParser};
use crate::wal::parser::{WalFrame, WalHeader, WalParser};
use foundationdb::{Database, FdbBindingError};
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -102,15 +102,15 @@ impl WalManager {

// If the write is to offset 0, it might include the WAL header
if offset == 0 {
if data.len() >= crate::wal_parser::WAL_HEADER_SIZE {
if data.len() >= super::parser::WAL_HEADER_SIZE {
// Try to parse and store the header
match WalHeader::from_bytes(&data[0..crate::wal_parser::WAL_HEADER_SIZE]) {
match WalHeader::from_bytes(&data[0..super::parser::WAL_HEADER_SIZE]) {
Ok(header) => {
self.store_wal_header(file_id, &header)?;

// If the data only contains the header, we're done
if data.len() == crate::wal_parser::WAL_HEADER_SIZE {
return Ok(crate::wal_parser::WAL_HEADER_SIZE);
if data.len() == super::parser::WAL_HEADER_SIZE {
return Ok(super::parser::WAL_HEADER_SIZE);
}
}
Err(e) => {
Expand Down Expand Up @@ -138,8 +138,8 @@ impl WalManager {
};

// Skip header if it's included in the data and we're at offset 0
let data_to_parse = if offset == 0 && data.len() >= crate::wal_parser::WAL_HEADER_SIZE {
&data[crate::wal_parser::WAL_HEADER_SIZE..]
let data_to_parse = if offset == 0 && data.len() >= super::parser::WAL_HEADER_SIZE {
&data[super::parser::WAL_HEADER_SIZE..]
} else {
data
};
Expand All @@ -148,12 +148,12 @@ impl WalManager {
parser.add_data(data_to_parse);

// Calculate the frame index based on the offset
let frame_size = crate::wal_parser::FRAME_HEADER_SIZE + header.page_size as usize;
let frame_size = super::parser::FRAME_HEADER_SIZE + header.page_size as usize;
let first_frame_idx = if offset == 0 {
0
} else {
// If offset > header size, calculate frame index
((offset - crate::wal_parser::WAL_HEADER_SIZE as i64) / frame_size as i64) as u32
((offset - super::parser::WAL_HEADER_SIZE as i64) / frame_size as i64) as u32
};

// Process frames
Expand All @@ -175,8 +175,8 @@ impl WalManager {
};

// Return total bytes processed including header if present
if offset == 0 && data.len() >= crate::wal_parser::WAL_HEADER_SIZE {
Ok(crate::wal_parser::WAL_HEADER_SIZE + bytes_processed)
if offset == 0 && data.len() >= super::parser::WAL_HEADER_SIZE {
Ok(super::parser::WAL_HEADER_SIZE + bytes_processed)
} else {
Ok(bytes_processed)
}
Expand All @@ -190,7 +190,7 @@ impl WalManager {
let mut result = vec![0u8; count];

// If the read includes the WAL header
if offset < crate::wal_parser::WAL_HEADER_SIZE as i64 {
if offset < super::parser::WAL_HEADER_SIZE as i64 {
// Get the header
let header = match self.get_wal_header(file_id)? {
Some(h) => h,
Expand All @@ -206,7 +206,7 @@ impl WalManager {
// Calculate how much of the header to copy
let header_start = offset as usize;
let header_count = std::cmp::min(
crate::wal_parser::WAL_HEADER_SIZE - header_start,
super::parser::WAL_HEADER_SIZE - header_start,
count
);

Expand Down
5 changes: 5 additions & 0 deletions packages/common/sqlite-vfs-fdb/src/wal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod manager;
pub mod parser;

pub use manager::WalManager;
pub use parser::{WalFrame, WalHeader, WalParser, WalIterator, WalParseError, FRAME_HEADER_SIZE, WAL_HEADER_SIZE};
2 changes: 1 addition & 1 deletion packages/common/sqlite-vfs-fdb/tests/wal_parser_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io::Read;
use std::path::Path;

use insta::assert_debug_snapshot;
use sqlite_vfs_fdb::wal_parser::{WalFrame, WalIterator, WalParser};
use sqlite_vfs_fdb::wal::{WalFrame, WalIterator, WalParser};

#[test]
fn test_wal_parser_callback() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
Loading