Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# Regression test output
/results/
test/regression/results/
test/regression/regression.conf.gen
test/regression/tmp_uuid_test/
regression.diffs
regression.out
tmp_check/
Expand Down
107 changes: 38 additions & 69 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive", "env"] }
tokio = { version = "1", features = ["rt", "time", "sync", "macros"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
duckdb = { version = "=1.4.3" }
duckdb = { version = "=1.10500.0" }
pgwire-replication = { version = "0.2", features = ["tls-rustls"] }
futures-util = "0.3"
rustls = { version = "0.23", default-features = false, features = ["ring"] }
Expand Down
7 changes: 7 additions & 0 deletions duckpipe-core/src/duckdb_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,13 @@ impl FlushWorker {
}
}

impl Drop for FlushWorker {
    fn drop(&mut self) {
        // Best-effort cleanup: release the attached DuckLake catalog before the
        // DuckDB connection itself is closed, leaving the catalog in a clean
        // state. Drop cannot propagate errors, so the result is discarded.
        self.db.execute_batch("DETACH lake;").ok();
    }
}

/// Result of a DuckDB-based flush.
#[derive(Debug)]
pub struct DuckDbFlushResult {
Expand Down
25 changes: 16 additions & 9 deletions duckpipe-core/src/flush_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl FlushCoordinator {
let (tx, rx) = mpsc::channel();
let max_queued = resolved_config.max_queued_changes as i64;
let max_concurrent = resolved_config.max_concurrent_flushes as usize;
let flush_interval = Duration::from_millis(resolved_config.flush_interval_ms as u64);
FlushCoordinator {
pg_connstr,
ducklake_schema,
Expand All @@ -362,7 +363,7 @@ impl FlushCoordinator {
per_table_flush_duration: HashMap::new(),
per_table_avg_row_bytes: HashMap::new(),
target_to_mapping: HashMap::new(),
flush_gate: Arc::new(FlushGate::new(max_concurrent, Duration::from_millis(5000))),
flush_gate: Arc::new(FlushGate::new(max_concurrent, flush_interval)),
}
}

Expand Down Expand Up @@ -508,6 +509,9 @@ impl FlushCoordinator {

/// Push a change into the shared queue for the given target table.
/// The target queue must have been created via `ensure_queue()`.
///
/// The flush thread wakes on its own via `drain_poll_ms` condvar timeout,
/// so no explicit notification is needed after pushing changes.
pub fn push_change(&self, target_key: &str, change: Change) {
if let Some(entry) = self.threads.get(target_key) {
let mut guard = entry.queue_handle.inner.lock().unwrap();
Expand All @@ -523,7 +527,6 @@ impl FlushCoordinator {
.snapshot_queued
.fetch_add(1, Ordering::Relaxed);
}
entry.queue_handle.condvar.notify_one();
}
}

Expand Down Expand Up @@ -944,18 +947,22 @@ fn flush_thread_main(
let mut next_seq: i32 = 0; // monotonic counter for _seq across appends
let mut last_flush = Instant::now();
let flush_interval = Duration::from_millis(flush_interval_ms);
let drain_poll = Duration::from_millis(resolved_config.drain_poll_ms as u64);

loop {
// Calculate remaining time until next time-based flush
// Calculate wait_timeout: how long the condvar blocks before waking.
// drain_poll controls how frequently the thread checks for new data (cheap).
// flush_interval controls how often data is written to DuckLake (expensive).
let elapsed = last_flush.elapsed();
let wait_timeout = if buffered_count == 0 {
// No buffered changes — wait for full interval
flush_interval
} else if elapsed >= flush_interval {
// Already past interval — don't wait
let wait_timeout = if buffered_count > 0 && elapsed >= flush_interval {
// Time to flush — don't wait
Duration::ZERO
} else if buffered_count > 0 {
// Data buffered but not yet time to flush — drain fast, flush on time
drain_poll.min(flush_interval - elapsed)
} else {
flush_interval - elapsed
// Idle: just check for new data at the drain poll rate
drain_poll
};

// Lock shared queue, drain new changes and check signals
Expand Down
21 changes: 19 additions & 2 deletions duckpipe-core/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ use std::pin::pin;
use std::time::Instant;

use duckdb::{Config, Connection};

/// Scope guard around a DuckDB [`Connection`] that issues `DETACH lake;`
/// when dropped, so the attached DuckLake catalog is released before the
/// underlying connection goes away.
struct DetachOnDrop(Connection);

impl Drop for DetachOnDrop {
    fn drop(&mut self) {
        // Best-effort: errors cannot escape drop, so the outcome of the
        // DETACH is intentionally ignored.
        self.0.execute_batch("DETACH lake;").ok();
    }
}

impl std::ops::Deref for DetachOnDrop {
    type Target = Connection;

    /// Expose the wrapped connection so the guard can be used anywhere a
    /// `&Connection` is expected.
    fn deref(&self) -> &Connection {
        let DetachOnDrop(conn) = self;
        conn
    }
}
use futures_util::StreamExt;

use crate::types::{format_lsn, parse_lsn};
Expand Down Expand Up @@ -364,8 +380,9 @@ fn run_duckdb_consumer(
let config = Config::default()
.allow_unsigned_extensions()
.map_err(|e| format!("duckdb config: {}", e))?;
let db =
Connection::open_in_memory_with_flags(config).map_err(|e| format!("duckdb open: {}", e))?;
let db = DetachOnDrop(
Connection::open_in_memory_with_flags(config).map_err(|e| format!("duckdb open: {}", e))?,
);

db.execute_batch("INSTALL ducklake; LOAD ducklake;")
.map_err(|e| format!("duckdb install ducklake: {}", e))?;
Expand Down
Loading
Loading