76
76
use diesel:: backend:: Backend ;
77
77
use diesel:: connection:: { CacheSize , Instrumentation } ;
78
78
use diesel:: query_builder:: { AsQuery , QueryFragment , QueryId } ;
79
- use diesel:: result:: Error ;
80
79
use diesel:: row:: Row ;
81
80
use diesel:: { ConnectionResult , QueryResult } ;
82
- use futures_util:: { Future , Stream } ;
81
+ use futures_util:: future:: BoxFuture ;
82
+ use futures_util:: { Future , FutureExt , Stream } ;
83
83
use std:: fmt:: Debug ;
84
84
85
85
pub use scoped_futures;
@@ -115,21 +115,19 @@ pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
115
115
/// Perform simple operations on a backend.
116
116
///
117
117
/// You should likely use [`AsyncConnection`] instead.
118
- #[ async_trait:: async_trait]
119
118
pub trait SimpleAsyncConnection {
120
119
/// Execute multiple SQL statements within the same string.
121
120
///
122
121
/// This function is used to execute migrations,
123
122
/// which may contain more than one SQL statement.
124
- async fn batch_execute ( & mut self , query : & str ) -> QueryResult < ( ) > ;
123
+ fn batch_execute ( & mut self , query : & str ) -> impl Future < Output = QueryResult < ( ) > > + Send ;
125
124
}
126
125
127
126
/// An async connection to a database
128
127
///
129
128
/// This trait represents a n async database connection. It can be used to query the database through
130
129
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
131
130
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
132
- #[ async_trait:: async_trait]
133
131
pub trait AsyncConnection : SimpleAsyncConnection + Sized + Send {
134
132
/// The future returned by `AsyncConnection::execute`
135
133
type ExecuteFuture < ' conn , ' query > : Future < Output = QueryResult < usize > > + Send ;
@@ -151,7 +149,7 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
151
149
/// The argument to this method and the method's behavior varies by backend.
152
150
/// See the documentation for that backend's connection class
153
151
/// for details about what it accepts and how it behaves.
154
- async fn establish ( database_url : & str ) -> ConnectionResult < Self > ;
152
+ fn establish ( database_url : & str ) -> impl Future < Output = ConnectionResult < Self > > + Send ;
155
153
156
154
/// Executes the given function inside of a database transaction
157
155
///
@@ -230,34 +228,44 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
230
228
/// # Ok(())
231
229
/// # }
232
230
/// ```
233
- async fn transaction < ' a , R , E , F > ( & mut self , callback : F ) -> Result < R , E >
231
+ fn transaction < ' a , ' conn , R , E , F > (
232
+ & ' conn mut self ,
233
+ callback : F ,
234
+ ) -> BoxFuture < ' conn , Result < R , E > >
235
+ // we cannot use `impl Trait` here due to bugs in rustc
236
+ // https://github.com/rust-lang/rust/issues/100013
237
+ //impl Future<Output = Result<R, E>> + Send + 'async_trait
234
238
where
235
239
F : for < ' r > FnOnce ( & ' r mut Self ) -> ScopedBoxFuture < ' a , ' r , Result < R , E > > + Send + ' a ,
236
240
E : From < diesel:: result:: Error > + Send + ' a ,
237
241
R : Send + ' a ,
242
+ ' a : ' conn ,
238
243
{
239
- Self :: TransactionManager :: transaction ( self , callback) . await
244
+ Self :: TransactionManager :: transaction ( self , callback) . boxed ( )
240
245
}
241
246
242
247
/// Creates a transaction that will never be committed. This is useful for
243
248
/// tests. Panics if called while inside of a transaction or
244
249
/// if called with a connection containing a broken transaction
245
- async fn begin_test_transaction ( & mut self ) -> QueryResult < ( ) > {
250
+ fn begin_test_transaction ( & mut self ) -> impl Future < Output = QueryResult < ( ) > > + Send {
246
251
use diesel:: connection:: TransactionManagerStatus ;
247
252
248
- match Self :: TransactionManager :: transaction_manager_status_mut ( self ) {
249
- TransactionManagerStatus :: Valid ( valid_status) => {
250
- assert_eq ! ( None , valid_status. transaction_depth( ) )
251
- }
252
- TransactionManagerStatus :: InError => panic ! ( "Transaction manager in error" ) ,
253
- } ;
254
- Self :: TransactionManager :: begin_transaction ( self ) . await ?;
255
- // set the test transaction flag
256
- // to prevent that this connection gets dropped in connection pools
257
- // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
258
- // to prevent modifications to the schema
259
- Self :: TransactionManager :: transaction_manager_status_mut ( self ) . set_test_transaction_flag ( ) ;
260
- Ok ( ( ) )
253
+ async {
254
+ match Self :: TransactionManager :: transaction_manager_status_mut ( self ) {
255
+ TransactionManagerStatus :: Valid ( valid_status) => {
256
+ assert_eq ! ( None , valid_status. transaction_depth( ) )
257
+ }
258
+ TransactionManagerStatus :: InError => panic ! ( "Transaction manager in error" ) ,
259
+ } ;
260
+ Self :: TransactionManager :: begin_transaction ( self ) . await ?;
261
+ // set the test transaction flag
262
+ // to prevent that this connection gets dropped in connection pools
263
+ // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
264
+ // to prevent modifications to the schema
265
+ Self :: TransactionManager :: transaction_manager_status_mut ( self )
266
+ . set_test_transaction_flag ( ) ;
267
+ Ok ( ( ) )
268
+ }
261
269
}
262
270
263
271
/// Executes the given function inside a transaction, but does not commit
@@ -297,27 +305,33 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
297
305
/// # Ok(())
298
306
/// # }
299
307
/// ```
300
- async fn test_transaction < ' a , R , E , F > ( & ' a mut self , f : F ) -> R
308
+ fn test_transaction < ' conn , ' a , R , E , F > (
309
+ & ' conn mut self ,
310
+ f : F ,
311
+ ) -> impl Future < Output = R > + Send + ' conn
301
312
where
302
313
F : for < ' r > FnOnce ( & ' r mut Self ) -> ScopedBoxFuture < ' a , ' r , Result < R , E > > + Send + ' a ,
303
314
E : Debug + Send + ' a ,
304
315
R : Send + ' a ,
305
- Self : ' a ,
316
+ ' a : ' conn ,
306
317
{
307
318
use futures_util:: TryFutureExt ;
308
-
309
- let mut user_result = None ;
310
- let _ = self
311
- . transaction :: < R , _ , _ > ( |c| {
312
- f ( c) . map_err ( |_| Error :: RollbackTransaction )
313
- . and_then ( |r| {
314
- user_result = Some ( r) ;
315
- futures_util:: future:: ready ( Err ( Error :: RollbackTransaction ) )
316
- } )
317
- . scope_boxed ( )
318
- } )
319
- . await ;
320
- user_result. expect ( "Transaction did not succeed" )
319
+ let ( user_result_tx, user_result_rx) = std:: sync:: mpsc:: channel ( ) ;
320
+ self . transaction :: < R , _ , _ > ( move |conn| {
321
+ f ( conn)
322
+ . map_err ( |_| diesel:: result:: Error :: RollbackTransaction )
323
+ . and_then ( move |r| {
324
+ let _ = user_result_tx. send ( r) ;
325
+ futures_util:: future:: ready ( Err ( diesel:: result:: Error :: RollbackTransaction ) )
326
+ } )
327
+ . scope_boxed ( )
328
+ } )
329
+ . then ( move |_r| {
330
+ let r = user_result_rx
331
+ . try_recv ( )
332
+ . expect ( "Transaction did not succeed" ) ;
333
+ futures_util:: future:: ready ( r)
334
+ } )
321
335
}
322
336
323
337
#[ doc( hidden) ]
0 commit comments