diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 05f57c1..193fb6c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,19 +21,16 @@ jobs: fail-fast: false matrix: rust: ["stable", "beta", "nightly"] - backend: ["postgres", "mysql"] - os: [ubuntu-latest, macos-latest, windows-latest] + backend: ["postgres", "mysql", "sqlite"] + os: [ubuntu-latest, macos-latest, macos-14, windows-2019] runs-on: ${{ matrix.os }} steps: - name: Checkout sources - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Cache cargo registry - uses: actions/cache@v2 + uses: Swatinem/rust-cache@v2 with: - path: | - ~/.cargo/registry - ~/.cargo/git key: ${{ runner.os }}-${{ matrix.backend }}-cargo-${{ hashFiles('**/Cargo.toml') }} - name: Set environment variables @@ -66,8 +63,44 @@ jobs: mysql -e "create database diesel_test; create database diesel_unit_test; grant all on \`diesel_%\`.* to 'root'@'localhost';" -uroot -proot echo "DATABASE_URL=mysql://root:root@localhost/diesel_test" >> $GITHUB_ENV + - name: Install sqlite (Linux) + if: runner.os == 'Linux' && matrix.backend == 'sqlite' + run: | + curl -fsS --retry 3 -o sqlite-autoconf-3400100.tar.gz https://www.sqlite.org/2022/sqlite-autoconf-3400100.tar.gz + tar zxf sqlite-autoconf-3400100.tar.gz + cd sqlite-autoconf-3400100 + CFLAGS="$CFLAGS -O2 -fno-strict-aliasing \ + -DSQLITE_DEFAULT_FOREIGN_KEYS=1 \ + -DSQLITE_SECURE_DELETE \ + -DSQLITE_ENABLE_COLUMN_METADATA \ + -DSQLITE_ENABLE_FTS3_PARENTHESIS \ + -DSQLITE_ENABLE_RTREE=1 \ + -DSQLITE_SOUNDEX=1 \ + -DSQLITE_ENABLE_UNLOCK_NOTIFY \ + -DSQLITE_OMIT_LOOKASIDE=1 \ + -DSQLITE_ENABLE_DBSTAT_VTAB \ + -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT=1 \ + -DSQLITE_ENABLE_LOAD_EXTENSION \ + -DSQLITE_ENABLE_JSON1 \ + -DSQLITE_LIKE_DOESNT_MATCH_BLOBS \ + -DSQLITE_THREADSAFE=1 \ + -DSQLITE_ENABLE_FTS3_TOKENIZER=1 \ + -DSQLITE_MAX_SCHEMA_RETRY=25 \ + -DSQLITE_ENABLE_PREUPDATE_HOOK \ + -DSQLITE_ENABLE_SESSION \ + -DSQLITE_ENABLE_STMTVTAB \ + -DSQLITE_MAX_VARIABLE_NUMBER=250000" \ + ./configure --prefix=/usr \ + --enable-threadsafe \ + --enable-dynamic-extensions \ + --libdir=/usr/lib/x86_64-linux-gnu \ + --libexecdir=/usr/lib/x86_64-linux-gnu/sqlite3 + sudo make + sudo make install + echo "DATABASE_URL=/tmp/test.db" >> $GITHUB_ENV + - name: Install postgres (MacOS) - if: runner.os == 'macOS' && matrix.backend == 'postgres' + if: matrix.os == 'macos-latest' && matrix.backend == 'postgres' run: | initdb -D /usr/local/var/postgres pg_ctl -D /usr/local/var/postgres start @@ -75,15 +108,41 @@ jobs: createuser -s postgres echo "DATABASE_URL=postgres://postgres@localhost/" >> $GITHUB_ENV + - name: Install postgres (MacOS M1) + if: matrix.os == 'macos-14' && matrix.backend == 'postgres' + run: | + brew install postgresql + brew services start postgresql@14 + sleep 3 + createuser -s postgres + echo "DATABASE_URL=postgres://postgres@localhost/" >> $GITHUB_ENV + - name: Install sqlite (MacOS) + if: runner.os == 'macOS' && matrix.backend == 'sqlite' + run: | + brew install sqlite + echo "DATABASE_URL=/tmp/test.db" >> $GITHUB_ENV + - name: Install mysql (MacOS) - if: runner.os == 'macOS' && matrix.backend == 'mysql' + if: matrix.os == 'macos-latest' && matrix.backend == 'mysql' + run: | + brew install mariadb@11.3 + /usr/local/opt/mariadb@11.3/bin/mysql_install_db + /usr/local/opt/mariadb@11.3/bin/mysql.server start + sleep 3 + /usr/local/opt/mariadb@11.3/bin/mysqladmin -u runner password diesel + /usr/local/opt/mariadb@11.3/bin/mysql -e "create database diesel_test; create database diesel_unit_test; grant all on \`diesel_%\`.* to 'runner'@'localhost';" -urunner + echo "DATABASE_URL=mysql://runner:diesel@localhost/diesel_test" >> $GITHUB_ENV + + - name: Install mysql (MacOS M1) + if: matrix.os == 'macos-14' && matrix.backend == 'mysql' run: | - brew install --overwrite mariadb@10.8 - /usr/local/opt/mariadb@10.8/bin/mysql_install_db - /usr/local/opt/mariadb@10.8/bin/mysql.server start + brew install mariadb@11.3 + ls /opt/homebrew/opt/mariadb@11.3 + /opt/homebrew/opt/mariadb@11.3/bin/mysql_install_db + /opt/homebrew/opt/mariadb@11.3/bin/mysql.server start sleep 3 - /usr/local/opt/mariadb@10.8/bin/mysql -e "ALTER USER 'runner'@'localhost' IDENTIFIED BY 'diesel';" -urunner - /usr/local/opt/mariadb@10.8/bin/mysql -e "create database diesel_test; create database diesel_unit_test; grant all on \`diesel_%\`.* to 'runner'@'localhost';" -urunner -pdiesel + /opt/homebrew/opt/mariadb@11.3/bin/mysqladmin -u runner password diesel + /opt/homebrew/opt/mariadb@11.3/bin/mysql -e "create database diesel_test; create database diesel_unit_test; grant all on \`diesel_%\`.* to 'runner'@'localhost';" -urunner echo "DATABASE_URL=mysql://runner:diesel@localhost/diesel_test" >> $GITHUB_ENV - name: Install postgres (Windows) @@ -106,6 +165,22 @@ jobs: run: | echo "DATABASE_URL=mysql://root@localhost/diesel_test" >> $GITHUB_ENV + - name: Install sqlite (Windows) + if: runner.os == 'Windows' && matrix.backend == 'sqlite' + shell: cmd + run: | + choco install sqlite + cd /D C:\ProgramData\chocolatey\lib\SQLite\tools + call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\vcvars64.bat" + lib /machine:x64 /def:sqlite3.def /out:sqlite3.lib + - name: Set variables for sqlite (Windows) + if: runner.os == 'Windows' && matrix.backend == 'sqlite' + shell: bash + run: | + echo "C:\ProgramData\chocolatey\lib\SQLite\tools" >> $GITHUB_PATH + echo "SQLITE3_LIB_DIR=C:\ProgramData\chocolatey\lib\SQLite\tools" >> $GITHUB_ENV + echo "DATABASE_URL=C:\test.db" >> $GITHUB_ENV + - name: Install rust toolchain uses: dtolnay/rust-toolchain@master with: @@ -115,26 +190,29 @@ jobs: - name: Test diesel_async run: cargo +${{ matrix.rust }} test --manifest-path Cargo.toml --no-default-features --features "${{ matrix.backend }} deadpool bb8 mobc" - - name: Run examples + + - name: Run examples (Postgres) if: matrix.backend == 'postgres' run: | cargo +${{ matrix.rust }} check --manifest-path examples/postgres/pooled-with-rustls/Cargo.toml cargo +${{ matrix.rust }} check --manifest-path examples/postgres/run-pending-migrations-with-rustls/Cargo.toml + - name: Run examples (Sqlite) + if: matrix.backend == 'sqlite' + run: | + cargo +${{ matrix.rust }} check --manifest-path examples/sync-wrapper/Cargo.toml + rustfmt_and_clippy: name: Check rustfmt style && run clippy runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: components: clippy, rustfmt - name: Cache cargo registry - uses: actions/cache@v2 + uses: Swatinem/rust-cache@v2 with: - path: | - ~/.cargo/registry - ~/.cargo/git key: clippy-cargo-${{ hashFiles('**/Cargo.toml') }} - name: Remove potential newer clippy.toml from dependencies @@ -152,7 +230,7 @@ jobs: name: Check Minimal supported rust version (1.65.0) runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@1.65.0 - uses: dtolnay/rust-toolchain@nightly - uses: taiki-e/install-action@cargo-hack @@ -160,4 +238,6 @@ jobs: - name: Check diesel-async # cannot test mysql yet as that crate # has broken min-version dependencies + # cannot test sqlite yet as that crate + # as broken min-version dependencies as well run: cargo +stable minimal-versions check -p diesel-async --features "postgres bb8 deadpool mobc" diff --git a/Cargo.toml b/Cargo.toml index fcacca0..22a4a7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,9 @@ diesel_migrations = "2.1.0" default = [] mysql = ["diesel/mysql_backend", "mysql_async", "mysql_common", "futures-channel", "tokio"] postgres = ["diesel/postgres_backend", "tokio-postgres", "tokio", "tokio/rt"] -async-connection-wrapper = [] +sqlite = ["diesel/sqlite", "sync-connection-wrapper"] +sync-connection-wrapper = ["tokio/rt"] +async-connection-wrapper = ["tokio/net"] r2d2 = ["diesel/r2d2"] [[test]] @@ -47,7 +49,7 @@ path = "tests/lib.rs" harness = true [package.metadata.docs.rs] -features = ["postgres", "mysql", "deadpool", "bb8", "mobc", "async-connection-wrapper", "r2d2"] +features = ["postgres", "mysql", "sqlite", "deadpool", "bb8", "mobc", "async-connection-wrapper", "sync-connection-wrapper", "r2d2"] no-default-features = true rustc-args = ["--cfg", "doc_cfg"] rustdoc-args = ["--cfg", "doc_cfg"] @@ -57,4 +59,8 @@ members = [ ".", "examples/postgres/pooled-with-rustls", "examples/postgres/run-pending-migrations-with-rustls", + "examples/sync-wrapper", ] + +[patch.crates-io] +diesel = { git = "http://github.com/diesel-rs/diesel", rev = "793de72" } diff --git a/examples/sync-wrapper/Cargo.toml b/examples/sync-wrapper/Cargo.toml new file mode 100644 index 0000000..451a73e --- /dev/null +++ b/examples/sync-wrapper/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sync-wrapper" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +diesel = { version = "2.1.0", default-features = false } +diesel-async = { version = "0.4.0", path = "../../", features = ["sync-connection-wrapper", "async-connection-wrapper"] } +diesel_migrations = "2.1.0" +futures-util = "0.3.21" +tokio = { version = "1.2.0", default-features = false, features = ["macros", "rt-multi-thread"] } + +[features] +default = ["sqlite"] +sqlite = ["diesel-async/sqlite"] diff --git a/examples/sync-wrapper/diesel.toml b/examples/sync-wrapper/diesel.toml new file mode 100644 index 0000000..c028f4a --- /dev/null +++ b/examples/sync-wrapper/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId"] + +[migrations_directory] +dir = "migrations" diff --git a/examples/sync-wrapper/migrations/.keep b/examples/sync-wrapper/migrations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/down.sql b/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000..365a210 --- /dev/null +++ b/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS users; \ No newline at end of file diff --git a/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/up.sql b/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000..7599844 --- /dev/null +++ b/examples/sync-wrapper/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,3 @@ +CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT); + +INSERT INTO users(id, name) VALUES(123, 'hello world'); diff --git a/examples/sync-wrapper/src/main.rs b/examples/sync-wrapper/src/main.rs new file mode 100644 index 0000000..dc83486 --- /dev/null +++ b/examples/sync-wrapper/src/main.rs @@ -0,0 +1,89 @@ +use diesel::prelude::*; +use diesel::sqlite::{Sqlite, SqliteConnection}; +use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::sync_connection_wrapper::SyncConnectionWrapper; +use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; + +// ordinary diesel model setup + +table! { + users { + id -> Integer, + name -> Text, + } +} + +#[derive(Debug, Queryable, Selectable)] +#[diesel(table_name = users)] +struct User { + id: i32, + name: String, +} + +const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + +type InnerConnection = SqliteConnection; + +type InnerDB = Sqlite; + +async fn establish(db_url: &str) -> ConnectionResult> { + // It is necessary to specify the specific inner connection type because of inference issues + SyncConnectionWrapper::::establish(db_url).await +} + +async fn run_migrations(async_connection: A) -> Result<(), Box> +where + A: AsyncConnection + 'static, +{ + let mut async_wrapper: AsyncConnectionWrapper = + AsyncConnectionWrapper::from(async_connection); + + tokio::task::spawn_blocking(move || { + async_wrapper.run_pending_migrations(MIGRATIONS).unwrap(); + }) + .await + .map_err(|e| Box::new(e) as Box) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db_url = std::env::var("DATABASE_URL").expect("Env var `DATABASE_URL` not set"); + + // create an async connection for the migrations + let sync_wrapper: SyncConnectionWrapper = establish(&db_url).await?; + run_migrations(sync_wrapper).await?; + + let mut sync_wrapper: SyncConnectionWrapper = establish(&db_url).await?; + + sync_wrapper.batch_execute("DELETE FROM users").await?; + + sync_wrapper + .batch_execute("INSERT INTO users(id, name) VALUES (3, 'toto')") + .await?; + + let data: Vec = users::table + .select(User::as_select()) + .load(&mut sync_wrapper) + .await?; + println!("{data:?}"); + + diesel::delete(users::table) + .execute(&mut sync_wrapper) + .await?; + + diesel::insert_into(users::table) + .values((users::id.eq(1), users::name.eq("iLuke"))) + .execute(&mut sync_wrapper) + .await?; + + let data: Vec = users::table + .filter(users::id.gt(0)) + .or_filter(users::name.like("%Luke")) + .select(User::as_select()) + .load(&mut sync_wrapper) + .await?; + println!("{data:?}"); + + Ok(()) +} diff --git a/src/async_connection_wrapper.rs b/src/async_connection_wrapper.rs index 0d25550..0ff0899 100644 --- a/src/async_connection_wrapper.rs +++ b/src/async_connection_wrapper.rs @@ -100,13 +100,14 @@ pub type AsyncConnectionWrapper = pub use self::implementation::AsyncConnectionWrapper; mod implementation { - use diesel::connection::SimpleConnection; + use diesel::connection::{Instrumentation, SimpleConnection}; use super::*; pub struct AsyncConnectionWrapper { inner: C, runtime: B, + instrumentation: Option>, } impl From for AsyncConnectionWrapper @@ -118,6 +119,7 @@ mod implementation { Self { inner, runtime: B::get_runtime(), + instrumentation: None, } } } @@ -148,7 +150,11 @@ mod implementation { let runtime = B::get_runtime(); let f = C::establish(database_url); let inner = runtime.block_on(f)?; - Ok(Self { inner, runtime }) + Ok(Self { + inner, + runtime, + instrumentation: None, + }) } fn execute_returning_count(&mut self, source: &T) -> diesel::QueryResult @@ -164,6 +170,14 @@ mod implementation { ) -> &mut >::TransactionStateData{ self.inner.transaction_state() } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.instrumentation = Some(Box::new(instrumentation)); + } } impl diesel::connection::LoadConnection for AsyncConnectionWrapper diff --git a/src/doctest_setup.rs b/src/doctest_setup.rs index b970a0b..38af519 100644 --- a/src/doctest_setup.rs +++ b/src/doctest_setup.rs @@ -160,6 +160,81 @@ cfg_if::cfg_if! { create_tables(&mut connection).await; + connection + } + } else if #[cfg(feature = "sqlite")] { + use diesel_async::sync_connection_wrapper::SyncConnectionWrapper; + use diesel::sqlite::SqliteConnection; + #[allow(dead_code)] + type DB = diesel::sqlite::Sqlite; + #[allow(dead_code)] + type DbConnection = SyncConnectionWrapper; + + fn database_url() -> String { + database_url_from_env("SQLITE_DATABASE_URL") + } + + async fn connection_no_data() -> SyncConnectionWrapper { + use diesel_async::AsyncConnection; + let connection_url = database_url(); + SyncConnectionWrapper::::establish(&connection_url).await.unwrap() + } + + async fn create_tables(connection: &mut SyncConnectionWrapper) { + use diesel_async::RunQueryDsl; + use diesel_async::AsyncConnection; + diesel::sql_query("CREATE TEMPORARY TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL + )").execute(connection).await.unwrap(); + + + diesel::sql_query("CREATE TEMPORARY TABLE IF NOT EXISTS animals ( + id INTEGER PRIMARY KEY, + species TEXT NOT NULL, + legs INTEGER NOT NULL, + name TEXT + )").execute(connection).await.unwrap(); + + diesel::sql_query("CREATE TEMPORARY TABLE IF NOT EXISTS posts ( + id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL, + title TEXT NOT NULL + )").execute(connection).await.unwrap(); + + diesel::sql_query("CREATE TEMPORARY TABLE IF NOT EXISTS comments ( + id INTEGER PRIMARY KEY, + post_id INTEGER NOT NULL, + body TEXT NOT NULL + )").execute(connection).await.unwrap(); + diesel::sql_query("CREATE TEMPORARY TABLE IF NOT EXISTS brands ( + id INTEGER PRIMARY KEY, + color VARCHAR(255) NOT NULL DEFAULT 'Green', + accent VARCHAR(255) DEFAULT 'Blue' + )").execute(connection).await.unwrap(); + + connection.begin_test_transaction().await.unwrap(); + diesel::sql_query("INSERT INTO users (name) VALUES ('Sean'), ('Tess')").execute(connection).await.unwrap(); + diesel::sql_query("INSERT INTO posts (user_id, title) VALUES + (1, 'My first post'), + (1, 'About Rust'), + (2, 'My first post too')").execute(connection).await.unwrap(); + diesel::sql_query("INSERT INTO comments (post_id, body) VALUES + (1, 'Great post'), + (2, 'Yay! I am learning Rust'), + (3, 'I enjoyed your post')").execute(connection).await.unwrap(); + diesel::sql_query("INSERT INTO animals (species, legs, name) VALUES + ('dog', 4, 'Jack'), + ('spider', 8, null)").execute(connection).await.unwrap(); + + } + + #[allow(dead_code)] + async fn establish_connection() -> SyncConnectionWrapper { + let mut connection = connection_no_data().await; + create_tables(&mut connection).await; + + connection } } else { diff --git a/src/lib.rs b/src/lib.rs index a7d521a..b7d75f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,11 +14,12 @@ //! //! These traits closely mirror their diesel counter parts while providing async functionality. //! -//! In addition to these core traits 2 fully async connection implementations are provided +//! In addition to these core traits 3 fully async connection implementations are provided //! by diesel-async: //! //! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature) //! * [`AsyncPgConnection`] (enabled by the `postgres` feature) +//! * [`SyncConnectionWrapper`] (enabled by the `sync-connection-wrapper` feature) //! //! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait //! method calls and connections with their async counterparts. @@ -92,7 +93,10 @@ pub mod pg; ))] pub mod pooled_connection; mod run_query_dsl; +#[cfg(any(feature = "postgres", feature = "mysql"))] mod stmt_cache; +#[cfg(feature = "sync-connection-wrapper")] +pub mod sync_connection_wrapper; mod transaction_manager; #[cfg(feature = "mysql")] diff --git a/src/pooled_connection/bb8.rs b/src/pooled_connection/bb8.rs index 28ee7a6..bb994b0 100644 --- a/src/pooled_connection/bb8.rs +++ b/src/pooled_connection/bb8.rs @@ -27,6 +27,13 @@ //! # config //! # } //! # +//! # #[cfg(feature = "sqlite")] +//! # fn get_config() -> AsyncDieselConnectionManager> { +//! # let db_url = database_url_from_env("SQLITE_DATABASE_URL"); +//! # let config = AsyncDieselConnectionManager::>::new(db_url); +//! # config +//! # } +//! # //! # async fn run_test() -> Result<(), Box> { //! # use schema::users::dsl::*; //! # let config = get_config(); diff --git a/src/pooled_connection/deadpool.rs b/src/pooled_connection/deadpool.rs index 4c48efe..33b843f 100644 --- a/src/pooled_connection/deadpool.rs +++ b/src/pooled_connection/deadpool.rs @@ -27,6 +27,13 @@ //! # config //! # } //! # +//! # #[cfg(feature = "sqlite")] +//! # fn get_config() -> AsyncDieselConnectionManager> { +//! # let db_url = database_url_from_env("SQLITE_DATABASE_URL"); +//! # let config = AsyncDieselConnectionManager::>::new(db_url); +//! # config +//! # } +//! # //! # async fn run_test() -> Result<(), Box> { //! # use schema::users::dsl::*; //! # let config = get_config(); diff --git a/src/pooled_connection/mobc.rs b/src/pooled_connection/mobc.rs index 24f51db..a6c8652 100644 --- a/src/pooled_connection/mobc.rs +++ b/src/pooled_connection/mobc.rs @@ -27,6 +27,13 @@ //! # config //! # } //! # +//! # #[cfg(feature = "sqlite")] +//! # fn get_config() -> AsyncDieselConnectionManager> { +//! # let db_url = database_url_from_env("SQLITE_DATABASE_URL"); +//! # let config = AsyncDieselConnectionManager::>::new(db_url); +//! # config +//! # } +//! # //! # async fn run_test() -> Result<(), Box> { //! # use schema::users::dsl::*; //! # let config = get_config(); diff --git a/src/run_query_dsl/mod.rs b/src/run_query_dsl/mod.rs index 6e12f02..f3767ee 100644 --- a/src/run_query_dsl/mod.rs +++ b/src/run_query_dsl/mod.rs @@ -208,10 +208,12 @@ pub trait RunQueryDsl: Sized { /// .await?; /// assert_eq!(1, inserted_rows); /// + /// # #[cfg(not(feature = "sqlite"))] /// let inserted_rows = insert_into(users) /// .values(&vec![name.eq("Jim"), name.eq("James")]) /// .execute(connection) /// .await?; + /// # #[cfg(not(feature = "sqlite"))] /// assert_eq!(2, inserted_rows); /// # Ok(()) /// # } @@ -604,10 +606,12 @@ pub trait RunQueryDsl: Sized { /// # async fn run_test() -> QueryResult<()> { /// # use schema::users::dsl::*; /// # let connection = &mut establish_connection().await; - /// diesel::insert_into(users) - /// .values(&vec![name.eq("Sean"), name.eq("Pascal")]) - /// .execute(connection) - /// .await?; + /// for n in &["Sean", "Pascal"] { + /// diesel::insert_into(users) + /// .values(name.eq(n)) + /// .execute(connection) + /// .await?; + /// } /// /// let first_name = users.order(id) /// .select(name) @@ -678,6 +682,7 @@ impl RunQueryDsl for T {} /// # use self::animals::dsl::*; /// # let connection = &mut establish_connection().await; /// let form = AnimalForm { id: 2, name: "Super scary" }; +/// # #[cfg(not(feature = "sqlite"))] /// let changed_animal = form.save_changes(connection).await?; /// let expected_animal = Animal { /// id: 2, @@ -685,6 +690,7 @@ impl RunQueryDsl for T {} /// legs: 8, /// name: Some(String::from("Super scary")), /// }; +/// # #[cfg(not(feature = "sqlite"))] /// assert_eq!(expected_animal, changed_animal); /// # Ok(()) /// # } diff --git a/src/sync_connection_wrapper.rs b/src/sync_connection_wrapper.rs new file mode 100644 index 0000000..ba40086 --- /dev/null +++ b/src/sync_connection_wrapper.rs @@ -0,0 +1,301 @@ +//! This module contains a wrapper type +//! that provides a [`crate::AsyncConnection`] +//! implementation for types that implement +//! [`diesel::Connection`]. Using this type +//! might be useful for the following usecases: +//! +//! * using a sync Connection implementation in async context +//! * using the same code base for async crates needing multiple backends + +use crate::{AsyncConnection, SimpleAsyncConnection, TransactionManager}; +use diesel::backend::{Backend, DieselReserveSpecialization}; +use diesel::connection::{ + Connection, LoadConnection, TransactionManagerStatus, WithMetadataLookup, +}; +use diesel::query_builder::{ + AsQuery, CollectedQuery, MoveableBindCollector, QueryBuilder, QueryFragment, QueryId, +}; +use diesel::row::IntoOwnedRow; +use diesel::{ConnectionResult, QueryResult}; +use futures_util::future::BoxFuture; +use futures_util::stream::BoxStream; +use futures_util::{FutureExt, StreamExt, TryFutureExt}; +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinError; + +fn from_tokio_join_error(join_error: JoinError) -> diesel::result::Error { + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UnableToSendCommand, + Box::new(join_error.to_string()), + ) +} + +/// A wrapper of a [`diesel::connection::Connection`] usable in async context. +/// +/// It implements AsyncConnection if [`diesel::connection::Connection`] fullfils requirements: +/// * it's a [`diesel::connection::LoadConnection`] +/// * its [`diesel::connection::Connection::Backend`] has a [`diesel::query_builder::BindCollector`] implementing [`diesel::query_builder::MoveableBindCollector`] +/// * its [`diesel::connection::LoadConnection::Row`] implements [`diesel::row::IntoOwnedRow`] +/// +/// Internally this wrapper type will use `spawn_blocking` on tokio +/// to execute the request on the inner connection. This implies a +/// dependency on tokio and that the runtime is running. +/// +/// Note that only SQLite is supported at the moment. +/// +/// # Examples +/// +/// ```rust +/// # include!("doctest_setup.rs"); +/// use diesel_async::RunQueryDsl; +/// use schema::users; +/// +/// async fn some_async_fn() { +/// # let database_url = database_url(); +/// use diesel_async::AsyncConnection; +/// use diesel::sqlite::SqliteConnection; +/// let mut conn = +/// SyncConnectionWrapper::::establish(&database_url).await.unwrap(); +/// # create_tables(&mut conn).await; +/// +/// let all_users = users::table.load::<(i32, String)>(&mut conn).await.unwrap(); +/// # assert_eq!(all_users.len(), 2); +/// } +/// +/// # #[cfg(feature = "sqlite")] +/// # #[tokio::main] +/// # async fn main() { +/// # some_async_fn().await; +/// # } +/// ``` +pub struct SyncConnectionWrapper { + inner: Arc>, +} + +#[async_trait::async_trait] +impl SimpleAsyncConnection for SyncConnectionWrapper +where + C: diesel::connection::Connection + 'static, +{ + async fn batch_execute(&mut self, query: &str) -> QueryResult<()> { + let query = query.to_string(); + self.spawn_blocking(move |inner| inner.batch_execute(query.as_str())) + .await + } +} + +#[async_trait::async_trait] +impl AsyncConnection for SyncConnectionWrapper +where + // Backend bounds + ::Backend: std::default::Default + DieselReserveSpecialization, + ::QueryBuilder: std::default::Default, + // Connection bounds + C: Connection + LoadConnection + WithMetadataLookup + 'static, + ::TransactionManager: Send, + // BindCollector bounds + MD: Send + 'static, + for<'a> ::BindCollector<'a>: + MoveableBindCollector + std::default::Default, + // Row bounds + O: 'static + Send + for<'conn> diesel::row::Row<'conn, C::Backend>, + for<'conn, 'query> ::Row<'conn, 'query>: + IntoOwnedRow<'conn, ::Backend, OwnedRow = O>, +{ + type LoadFuture<'conn, 'query> = BoxFuture<'query, QueryResult>>; + type ExecuteFuture<'conn, 'query> = BoxFuture<'query, QueryResult>; + type Stream<'conn, 'query> = BoxStream<'static, QueryResult>>; + type Row<'conn, 'query> = O; + type Backend = ::Backend; + type TransactionManager = SyncTransactionManagerWrapper<::TransactionManager>; + + async fn establish(database_url: &str) -> ConnectionResult { + let database_url = database_url.to_string(); + tokio::task::spawn_blocking(move || C::establish(&database_url)) + .await + .unwrap_or_else(|e| Err(diesel::ConnectionError::BadConnection(e.to_string()))) + .map(|c| SyncConnectionWrapper::new(c)) + } + + fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> + where + T: AsQuery + 'query, + T::Query: QueryFragment + QueryId + 'query, + { + self.execute_with_prepared_query(source.as_query(), |conn, query| { + use diesel::row::IntoOwnedRow; + conn.load(&query).map(|c| { + c.map(|row| row.map(IntoOwnedRow::into_owned)) + .collect::>>() + }) + }) + .map_ok(|rows| futures_util::stream::iter(rows).boxed()) + .boxed() + } + + fn execute_returning_count<'conn, 'query, T>( + &'conn mut self, + source: T, + ) -> Self::ExecuteFuture<'conn, 'query> + where + T: QueryFragment + QueryId, + { + self.execute_with_prepared_query(source, |conn, query| conn.execute_returning_count(&query)) + } + + fn transaction_state( + &mut self, + ) -> &mut >::TransactionStateData { + self.exclusive_connection().transaction_state() + } +} + +/// A wrapper of a diesel transaction manager usable in async context. +pub struct SyncTransactionManagerWrapper(PhantomData); + +#[async_trait::async_trait] +impl TransactionManager> for SyncTransactionManagerWrapper +where + SyncConnectionWrapper: AsyncConnection, + C: Connection + 'static, + T: diesel::connection::TransactionManager + Send, +{ + type TransactionStateData = T::TransactionStateData; + + async fn begin_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::begin_transaction(inner)) + .await + } + + async fn commit_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::commit_transaction(inner)) + .await + } + + async fn rollback_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::rollback_transaction(inner)) + .await + } + + fn transaction_manager_status_mut( + conn: &mut SyncConnectionWrapper, + ) -> &mut TransactionManagerStatus { + T::transaction_manager_status_mut(conn.exclusive_connection()) + } +} + +impl SyncConnectionWrapper { + /// Builds a wrapper with this underlying sync connection + pub fn new(connection: C) -> Self + where + C: Connection, + { + SyncConnectionWrapper { + inner: Arc::new(Mutex::new(connection)), + } + } + + pub(self) fn spawn_blocking<'a, R>( + &mut self, + task: impl FnOnce(&mut C) -> QueryResult + Send + 'static, + ) -> BoxFuture<'a, QueryResult> + where + C: Connection + 'static, + R: Send + 'static, + { + let inner = self.inner.clone(); + tokio::task::spawn_blocking(move || { + let mut inner = inner + .lock() + .expect("Mutex is poisoned, a thread must have panicked holding it."); + task(&mut inner) + }) + .unwrap_or_else(|err| QueryResult::Err(from_tokio_join_error(err))) + .boxed() + } + + fn execute_with_prepared_query<'a, MD, Q, R>( + &mut self, + query: Q, + callback: impl FnOnce(&mut C, &CollectedQuery) -> QueryResult + Send + 'static, + ) -> BoxFuture<'a, QueryResult> + where + // Backend bounds + ::Backend: std::default::Default + DieselReserveSpecialization, + ::QueryBuilder: std::default::Default, + // Connection bounds + C: Connection + LoadConnection + WithMetadataLookup + 'static, + ::TransactionManager: Send, + // BindCollector bounds + MD: Send + 'static, + for<'b> ::BindCollector<'b>: + MoveableBindCollector + std::default::Default, + // Arguments/Return bounds + Q: QueryFragment + QueryId, + R: Send + 'static, + { + let backend = C::Backend::default(); + + let (collect_bind_result, collector_data) = { + let exclusive = self.inner.clone(); + let mut inner = exclusive + .lock() + .expect("Mutex is poisoned, a thread must have panicked holding it."); + let mut bind_collector = + <::BindCollector<'_> as Default>::default(); + let metadata_lookup = inner.metadata_lookup(); + let result = query.collect_binds(&mut bind_collector, metadata_lookup, &backend); + let collector_data = bind_collector.moveable(); + + (result, collector_data) + }; + + let mut query_builder = <::QueryBuilder as Default>::default(); + let sql = query + .to_sql(&mut query_builder, &backend) + .map(|_| query_builder.finish()); + let is_safe_to_cache_prepared = query.is_safe_to_cache_prepared(&backend); + + self.spawn_blocking(|inner| { + collect_bind_result?; + let query = CollectedQuery::new(sql?, is_safe_to_cache_prepared?, collector_data); + callback(inner, &query) + }) + } + + /// Gets an exclusive access to the underlying diesel Connection + /// + /// It panics in case of shared access. + /// This is typically used only used during transaction. + pub(self) fn exclusive_connection(&mut self) -> &mut C + where + C: Connection, + { + // there should be no other pending future when this is called + // that means there is only one instance of this Arc and + // we can simply access the inner data + if let Some(conn_mutex) = Arc::get_mut(&mut self.inner) { + conn_mutex + .get_mut() + .expect("Mutex is poisoned, a thread must have panicked holding it.") + } else { + panic!("Cannot access shared transaction state") + } + } +} + +#[cfg(any( + feature = "deadpool", + feature = "bb8", + feature = "mobc", + feature = "r2d2" +))] +impl crate::pooled_connection::PoolableConnection for SyncConnectionWrapper +where + Self: AsyncConnection, +{ + fn is_broken(&mut self) -> bool { + Self::TransactionManager::is_broken_transaction_manager(self) + } +} diff --git a/tests/lib.rs b/tests/lib.rs index 5601234..e65c10e 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -93,6 +93,9 @@ struct User { type TestConnection = AsyncMysqlConnection; #[cfg(feature = "postgres")] type TestConnection = AsyncPgConnection; +#[cfg(feature = "sqlite")] +type TestConnection = + sync_connection_wrapper::SyncConnectionWrapper; #[allow(dead_code)] type TestBackend = ::Backend; @@ -100,11 +103,17 @@ type TestBackend = ::Backend; #[tokio::test] async fn test_basic_insert_and_load() -> QueryResult<()> { let conn = &mut connection().await; + // Insertion split into 2 since Sqlite batch insert isn't supported for diesel_async yet let res = diesel::insert_into(users::table) - .values([users::name.eq("John Doe"), users::name.eq("Jane Doe")]) + .values(users::name.eq("John Doe")) .execute(conn) .await; - assert_eq!(res, Ok(2), "User count does not match"); + assert_eq!(res, Ok(1), "User count does not match"); + let res = diesel::insert_into(users::table) + .values(users::name.eq("Jane Doe")) + .execute(conn) + .await; + assert_eq!(res, Ok(1), "User count does not match"); let users = users::table.load::(conn).await?; assert_eq!(&users[0].name, "John Doe", "User name [0] does not match"); assert_eq!(&users[1].name, "Jane Doe", "User name [1] does not match"); @@ -128,7 +137,7 @@ async fn setup(connection: &mut TestConnection) { } #[cfg(feature = "postgres")] -diesel::sql_function!(fn pg_sleep(interval: diesel::sql_types::Double)); +diesel::define_sql_function!(fn pg_sleep(interval: diesel::sql_types::Double)); #[cfg(feature = "postgres")] #[tokio::test] @@ -179,6 +188,19 @@ async fn setup(connection: &mut TestConnection) { .unwrap(); } +#[cfg(feature = "sqlite")] +async fn setup(connection: &mut TestConnection) { + diesel::sql_query( + "CREATE TEMPORARY TABLE users ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL + )", + ) + .execute(connection) + .await + .unwrap(); +} + async fn connection() -> TestConnection { let db_url = std::env::var("DATABASE_URL").unwrap(); let mut conn = TestConnection::establish(&db_url).await.unwrap(); @@ -187,7 +209,7 @@ async fn connection() -> TestConnection { conn.begin_test_transaction().await.unwrap(); } setup(&mut conn).await; - if cfg!(feature = "mysql") { + if cfg!(feature = "mysql") || cfg!(feature = "sqlite") { // mysql does not allow this and does even automatically close // any open transaction. As of this we open a transaction **after** // we setup the schema diff --git a/tests/pooling.rs b/tests/pooling.rs index b748e99..e129a48 100644 --- a/tests/pooling.rs +++ b/tests/pooling.rs @@ -1,6 +1,8 @@ use super::{users, User}; use diesel::prelude::*; -use diesel_async::{RunQueryDsl, SaveChangesDsl}; +use diesel_async::RunQueryDsl; +#[cfg(not(feature = "sqlite"))] +use diesel_async::SaveChangesDsl; #[tokio::test] #[cfg(feature = "bb8")] @@ -23,13 +25,17 @@ async fn save_changes_bb8() { .await .unwrap(); - let mut u = users::table.first::(&mut conn).await.unwrap(); + let u = users::table.first::(&mut conn).await.unwrap(); assert_eq!(u.name, "John"); - u.name = "Jane".into(); - let u2: User = u.save_changes(&mut conn).await.unwrap(); + #[cfg(not(feature = "sqlite"))] + { + let mut u = u; + u.name = "Jane".into(); + let u2: User = u.save_changes(&mut conn).await.unwrap(); - assert_eq!(u2.name, "Jane"); + assert_eq!(u2.name, "Jane"); + } } #[tokio::test] @@ -53,13 +59,17 @@ async fn save_changes_deadpool() { .await .unwrap(); - let mut u = users::table.first::(&mut conn).await.unwrap(); + let u = users::table.first::(&mut conn).await.unwrap(); assert_eq!(u.name, "John"); - u.name = "Jane".into(); - let u2: User = u.save_changes(&mut conn).await.unwrap(); + #[cfg(not(feature = "sqlite"))] + { + let mut u = u; + u.name = "Jane".into(); + let u2: User = u.save_changes(&mut conn).await.unwrap(); - assert_eq!(u2.name, "Jane"); + assert_eq!(u2.name, "Jane"); + } } #[tokio::test] @@ -83,11 +93,15 @@ async fn save_changes_mobc() { .await .unwrap(); - let mut u = users::table.first::(&mut conn).await.unwrap(); + let u = users::table.first::(&mut conn).await.unwrap(); assert_eq!(u.name, "John"); - u.name = "Jane".into(); - let u2: User = u.save_changes(&mut conn).await.unwrap(); + #[cfg(not(feature = "sqlite"))] + { + let mut u = u; + u.name = "Jane".into(); + let u2: User = u.save_changes(&mut conn).await.unwrap(); - assert_eq!(u2.name, "Jane"); + assert_eq!(u2.name, "Jane"); + } }