Skip to content

Commit 524738a

Browse files
committed
Add interrupt handle
Exposes the `duckdb_interrupt` function from the C API to allow consumers to interrupt long-running queries from another thread. Inspired by rusqlite: https://docs.rs/rusqlite/latest/rusqlite/struct.InterruptHandle.html
1 parent ef1432f commit 524738a

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

crates/duckdb/src/inner_connection.rs

+49-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
mem,
44
os::raw::c_char,
55
ptr, str,
6+
sync::{Arc, Mutex},
67
};
78

89
use super::{ffi, Appender, Config, Connection, Result};
@@ -15,6 +16,7 @@ use crate::{
1516
pub struct InnerConnection {
1617
pub db: ffi::duckdb_database,
1718
pub con: ffi::duckdb_connection,
19+
interrupt: Arc<InterruptHandle>,
1820
owned: bool,
1921
}
2022

@@ -30,7 +32,14 @@ impl InnerConnection {
3032
Some("connect error".to_owned()),
3133
));
3234
}
33-
Ok(InnerConnection { db, con, owned })
35+
let interrupt = Arc::new(InterruptHandle::new(con));
36+
37+
Ok(InnerConnection {
38+
db,
39+
con,
40+
interrupt,
41+
owned,
42+
})
3443
}
3544

3645
pub fn open_with_flags(c_path: &CStr, config: Config) -> Result<InnerConnection> {
@@ -57,6 +66,7 @@ impl InnerConnection {
5766
unsafe {
5867
ffi::duckdb_disconnect(&mut self.con);
5968
self.con = ptr::null_mut();
69+
self.interrupt.clear();
6070

6171
if self.owned {
6272
ffi::duckdb_close(&mut self.db);
@@ -106,6 +116,10 @@ impl InnerConnection {
106116
Ok(Appender::new(conn, c_app))
107117
}
108118

119+
pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
120+
self.interrupt.clone()
121+
}
122+
109123
#[inline]
110124
pub fn is_autocommit(&self) -> bool {
111125
true
@@ -126,3 +140,37 @@ impl Drop for InnerConnection {
126140
}
127141
}
128142
}
143+
144+
/// A handle that allows interrupting long-running queries.
145+
pub struct InterruptHandle {
146+
conn: Mutex<ffi::duckdb_connection>,
147+
}
148+
149+
unsafe impl Send for InterruptHandle {}
150+
unsafe impl Sync for InterruptHandle {}
151+
152+
impl InterruptHandle {
153+
fn new(conn: ffi::duckdb_connection) -> Self {
154+
Self { conn: Mutex::new(conn) }
155+
}
156+
157+
fn clear(&self) {
158+
*(self.conn.lock().unwrap()) = ptr::null_mut();
159+
}
160+
161+
/// Interrupt the query currently running on the connection this handle was
162+
/// obtained from. The interrupt will cause that query to fail with
163+
/// `Error::DuckDBFailure`. If the connection was dropped after obtaining
164+
/// this interrupt handle, calling this method results in a noop.
165+
///
166+
/// See [`crate::Connection::interrupt_handle`] for an example.
167+
pub fn interrupt(&self) {
168+
let db_handle = self.conn.lock().unwrap();
169+
170+
if !db_handle.is_null() {
171+
unsafe {
172+
ffi::duckdb_interrupt(*db_handle);
173+
}
174+
}
175+
}
176+
}

crates/duckdb/src/lib.rs

+55
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub use crate::{
7979
config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
8080
error::Error,
8181
ffi::ErrorCode,
82+
inner_connection::InterruptHandle,
8283
params::{params_from_iter, Params, ParamsFromIter},
8384
row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
8485
statement::Statement,
@@ -532,6 +533,30 @@ impl Connection {
532533
self.db.borrow_mut().appender(self, table, schema)
533534
}
534535

536+
/// Get a handle to interrupt long-running queries.
537+
///
538+
/// ## Example
539+
///
540+
/// ```rust,no_run
541+
/// # use duckdb::{Connection, Result};
542+
/// fn run_query(conn: Connection) -> Result<()> {
543+
/// let interrupt_handle = conn.interrupt_handle();
544+
/// let join_handle = std::thread::spawn(move || { conn.execute("expensive query", []) });
545+
///
546+
/// // Arbitrary wait for query to start
547+
/// std::thread::sleep(std::time::Duration::from_millis(100));
548+
///
549+
/// interrupt_handle.interrupt();
550+
///
551+
/// let query_result = join_handle.join().unwrap();
552+
/// assert!(query_result.is_err());
553+
///
554+
/// Ok(())
555+
/// }
556+
pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
557+
self.db.borrow().get_interrupt_handle()
558+
}
559+
535560
/// Close the DuckDB connection.
536561
///
537562
/// This is functionally equivalent to the `Drop` implementation for
@@ -1337,6 +1362,36 @@ mod test {
13371362
Ok(())
13381363
}
13391364

1365+
#[test]
1366+
fn test_interrupt() -> Result<()> {
1367+
let db = checked_memory_handle();
1368+
let db_interrupt = db.interrupt_handle();
1369+
1370+
let (tx, rx) = std::sync::mpsc::channel();
1371+
std::thread::spawn(move || {
1372+
let mut stmt = db
1373+
.prepare("select count(*) from range(10000000) t1, range(1000000) t2")
1374+
.unwrap();
1375+
tx.send(stmt.execute([])).unwrap();
1376+
});
1377+
1378+
std::thread::sleep(std::time::Duration::from_millis(100));
1379+
db_interrupt.interrupt();
1380+
1381+
let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1382+
assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1383+
Ok(())
1384+
}
1385+
1386+
#[test]
1387+
fn test_interrupt_on_dropped_db() {
1388+
let db = checked_memory_handle();
1389+
let db_interrupt = db.interrupt_handle();
1390+
1391+
drop(db);
1392+
db_interrupt.interrupt();
1393+
}
1394+
13401395
#[cfg(feature = "bundled")]
13411396
#[test]
13421397
fn test_version() -> Result<()> {

0 commit comments

Comments
 (0)