Skip to content

Commit cce865f

Browse files
committed
Add interrupt handle
Exposes the `duckdb_interrupt` function from the C API to allow consumers to interrupt long-running queries from another thread. Inspired by rusqlite: https://docs.rs/rusqlite/latest/rusqlite/struct.InterruptHandle.html
1 parent ef1432f commit cce865f

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

crates/duckdb/src/inner_connection.rs

+46-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
mem,
44
os::raw::c_char,
55
ptr, str,
6+
sync::{Arc, Mutex}
67
};
78

89
use super::{ffi, Appender, Config, Connection, Result};
@@ -15,6 +16,7 @@ use crate::{
1516
pub struct InnerConnection {
1617
pub db: ffi::duckdb_database,
1718
pub con: ffi::duckdb_connection,
19+
interrupt: Arc<InterruptHandle>,
1820
owned: bool,
1921
}
2022

@@ -30,7 +32,9 @@ impl InnerConnection {
3032
Some("connect error".to_owned()),
3133
));
3234
}
33-
Ok(InnerConnection { db, con, owned })
35+
let interrupt = Arc::new(InterruptHandle::new(con));
36+
37+
Ok(InnerConnection { db, con, interrupt, owned })
3438
}
3539

3640
pub fn open_with_flags(c_path: &CStr, config: Config) -> Result<InnerConnection> {
@@ -57,6 +61,7 @@ impl InnerConnection {
5761
unsafe {
5862
ffi::duckdb_disconnect(&mut self.con);
5963
self.con = ptr::null_mut();
64+
self.interrupt.clear();
6065

6166
if self.owned {
6267
ffi::duckdb_close(&mut self.db);
@@ -106,6 +111,10 @@ impl InnerConnection {
106111
Ok(Appender::new(conn, c_app))
107112
}
108113

114+
pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
115+
self.interrupt.clone()
116+
}
117+
109118
#[inline]
110119
pub fn is_autocommit(&self) -> bool {
111120
true
@@ -126,3 +135,39 @@ impl Drop for InnerConnection {
126135
}
127136
}
128137
}
138+
139+
/// A handle that allows interrupting long-running queries.
140+
pub struct InterruptHandle {
141+
conn: Mutex<ffi::duckdb_connection>,
142+
}
143+
144+
unsafe impl Send for InterruptHandle {}
145+
unsafe impl Sync for InterruptHandle {}
146+
147+
impl InterruptHandle {
148+
fn new(conn: ffi::duckdb_connection) -> Self {
149+
Self {
150+
conn: Mutex::new(conn),
151+
}
152+
}
153+
154+
fn clear(&self) {
155+
*(self.conn.lock().unwrap()) = ptr::null_mut();
156+
}
157+
158+
/// Interrupt the query currently running on the connection this handle was
159+
/// obtained from. The interrupt will cause that query to fail with
160+
/// `Error::DuckDBFailure`. If the connection was dropped after obtaining
161+
/// this interrupt handle, calling this method results in a noop.
162+
///
163+
/// See [`crate::Connection::interrupt_handle`] for an example.
164+
pub fn interrupt(&self) {
165+
let db_handle = self.conn.lock().unwrap();
166+
167+
if !db_handle.is_null() {
168+
unsafe {
169+
ffi::duckdb_interrupt(*db_handle);
170+
}
171+
}
172+
}
173+
}

crates/duckdb/src/lib.rs

+53
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub use crate::{
7979
config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
8080
error::Error,
8181
ffi::ErrorCode,
82+
inner_connection::InterruptHandle,
8283
params::{params_from_iter, Params, ParamsFromIter},
8384
row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
8485
statement::Statement,
@@ -532,6 +533,30 @@ impl Connection {
532533
self.db.borrow_mut().appender(self, table, schema)
533534
}
534535

536+
/// Get a handle to interrupt long-running queries.
537+
///
538+
/// ## Example
539+
///
540+
/// ```rust,no_run
541+
/// # use duckdb::{Connection, Result};
542+
/// fn run_query(conn: Connection) -> Result<()> {
543+
/// let interrupt_handle = conn.interrupt_handle();
544+
/// let join_handle = std::thread::spawn(move || { conn.execute("expensive query", []) });
545+
///
546+
/// // Arbitrary wait for query to start
547+
/// std::thread::sleep(std::time::Duration::from_millis(100));
548+
///
549+
/// interrupt_handle.interrupt();
550+
///
551+
/// let query_result = join_handle.join().unwrap();
552+
/// assert!(query_result.is_err());
553+
///
554+
/// Ok(())
555+
/// }
556+
pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
557+
self.db.borrow().get_interrupt_handle()
558+
}
559+
535560
/// Close the DuckDB connection.
536561
///
537562
/// This is functionally equivalent to the `Drop` implementation for
@@ -1337,6 +1362,34 @@ mod test {
13371362
Ok(())
13381363
}
13391364

1365+
#[test]
1366+
fn test_interrupt() -> Result<()> {
1367+
let db = checked_memory_handle();
1368+
let db_interrupt = db.interrupt_handle();
1369+
1370+
let (tx, rx) = std::sync::mpsc::channel();
1371+
std::thread::spawn(move || {
1372+
let mut stmt = db.prepare("select count(*) from range(10000000) t1, range(1000000) t2").unwrap();
1373+
tx.send(stmt.execute([])).unwrap();
1374+
});
1375+
1376+
std::thread::sleep(std::time::Duration::from_millis(100));
1377+
db_interrupt.interrupt();
1378+
1379+
let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1380+
assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1381+
Ok(())
1382+
}
1383+
1384+
#[test]
1385+
fn test_interrupt_on_dropped_db() {
1386+
let db = checked_memory_handle();
1387+
let db_interrupt = db.interrupt_handle();
1388+
1389+
drop(db);
1390+
db_interrupt.interrupt();
1391+
}
1392+
13401393
#[cfg(feature = "bundled")]
13411394
#[test]
13421395
fn test_version() -> Result<()> {

0 commit comments

Comments
 (0)